#!/usr/bin/python
# -*- coding: GBK -*-
"""
Packet processing module.

Responsible for parsing, dispatching and handling received packets.
"""

import collections
import datetime
import threading
import time

from protocol import (
    ProtocolHead, LoginPacket, LogoutPacket, EventSendPacket,
    EventStrSendPacket, HeartBeatPacket,
    CMD_INTERFACE, SUB_CMD_LOGIN, SUB_CMD_LOGOUT,
    SUB_CMD_EVENT_SEND, SUB_CMD_HEARTBEAT, SUB_CMD_EVENT_STR
)
from clients_manager import handle_login, handle_logout, handle_heartbeat
from packet_logger import packet_logger

# Clock used for timeout deadlines.  time.monotonic() is immune to wall-clock
# adjustments; fall back to time.time() on interpreters that do not have it.
_monotonic = getattr(time, 'monotonic', time.time)


class PacketQueue:
    """FIFO queue of ``(cid, data)`` packets guarded by a condition variable."""

    def __init__(self):
        # deque gives O(1) removal from the head; list.pop(0) is O(n).
        self.queue = collections.deque()
        self.condition = threading.Condition()
        self.running = True

    def put(self, cid, data):
        """Append a packet and wake one waiting consumer."""
        with self.condition:
            self.queue.append((cid, data))
            self.condition.notify()

    def get(self, timeout=None):
        """Remove and return the oldest packet as a ``(cid, data)`` tuple.

        timeout: ``None`` waits until a packet arrives or the queue is
            stopped; a number waits at most that many seconds in total
            (``0`` or a negative value polls without blocking).

        Returns ``(None, None)`` when the timeout expires with nothing
        queued, or when the queue has been stopped.  A stopped queue returns
        ``(None, None)`` even if packets are still pending.
        """
        with self.condition:
            if timeout is None:
                while self.running and not self.queue:
                    self.condition.wait()
            else:
                # Track an absolute deadline so that wakeups which find the
                # queue still empty do not restart the full timeout, and so
                # that the result does not depend on wait()'s return value
                # (always None before Python 3.2).
                deadline = _monotonic() + timeout
                while self.running and not self.queue:
                    remaining = deadline - _monotonic()
                    if remaining <= 0:
                        return None, None
                    self.condition.wait(remaining)
            if not self.running or not self.queue:
                return None, None
            return self.queue.popleft()

    def stop(self):
        """Mark the queue as stopped and wake every waiting consumer."""
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def resume(self):
        """Re-enable a stopped queue so that get() waits and delivers again."""
        with self.condition:
            self.running = True

    def size(self):
        """Return the number of packets currently queued."""
        with self.condition:
            return len(self.queue)


class PacketProcessor:
    """Consumes queued packets on a background thread and dispatches them."""

    def __init__(self, file_mgr):
        # file_mgr must provide write_event(cid, event_type, data).
        self.file_mgr = file_mgr
        self.packet_queue = PacketQueue()
        self.process_thread = None
        self.running = False

    def add_packet(self, cid, data):
        """Queue a packet for processing by the worker thread."""
        self.packet_queue.put(cid, data)

    def start(self):
        """Start the worker thread; does nothing if already running."""
        if self.running:
            return
        # stop() leaves the queue stopped, in which state get() returns
        # immediately.  Without re-enabling it, a restarted worker would
        # busy-loop and never receive a packet.
        self.packet_queue.resume()
        self.running = True
        self.process_thread = threading.Thread(target=self._process_loop)
        self.process_thread.daemon = True
        self.process_thread.start()

    def stop(self):
        """Stop the worker thread; does nothing if not running.

        Waits up to 5 seconds for the worker to finish its current packet.
        """
        if not self.running:
            return
        self.running = False
        # Wake the worker if it is blocked waiting for a packet.
        self.packet_queue.stop()
        if self.process_thread:
            self.process_thread.join(timeout=5)

    def _process_loop(self):
        """Worker loop: fetch packets until ``self.running`` is cleared."""
        while self.running:
            # A 1 second timeout bounds how long a stop request goes unnoticed.
            cid, data = self.packet_queue.get(timeout=1)
            if cid is None or data is None:
                continue
            try:
                self._process_packet(cid, data)
            except Exception as e:
                # Thread boundary: report and keep the worker alive.
                print('[PacketProcessor] Process packet error: %s' % str(e))

    def _process_packet(self, cid, data):
        """Parse the protocol head of one packet and dispatch on its cmd.

        Packets shorter than 2 bytes are silently dropped.
        """
        if len(data) < 2:
            return

        # Parse the protocol head from the first two bytes.
        head = ProtocolHead(data[:2])

        # Log the raw packet before any further decoding.
        packet_logger.log(cid, data, 'RAW', 'RECV')

        # Dispatch according to cmd / sub_cmd.
        if head.cmd == CMD_INTERFACE:
            self._process_interface_packet(cid, head, data)
        else:
            print('[PacketProcessor] Unknown cmd: 0x%02X' % head.cmd)

    def _process_interface_packet(self, cid, head, data):
        """Decode an Interface-protocol packet by sub_cmd, handle it, log it.

        The typed log entry is written only after the handler returns.
        """
        if head.sub_cmd == SUB_CMD_LOGIN:
            packet = LoginPacket(data)
            handle_login(cid, packet)
            packet_logger.log(cid, data, 'LOGIN', 'RECV')
        elif head.sub_cmd == SUB_CMD_LOGOUT:
            packet = LogoutPacket(data)
            handle_logout(cid, packet)
            packet_logger.log(cid, data, 'LOGOUT', 'RECV')
        elif head.sub_cmd == SUB_CMD_HEARTBEAT:
            packet = HeartBeatPacket(data)
            handle_heartbeat(cid, packet)
            packet_logger.log(cid, data, 'HEARTBEAT', 'RECV')
        elif head.sub_cmd == SUB_CMD_EVENT_SEND:
            packet = EventSendPacket(data)
            self._process_event_send(cid, packet)
            packet_logger.log(cid, data, 'EVENT', 'RECV')
        elif head.sub_cmd == SUB_CMD_EVENT_STR:
            packet = EventStrSendPacket(data)
            self._process_event_str_send(cid, packet)
            packet_logger.log(cid, data, 'EVENT_STR', 'RECV')
        else:
            print('[PacketProcessor] Unknown interface subcmd: 0x%02X' % head.sub_cmd)

    def _process_event_send(self, cid, packet):
        """Handle an event-send packet, keyed by ``packet.event_id``."""
        # Write to file (per the original note, sz_data is already a string).
        event_type = packet.event_id
        self.file_mgr.write_event(cid, event_type, packet.sz_data)

    def _process_event_str_send(self, cid, packet):
        """Handle a string-event packet, keyed by ``packet.sz_event_id_string``."""
        # Write to file (per the original note, sz_data is already a string).
        event_type = packet.sz_event_id_string
        self.file_mgr.write_event(cid, event_type, packet.sz_data)

    def get_queue_size(self):
        """Return the number of packets waiting to be processed."""
        return self.packet_queue.size()