[Python] 구조체 형식으로 Serialize Deserialize 하기
2023. 1. 18. 20:04ㆍ프로젝트 로그/테스트x솔루션 JIG 개발기
반응형
프로토콜 기반으로 데이터를 송/수신 하다 보면, 정해진 패킷을 만들거나 수신해서 분석(Parsing) 해야 하는 경우가 많습니다.
이럴 때 마다, Byte 단위로 접근 하여 패킷을 만들거나 분석하는 방식으로는 유지보수가 쉽지 않아 C언어의 구조체를 사용하듯이 데이터를 구조화 할 수 있도록 파이썬 코드를 변경 하였습니다.
python 라이브러리의 ctypes.Structre를 사용 하였으며, 아래와 같은 구조의 패킷을 만들거나 분석 하기 위한 클래스를 만들어서 사용하는 방법을 설명 합니다.
아래 코드는 위 그림과 같은 포맷의 데이터를 만들기 위한 클래스 입니다.
- serialize()
- 구조체를 byteArray 형태로 변경
- deserialize()
- byte 형태의 데이터를 구조체 형태로 변경
- __setattr__() / __getattr__()
- 구조체의 fields("STX", "VER".. 등)에 값을 직접 쓸 수 있게 하는 내장 함수
- 아래 코드에서는 setattr을 상황에 맞게 변경 할 필요가 있어 오버라이딩 함.
- __str__()
- 객체를 문자열로 변환하기 위한 내장 함수
- CONFIG_TOOL_DATA_FORMAT 클래스를 print 하면 해당 필드 값들을 출력하기 위해 오버라이딩 함.
class CONFIG_TOOL_DATA_FORMAT(ctypes.Structure):
CharPtr = ctypes.POINTER(ctypes.c_char)
_pack = 1
_fields_ = [
("STX", ctypes.c_uint16),
("VER", ctypes.c_uint16),
("CMD", ctypes.c_uint16),
("TYPE", ctypes.c_uint16),
("HW_TYPE", ctypes.c_uint16),
("HW_OPTION", ctypes.c_uint16),
("MSG_OPTION", ctypes.c_uint16),
("DATA_LEN", ctypes.c_uint16),
("DATA", CharPtr),
("CRC32", ctypes.c_uint32)
]
@classmethod
def getHeaderAndCRCLength(cls):
fmt_prefix = ">8H"
fmt_crc = "I"
return struct.calcsize(fmt_prefix) + struct.calcsize(fmt_crc)
@classmethod
def deserialize(cls, buf):
fmt_prefix = ">8H"
fmt_crc = "I"
data_len = len(buf) - struct.calcsize(fmt_prefix) - struct.calcsize(fmt_crc)
#Logger.instance().logger().debug("Data Length : {0}".format(data_len))
if data_len == 0:
fmt = fmt_prefix + fmt_crc
else:
fmt = fmt_prefix + "B" * data_len + fmt_crc
unpacked = struct.unpack(fmt, buf)
return cls(unpacked[0], unpacked[1], unpacked[2], unpacked[3], unpacked[4], unpacked[5], unpacked[6], unpacked[7], unpacked[8:-1], unpacked[-1] )
def serialize(self):
# Big Endian
if self.DATA_LEN == 0:
tempBuf = struct.pack(">8H", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN)
else:
tempBuf = struct.pack(">8H" + "B" * self.DATA_LEN, self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, *self.DATA[:self.DATA_LEN])
# CRC32_MODBUS 계산 후 추가
self.CRC32 = zlib.crc32(bytearray(tempBuf))
if self.DATA_LEN == 0:
buf = struct.pack(">8HI", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, self.CRC32)
else:
buf = struct.pack(">8H" + "B" * self.DATA_LEN + "I", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, *self.DATA[:self.DATA_LEN], self.CRC32)
return bytearray(buf)
def __setattr__(self, name, value):
if name == "DATA":
self.DATA_LEN = len(value)
buf = (ctypes.c_char * len(value))(*value)
super().__setattr__(name, buf)
else:
super().__setattr__(name, value)
def __str__(self):
s = self.__repr__()
for field_name, _ in self._fields_[:-1]:
s += "\n {0:s}: {1:}".format(field_name, getattr(self, field_name))
s += "\n {0:s}:".format(self._fields_[-2][0])
for i in range(self.DATA_LEN):
s += " 0x{0:02X}".format(ord(self.DATA[i]))
s += "\n {0:s}: 0x{1:04X}".format(self._fields_[-1][0], getattr(self, self._fields_[-1][0]))
return "{0:s}\n".format(s)
아래 코드는 위 클래스를 사용하는 예제 입니다.
testData = CONFIG_TOOL_DATA_FORMAT()
testData.STX = 0x55AA
testData.VER = 0x0001
testData.CMD = 0x0003
serialziedData = testData.serialize()
parseData = testData.deserialize(serializedData)
print(parseData)
print(parseData.STX)
반응형
'프로젝트 로그 > 테스트x솔루션 JIG 개발기' 카테고리의 다른 글
[OrangePi5] 하드웨어 스펙 (0) | 2023.05.24 |
---|---|
[OpenCV 디버깅] 카메라 QR 인식 시, 문제 해결 (0) | 2023.01.20 |
[라즈베리파이] SWAP (0) | 2023.01.17 |
[PyQt]QcoreApplication.processEvents() (1) | 2022.10.31 |
[AWS Lambda] Application 만들기 (0) | 2022.10.28 |