#!/usr/bin/python
|
# -*- coding: GBK -*-
|
|
"""
|
Êý¾Ý°ü´¦ÀíÄ£¿é
|
¸ºÔðÊý¾Ý°üµÄ½âÎö¡¢·Ö·¢ºÍ´¦Àí
|
"""
|
|
import threading
|
import datetime
|
import time
|
from protocol import (
|
ProtocolHead, LoginPacket, LogoutPacket, EventSendPacket, EventStrSendPacket,
|
HeartBeatPacket, CMD_INTERFACE, SUB_CMD_LOGIN, SUB_CMD_LOGOUT,
|
SUB_CMD_EVENT_SEND, SUB_CMD_HEARTBEAT, SUB_CMD_EVENT_STR
|
)
|
from clients_manager import handle_login, handle_logout, handle_heartbeat
|
from packet_logger import packet_logger
|
|
|
class PacketQueue:
|
"""Êý¾Ý°ü¶ÓÁÐ"""
|
|
def __init__(self):
|
self.queue = []
|
self.condition = threading.Condition()
|
self.running = True
|
|
def put(self, cid, data):
|
"""·ÅÈëÊý¾Ý°ü"""
|
with self.condition:
|
self.queue.append((cid, data))
|
self.condition.notify()
|
|
def get(self, timeout=None):
|
"""»ñÈ¡Êý¾Ý°ü"""
|
with self.condition:
|
while len(self.queue) == 0 and self.running:
|
if timeout:
|
if not self.condition.wait(timeout):
|
return None, None
|
else:
|
self.condition.wait()
|
|
if not self.running:
|
return None, None
|
|
if len(self.queue) > 0:
|
return self.queue.pop(0)
|
|
return None, None
|
|
def stop(self):
|
"""Í£Ö¹¶ÓÁÐ"""
|
with self.condition:
|
self.running = False
|
self.condition.notify_all()
|
|
def size(self):
|
"""»ñÈ¡¶ÓÁдóС"""
|
with self.condition:
|
return len(self.queue)
|
|
|
class PacketProcessor:
|
"""Êý¾Ý°ü´¦ÀíÆ÷"""
|
|
def __init__(self, file_mgr):
|
self.file_mgr = file_mgr
|
self.packet_queue = PacketQueue()
|
self.process_thread = None
|
self.running = False
|
|
def add_packet(self, cid, data):
|
"""Ìí¼ÓÊý¾Ý°üµ½¶ÓÁÐ"""
|
self.packet_queue.put(cid, data)
|
|
def start(self):
|
"""Æô¶¯´¦ÀíÏß³Ì"""
|
if self.running:
|
return
|
|
self.running = True
|
self.process_thread = threading.Thread(target=self._process_loop)
|
self.process_thread.daemon = True
|
self.process_thread.start()
|
|
def stop(self):
|
"""Í£Ö¹´¦ÀíÏß³Ì"""
|
if not self.running:
|
return
|
|
self.running = False
|
self.packet_queue.stop()
|
|
if self.process_thread:
|
self.process_thread.join(timeout=5)
|
|
def _process_loop(self):
|
"""´¦ÀíÑ»·"""
|
while self.running:
|
cid, data = self.packet_queue.get(timeout=1)
|
|
if cid is None or data is None:
|
continue
|
|
try:
|
self._process_packet(cid, data)
|
except Exception as e:
|
print('[PacketProcessor] Process packet error: %s' % str(e))
|
|
def _process_packet(self, cid, data):
|
"""´¦Àíµ¥¸öÊý¾Ý°ü"""
|
if len(data) < 2:
|
return
|
|
# ½âÎöÐÒéÍ·
|
head = ProtocolHead(data[:2])
|
|
# ¼Ç¼Ôʼ·â°üÈÕÖ¾
|
packet_logger.log(cid, data, 'RAW', 'RECV')
|
|
# ¸ù¾ÝCmd/SubCmd·Ö·¢
|
if head.cmd == CMD_INTERFACE:
|
self._process_interface_packet(cid, head, data)
|
else:
|
print('[PacketProcessor] Unknown cmd: 0x%02X' % head.cmd)
|
|
def _process_interface_packet(self, cid, head, data):
|
"""´¦ÀíInterfaceÐÒéÊý¾Ý°ü"""
|
if head.sub_cmd == SUB_CMD_LOGIN:
|
packet = LoginPacket(data)
|
handle_login(cid, packet)
|
packet_logger.log(cid, data, 'LOGIN', 'RECV')
|
|
elif head.sub_cmd == SUB_CMD_LOGOUT:
|
packet = LogoutPacket(data)
|
handle_logout(cid, packet)
|
packet_logger.log(cid, data, 'LOGOUT', 'RECV')
|
|
elif head.sub_cmd == SUB_CMD_HEARTBEAT:
|
packet = HeartBeatPacket(data)
|
handle_heartbeat(cid, packet)
|
packet_logger.log(cid, data, 'HEARTBEAT', 'RECV')
|
|
elif head.sub_cmd == SUB_CMD_EVENT_SEND:
|
packet = EventSendPacket(data)
|
self._process_event_send(cid, packet)
|
packet_logger.log(cid, data, 'EVENT', 'RECV')
|
|
elif head.sub_cmd == SUB_CMD_EVENT_STR:
|
packet = EventStrSendPacket(data)
|
self._process_event_str_send(cid, packet)
|
packet_logger.log(cid, data, 'EVENT_STR', 'RECV')
|
|
else:
|
print('[PacketProcessor] Unknown interface subcmd: 0x%02X' % head.sub_cmd)
|
|
def _process_event_send(self, cid, packet):
|
"""´¦Àíʼþ·¢ËͰü"""
|
# дÈëÎļþ£¨sz_data ÒѾÊÇ×Ö·û´®£©
|
event_type = packet.event_id
|
self.file_mgr.write_event(cid, event_type, packet.sz_data)
|
|
def _process_event_str_send(self, cid, packet):
|
"""´¦Àí×Ö·û´®Ê¼þ·¢ËͰü"""
|
# дÈëÎļþ£¨sz_data ÒѾÊÇ×Ö·û´®£©
|
event_type = packet.sz_event_id_string
|
self.file_mgr.write_event(cid, event_type, packet.sz_data)
|
|
def get_queue_size(self):
|
"""»ñÈ¡¶ÓÁдóС"""
|
return self.packet_queue.size()
|