[RabbitMQ] Request-Response 구현(RPC 이용)

2023. 5. 25. 20:19프로젝트 로그/테스트x솔루션 JIG 개발기

반응형

메시지 구성요소

- deliveryMode  : 메시지의 지속성 (값 2) 또는 일시적 (기타 값)을 표시.

- contentType : 내용물의 mime-type

- replyTo : 일반적으로 callback queue의 이름을 지정하는데 사용

- correlationId : RPC 응답을 요청과 연관시키는데 사용 (요청을 구분할 수 있는 유일값)

 

Correlation Id

모든 RCP 요청에 대해 Callback Queue 생성 혹은 클라이언트 당 하나의 Callback Queue 생성

Callback Queue에서 메시지를 받으면 이 속성을 보고 응답과 요청을 일치 시킨다.

알 수 없는 correlationId를 보게 되면 안전하게 그 메시지를 버린다.

 

 

동작 설명

Client는 Server에게 Request를 송신하고 Reply를 기다린다. HTTP의 Request-Response 구조로 동작 한다고 생각하면 편하다.

Message Queue에서 Request-Response 구조로 구성하려 아래와 같이 동작 한다.

  1. Client에서는 두 가지 속성을 설정하고 Request 메시지를 보냄
    1. reply_to : 요청을 위해 생성된 익명의 Queue
    2. correlation_id : 요청을 구분할 수 있는 ID
  2. Request 메시지가 rpc_queue에 전달
  3. Server가 request 메시지를 수신하면, reply_to 필드의 큐에 결과를 포함한 메시지를 전송
  4. Client가 메시지를 수신하면, correlation_id 속성 값을 확인하고 요청 값과 일치하면 응답을 처리

 

Client 측 샘플 코드

    def onResponse(self, channel, method, properties, body):
        MqManagerLogger.instance().logger().debug("Receive Message correlation id : {0}".format(properties.correlation_id))
        MqManagerLogger.instance().logger().debug("Receive Message Body {0}".format(body))
        
        if self._correlationId == properties.correlation_id:
            self._response = body
            channel.basic_ack(delivery_tag = method.delivery_tag)
        else:
            self._response = None

    
    def requestMessage(self, topic, message_json, timeout_ms=1000):
        # if self.connection == None:
        #     Logger.instance().logger().error(" Stop Thread. Invalid connection object.")
        #     return False, self.response
        self._connection = self.connect()

        if self._connection == None:
            MqManagerLogger.instance().logger().error("There is no connection with MQ Broker")
            return False, self._response

        self._channel = self._connection.channel()

        result = self._channel.queue_declare(queue='', exclusive=True)
        self.callbackQueue = result.method.queue

        self._channel.basic_consume( queue=self.callbackQueue, on_message_callback=self.onResponse, auto_ack=False)

        self._response = None
        self._correlationId = str(uuid.uuid4())

        MqManagerLogger.instance().logger().debug("Queue Name : {0}, Correlation ID: {1}".format(self.callbackQueue, self._correlationId))

        self._channel.basic_publish(
            exchange='amq.topic',
            routing_key = topic,
            properties = pika.BasicProperties(
                content_type='application/json',
                reply_to = self.callbackQueue,
                correlation_id = self._correlationId,
            ),
            body = message_json
        )

        startTimeStamp = round(time.time() * 1000)
        while self._response is None:
            currentTimeStamp = round(time.time() * 1000)
            if (currentTimeStamp - startTimeStamp) > timeout_ms:
                MqManagerLogger.instance().logger().error("Timeout to wait response.{0} : {1}ms".format(self.callbackQueue, currentTimeStamp-startTimeStamp))
                self.close_connection()
                return False, self._response

            try:
                self._connection.process_data_events()
            except Exception as ex:
                MqManagerLogger.instance().logger().error("Error : {0}".format(ex))
                self.close_connection()
                return False, self._response
            
            time.sleep(0.01)
        
        self.close_connection()
        return True, self._response

 

Server 측 샘플 코드

    def onTestxMqRequestOnCallback(self, channel, method, properties, body): 
        MqManagerLogger.instance().logger().debug("Topic : {0}".format(method.routing_key))

        channel.basic_ack(delivery_tag = method.delivery_tag)

        if method.routing_key == '{0}.Req.JigStatus'.format(self.getJigManagerSerial()):
            MqManagerLogger.instance().logger().info("Recved Req.JigStatus")
            MqManagerLogger.instance().logger().info("JigStatus Body : {0}".format(body))
            responseData =  self.procedureJigStatus(body)
            
        #self.jigManager.testxMqManager.getMutex().acquire()
        try:
            if properties.correlation_id != None and properties.reply_to != None:
                channel.basic_publish(exchange='',
                            routing_key=properties.reply_to,
                            properties=pika.BasicProperties(correlation_id = properties.correlation_id),
                            body=responseData)
                
                #self.channel.basic_ack(delivery_tag = self.method.delivery_tag)
                self.setConnectionStatusWithBroker(status=True)
                MqManagerLogger.instance().logger().debug("Send response. Queue : {0}, Correlation Id : {1}".format(properties.reply_to, properties.correlation_id))
                MqManagerLogger.instance().logger().debug("Send response. Topic : {0}, Data : {1}".format(method.routing_key, responseData))

        except Exception as ex:
            MqManagerLogger.instance().logger().error(ex)
            MqManagerLogger.instance().logger().error("Fail to send response. Topic : {0}, Data : {1}".format(method.routing_key, responseData))
            self.setConnectionStatusWithBroker(status=False)

 

 

참고 자료

https://lovecode.tistory.com/88

https://blog.storyg.co/rabbitmqs/tutorials/python/06-RPC

 

반응형