#!/usr/bin/python # -*- coding: GBK -*- """ ÍøÂçЭÒ鶨ÒåÄ£¿é ¶¨ÒåÊý¾Ý°ü½á¹¹ºÍЭÒéÍ· """ import struct class ProtocolHead: """ЭÒéÍ·½á¹¹""" def __init__(self, data=None): self.cmd = 0 self.sub_cmd = 0 if data: self.from_bytes(data) def from_bytes(self, data): """´Ó×Ö½ÚÊý¾Ý½âÎö""" if len(data) >= 2: self.cmd = ord(data[0]) self.sub_cmd = ord(data[1]) def to_bytes(self): """ת»»Îª×Ö½ÚÊý¾Ý""" return struct.pack('BB', self.cmd, self.sub_cmd) def __str__(self): return 'Head: (Cmd:0x%02X, SubCmd:0x%02X)' % (self.cmd, self.sub_cmd) def __eq__(self, other): return self.cmd == other.cmd and self.sub_cmd == other.sub_cmd def __hash__(self): return hash((self.cmd, self.sub_cmd)) # ЭÒé³£Á¿ CMD_INTERFACE = 0x01 SUB_CMD_LOGIN = 0x01 # µÇ¼ SUB_CMD_LOGOUT = 0x02 # µÇ³ö SUB_CMD_EVENT_SEND = 0x03 # ʼþ·¢ËÍ SUB_CMD_HEARTBEAT = 0x04 # ÐÄÌø SUB_CMD_EVENT_STR = 0x05 # ×Ö·û´®Ê¼þ CMD_MONITOR = 0x0A SUB_CMD_OTHER_DAY_LOGIN = 0x01 # ·ÇͬÌìµÇ¼ SUB_CMD_PLAYER_LOGIN = 0x02 # Íæ¼ÒµÇ¼ SUB_CMD_LOGIN_VALID = 0x03 # ÓÐЧµÇ¼ SUB_CMD_FIRST_LOGIN = 0x04 # Ê״εǼ LOGIN_MAGIC_CODE = 0x34128621 class PacketParser: """Êý¾Ý°ü½âÎöÆ÷""" @staticmethod def read_byte(data, pos): """¶ÁÈ¡×Ö½Ú""" if pos >= len(data): raise Exception('read_byte: pos out of range') return ord(data[pos]), pos + 1 @staticmethod def read_uint32(data, pos): """¶ÁÈ¡32λÎÞ·ûºÅÕûÊý""" if pos + 4 > len(data): raise Exception('read_uint32: pos out of range') value = struct.unpack(' len(data): raise Exception('read_int64: pos out of range') value = struct.unpack(' len(data): raise Exception('read_string: pos out of range') return data[pos:pos+length], pos + length @staticmethod def read_var_string(data, pos): """¶ÁÈ¡±ä³¤×Ö·û´® (³¤¶Èǰ׺)""" length, pos = PacketParser.read_byte(data, pos) return PacketParser.read_string(data, pos, length) class LoginPacket: """µÇ¼Êý¾Ý°ü""" def __init__(self, data=None): self.head = ProtocolHead() self.group_id = 0 self.server_id = 0 self.magic_code = 0 if data: self.from_bytes(data) def from_bytes(self, data): pos = 0 self.head = ProtocolHead(data[pos:pos+2]) pos += 2 self.group_id, pos = PacketParser.read_uint32(data, pos) self.server_id, pos = PacketParser.read_uint32(data, pos) self.magic_code, pos = PacketParser.read_uint32(data, pos) class LogoutPacket: """µÇ³öÊý¾Ý°ü""" def __init__(self, data=None): self.head = ProtocolHead() self.reserve = 0 if data: self.from_bytes(data) def from_bytes(self, data): pos = 0 self.head = ProtocolHead(data[pos:pos+2]) pos += 2 self.reserve, pos = PacketParser.read_byte(data, pos) class EventSendPacket: """ʼþ·¢ËÍÊý¾Ý°ü""" def __init__(self, data=None): self.head = ProtocolHead() self.event_id = 0 self.data_len = 0 self.sz_data = '' self.time = 0 self.data = '' if data: self.from_bytes(data) def from_bytes(self, data): pos = 0 self.head = ProtocolHead(data[pos:pos+2]) pos += 2 self.event_id, pos = PacketParser.read_uint32(data, pos) self.data_len, pos = PacketParser.read_uint32(data, pos) self.sz_data, pos = PacketParser.read_string(data, pos, self.data_len) self.time, pos = PacketParser.read_int64(data, pos) self.data = data[pos:] class EventStrSendPacket: """×Ö·û´®Ê¼þ·¢ËÍÊý¾Ý°ü""" def __init__(self, data=None): self.head = ProtocolHead() self.event_id_string_len = 0 self.sz_event_id_string = '' self.data_len = 0 self.sz_data = '' self.ext_len = 0 self.ext = [] self.data = '' if data: self.from_bytes(data) def from_bytes(self, data): pos = 0 self.head = ProtocolHead(data[pos:pos+2]) pos += 2 self.event_id_string_len, pos = PacketParser.read_byte(data, pos) self.sz_event_id_string, pos = PacketParser.read_string(data, pos, self.event_id_string_len) self.data_len, pos = PacketParser.read_uint32(data, pos) self.sz_data, pos = PacketParser.read_string(data, pos, self.data_len) self.ext_len, pos = PacketParser.read_byte(data, pos) self.ext = [] for i in range(self.ext_len): ext_value, pos = PacketParser.read_uint32(data, pos) self.ext.append(ext_value) self.data = data[pos:] class HeartBeatPacket: """ÐÄÌøÊý¾Ý°ü""" def __init__(self, data=None): self.head = ProtocolHead() self.time = 0 if data: self.from_bytes(data) def from_bytes(self, data): pos = 0 self.head = ProtocolHead(data[pos:pos+2]) pos += 2 self.time, pos = PacketParser.read_uint32(data, pos)