[RabbitMQ] Request-Response 구현(RPC 이용)
2023. 5. 25. 20:19ㆍ프로젝트 로그/테스트x솔루션 JIG 개발기
반응형
메시지 구성요소
- deliveryMode : 메시지의 지속성 (값 2) 또는 일시적 (기타 값)을 표시.
- contentType : 내용물의 mime-type
- replyTo : 일반적으로 callback queue의 이름을 지정하는데 사용
- correlationId : RPC 응답을 요청과 연관시키는데 사용 (요청을 구분할 수 있는 유일값)
Correlation Id
모든 RCP 요청에 대해 Callback Queue 생성 혹은 클라이언트 당 하나의 Callback Queue 생성
Callback Queue에서 메시지를 받으면 이 속성을 보고 응답과 요청을 일치 시킨다.
알 수 없는 correlationId를 보게 되면 안전하게 그 메시지를 버린다.
동작 설명
Client는 Server에게 Request를 송신하고 Reply를 기다린다. HTTP의 Request-Response 구조로 동작 한다고 생각하면 편하다.
Message Queue에서 Request-Response 구조로 구성하려 아래와 같이 동작 한다.
- Client에서는 두 가지 속성을 설정하고 Request 메시지를 보냄
- reply_to : 요청을 위해 생성된 익명의 Queue
- correlation_id : 요청을 구분할 수 있는 ID
- Request 메시지가 rpc_queue에 전달
- Server가 request 메시지를 수신하면, reply_to 필드의 큐에 결과를 포함한 메시지를 전송
- Client가 메시지를 수신하면, correlation_id 속성 값을 확인하고 요청 값과 일치하면 응답을 처리
Client 측 샘플 코드
def onResponse(self, channel, method, properties, body):
MqManagerLogger.instance().logger().debug("Receive Message correlation id : {0}".format(properties.correlation_id))
MqManagerLogger.instance().logger().debug("Receive Message Body {0}".format(body))
if self._correlationId == properties.correlation_id:
self._response = body
channel.basic_ack(delivery_tag = method.delivery_tag)
else:
self._response = None
def requestMessage(self, topic, message_json, timeout_ms=1000):
# if self.connection == None:
# Logger.instance().logger().error(" Stop Thread. Invalid connection object.")
# return False, self.response
self._connection = self.connect()
if self._connection == None:
MqManagerLogger.instance().logger().error("There is no connection with MQ Broker")
return False, self._response
self._channel = self._connection.channel()
result = self._channel.queue_declare(queue='', exclusive=True)
self.callbackQueue = result.method.queue
self._channel.basic_consume( queue=self.callbackQueue, on_message_callback=self.onResponse, auto_ack=False)
self._response = None
self._correlationId = str(uuid.uuid4())
MqManagerLogger.instance().logger().debug("Queue Name : {0}, Correlation ID: {1}".format(self.callbackQueue, self._correlationId))
self._channel.basic_publish(
exchange='amq.topic',
routing_key = topic,
properties = pika.BasicProperties(
content_type='application/json',
reply_to = self.callbackQueue,
correlation_id = self._correlationId,
),
body = message_json
)
startTimeStamp = round(time.time() * 1000)
while self._response is None:
currentTimeStamp = round(time.time() * 1000)
if (currentTimeStamp - startTimeStamp) > timeout_ms:
MqManagerLogger.instance().logger().error("Timeout to wait response.{0} : {1}ms".format(self.callbackQueue, currentTimeStamp-startTimeStamp))
self.close_connection()
return False, self._response
try:
self._connection.process_data_events()
except Exception as ex:
MqManagerLogger.instance().logger().error("Error : {0}".format(ex))
self.close_connection()
return False, self._response
time.sleep(0.01)
self.close_connection()
return True, self._response
Server 측 샘플 코드
def onTestxMqRequestOnCallback(self, channel, method, properties, body):
MqManagerLogger.instance().logger().debug("Topic : {0}".format(method.routing_key))
channel.basic_ack(delivery_tag = method.delivery_tag)
if method.routing_key == '{0}.Req.JigStatus'.format(self.getJigManagerSerial()):
MqManagerLogger.instance().logger().info("Recved Req.JigStatus")
MqManagerLogger.instance().logger().info("JigStatus Body : {0}".format(body))
responseData = self.procedureJigStatus(body)
#self.jigManager.testxMqManager.getMutex().acquire()
try:
if properties.correlation_id != None and properties.reply_to != None:
channel.basic_publish(exchange='',
routing_key=properties.reply_to,
properties=pika.BasicProperties(correlation_id = properties.correlation_id),
body=responseData)
#self.channel.basic_ack(delivery_tag = self.method.delivery_tag)
self.setConnectionStatusWithBroker(status=True)
MqManagerLogger.instance().logger().debug("Send response. Queue : {0}, Correlation Id : {1}".format(properties.reply_to, properties.correlation_id))
MqManagerLogger.instance().logger().debug("Send response. Topic : {0}, Data : {1}".format(method.routing_key, responseData))
except Exception as ex:
MqManagerLogger.instance().logger().error(ex)
MqManagerLogger.instance().logger().error("Fail to send response. Topic : {0}, Data : {1}".format(method.routing_key, responseData))
self.setConnectionStatusWithBroker(status=False)
참고 자료
https://lovecode.tistory.com/88
https://blog.storyg.co/rabbitmqs/tutorials/python/06-RPC
반응형
'프로젝트 로그 > 테스트x솔루션 JIG 개발기' 카테고리의 다른 글
[RabbitMQ] Json Data Send/Receive (0) | 2023.06.02 |
---|---|
[라즈베리파이] PySide2 초기 환경 설정 (0) | 2023.06.01 |
[OrangePi5] 하드웨어 스펙 (0) | 2023.05.24 |
[OpenCV 디버깅] 카메라 QR 인식 시, 문제 해결 (0) | 2023.01.20 |
[Python] 구조체 형식으로 Serialize Deserialize 하기 (0) | 2023.01.18 |