[Python] 구조체 형식으로 Serialize Deserialize 하기

2023. 1. 18. 20:04프로젝트 로그/테스트x솔루션 JIG 개발기

반응형

프로토콜 기반으로 데이터를 송/수신 하다 보면, 정해진 패킷을 만들거나 수신해서 분석(Parsing) 해야 하는 경우가 많습니다.

이럴 때 마다, Byte 단위로 접근 하여 패킷을 만들거나 분석하는 방식으로는 유지보수가 쉽지 않아 C언어의 구조체를 사용하듯이 데이터를 구조화 할 수 있도록 파이썬 코드를 변경 하였습니다.

 

python 라이브러리의 ctypes.Structre를 사용 하였으며, 아래와 같은 구조의 패킷을 만들거나 분석 하기 위한 클래스를 만들어서 사용하는 방법을 설명 합니다.

아래 코드는 위 그림과 같은 포맷의 데이터를 만들기 위한 클래스 입니다.

  • serialize()
    • 구조체를 byteArray 형태로 변경
  • deserialize()
    • byte 형태의 데이터를 구조체 형태로 변경
  • __setattr__() / __getattr__()
    • 구조체의 fields("STX", "VER".. 등)에 값을 직접 쓸 수 있게 하는 내장 함수
    • 아래 코드에서는 setattr을 상황에 맞게 변경 할 필요가 있어 오버라이딩 함.
  • __str__()
    • 객체를 문자열로 변환하기 위한 내장 함수
    • CONFIG_TOOL_DATA_FORMAT 클래스를 print 하면 해당 필드 값들을 출력하기 위해 오버라이딩 함.
class CONFIG_TOOL_DATA_FORMAT(ctypes.Structure):
    CharPtr = ctypes.POINTER(ctypes.c_char)

    _pack = 1
    _fields_ = [
        ("STX",  ctypes.c_uint16),
        ("VER",  ctypes.c_uint16),
        ("CMD",  ctypes.c_uint16),
        ("TYPE", ctypes.c_uint16),
        ("HW_TYPE", ctypes.c_uint16),
        ("HW_OPTION", ctypes.c_uint16),
        ("MSG_OPTION", ctypes.c_uint16),
        ("DATA_LEN", ctypes.c_uint16),
        ("DATA", CharPtr),
        ("CRC32", ctypes.c_uint32)
    ]

    @classmethod
    def getHeaderAndCRCLength(cls):
        fmt_prefix = ">8H"
        fmt_crc = "I"
        
        return struct.calcsize(fmt_prefix) + struct.calcsize(fmt_crc)

    @classmethod
    def deserialize(cls, buf):
        fmt_prefix = ">8H"
        fmt_crc = "I"
        data_len = len(buf) - struct.calcsize(fmt_prefix) - struct.calcsize(fmt_crc)
        #Logger.instance().logger().debug("Data Length : {0}".format(data_len))

        if data_len == 0:
            fmt = fmt_prefix + fmt_crc
        else:
            fmt = fmt_prefix + "B" * data_len + fmt_crc
        
        unpacked = struct.unpack(fmt, buf)
        return cls(unpacked[0], unpacked[1], unpacked[2], unpacked[3], unpacked[4], unpacked[5], unpacked[6], unpacked[7], unpacked[8:-1], unpacked[-1] )
    
    def serialize(self):
        # Big Endian
        if self.DATA_LEN == 0:
            tempBuf = struct.pack(">8H", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN)
        else:
            tempBuf = struct.pack(">8H" + "B" * self.DATA_LEN, self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, *self.DATA[:self.DATA_LEN])

        # CRC32_MODBUS 계산 후 추가
        self.CRC32 = zlib.crc32(bytearray(tempBuf))
        
        if self.DATA_LEN == 0:
            buf = struct.pack(">8HI", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, self.CRC32)
        else:
            buf = struct.pack(">8H" + "B" * self.DATA_LEN + "I", self.STX, self.VER, self.CMD, self.TYPE, self.HW_TYPE, self.HW_OPTION, self.MSG_OPTION, self.DATA_LEN, *self.DATA[:self.DATA_LEN], self.CRC32)

        return bytearray(buf)
    
    def __setattr__(self, name, value):
        if name == "DATA":
            self.DATA_LEN = len(value)
            buf = (ctypes.c_char * len(value))(*value)
            super().__setattr__(name, buf)
        else:
            super().__setattr__(name, value)
    
    def __str__(self):
        s = self.__repr__()
        for field_name, _ in self._fields_[:-1]:
            s += "\n  {0:s}: {1:}".format(field_name, getattr(self, field_name))
        
        s += "\n  {0:s}:".format(self._fields_[-2][0])
        for i in range(self.DATA_LEN):
            s += " 0x{0:02X}".format(ord(self.DATA[i]))
        
        s += "\n  {0:s}: 0x{1:04X}".format(self._fields_[-1][0], getattr(self, self._fields_[-1][0]))
        return "{0:s}\n".format(s)

아래 코드는 위 클래스를 사용하는 예제 입니다.

testData = CONFIG_TOOL_DATA_FORMAT()
testData.STX = 0x55AA
testData.VER = 0x0001
testData.CMD = 0x0003

serialziedData = testData.serialize()

parseData = testData.deserialize(serializedData)
print(parseData)
print(parseData.STX)

 

반응형