#!/usr/bin/python # -*- coding: GBK -*- """ EventServerÖ÷·þÎñÆ÷Ä£¿é ʵÏÖTCP·þÎñÆ÷,½ÓÊÕÓÎÏ·¿Í»§¶Ëʼþ²¢¼Ç¼µ½±¾µØÎļþ """ import sys import socket import select import threading import datetime from config import ConfigReader from clients_manager import ClientsMgr from file_manager import FileMgr from packet_processor import PacketProcessor from packet_logger import packet_logger class Connection: """Á¬½Ó·â×°Àà""" def __init__(self, sock, addr, cid, packet_processor): self.sock = sock self.addr = addr self.cid = cid self.packet_processor = packet_processor self.running = True self.recv_buffer = '' self.lock = threading.Lock() self.consecutive_discards = 0 # Á¬Ðø¶ªÆú¼ÆÊýÆ÷ # ´´½¨½ÓÊÕÏß³Ì self.recv_thread = threading.Thread(target=self._recv_loop) self.recv_thread.daemon = True self.recv_thread.start() def _recv_loop(self): """½ÓÊÕÊý¾ÝÑ­»·""" while self.running: try: # ¼ì²ésocketÊÇ·ñÓÐÊý¾Ý¿É¶Á readable, _, _ = select.select([self.sock], [], [], 1.0) if readable: data = self.sock.recv(4096) if not data: # Á¬½Ó¶Ï¿ª break # ÀÛ»ýÊý¾Ýµ½»º³åÇø with self.lock: self.recv_buffer += data # ³¢ÊÔ½âÎöÍêÕûµÄÊý¾Ý°ü while self._try_parse_packet(): pass except socket.error as e: print('[Connection] Socket error: %s' % str(e)) break except Exception as e: print('[Connection] Error: %s' % str(e)) break # ¼ì²éÊÇ·ñÒò´óÁ¿ÎÞЧ°üÐèÒª¹Ø±Õ if self.consecutive_discards > 1000: print('[Connection] Too many invalid packets, closing connection %d' % self.cid) self.close() def _try_parse_packet(self): """³¢ÊÔ½âÎöÊý¾Ý°ü""" import struct # PacketHeader: FF CC (2×Ö½Ú) + Length (2×Ö½Ú) = 4×Ö½Ú PACKET_HEADER_SIZE = 4 PACKET_HEADER_MC = 0xCCFF MAX_PACKET_LEN = 2048 if len(self.recv_buffer) < PACKET_HEADER_SIZE: return False # ¼ì²é MagicCode (FF CC) magic_code = struct.unpack(' 1000: packet_logger.log_text('Too many invalid packets, closing connection', 'ERROR') return False packet_logger.log_text('Invalid magic code: 0x%04X, discard byte (count: %d)' % (magic_code, self.consecutive_discards), 'WARN') self.recv_buffer = self.recv_buffer[1:] return False # ¶ÁÈ¡°ü³¤¶È£¨²»°üº¬PacketHeader±¾Éí£© packet_len = struct.unpack(' MAX_PACKET_LEN: self.consecutive_discards += 1 packet_logger.log_text('Packet length %d exceeds max %d, discard byte (count: %d)' % (packet_len, MAX_PACKET_LEN, self.consecutive_discards), 'WARN') self.recv_buffer = self.recv_buffer[1:] return False # ¼ÆËã×ܳ¤¶È£¨°üº¬PacketHeader£© total_len = PACKET_HEADER_SIZE + packet_len if len(self.recv_buffer) < total_len: # Êý¾Ý²»ÍêÕû£¬µÈ´ý¸ü¶àÊý¾Ý return False # ÌáȡЭÒéÊý¾Ý£¨Ìø¹ý4×Ö½ÚPacketHeader£¬Ö»´«Cmd/SubCmd¼°Ö®ºóµÄÊý¾Ý£© packet_data = self.recv_buffer[PACKET_HEADER_SIZE:total_len] self.recv_buffer = self.recv_buffer[total_len:] # ÖØÖöªÆú¼ÆÊýÆ÷ self.consecutive_discards = 0 # ½»¸ø°ü´¦ÀíÆ÷´¦Àí self.packet_processor.add_packet(self.cid, packet_data) return True def send(self, data): """·¢ËÍÊý¾Ý""" try: self.sock.sendall(data) return True except Exception as e: print('[Connection] Send error: %s' % str(e)) return False def close(self): """¹Ø±ÕÁ¬½Ó""" if self.running: self.running = False if self.sock: try: self.sock.close() except: pass self.sock = None # ֪ͨ¿Í»§¶Ë¹ÜÀíÆ÷ ClientsMgr.instance().on_client_disconnect(self.cid) class EventServer: """ʼþ·þÎñÆ÷""" def __init__(self): self.config = ConfigReader.instance() self.file_mgr = FileMgr(self.config) self.packet_processor = PacketProcessor(self.file_mgr) self.server_sock = None self.connections = {} # cid -> Connection self.next_cid = 1 self.lock = threading.Lock() self.running = False self.accept_thread = None def start(self): """Æô¶¯·þÎñÆ÷""" port = self.config.get_listen_port() # ´´½¨·þÎñÆ÷socket self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.server_sock.bind(('0.0.0.0', port)) self.server_sock.listen(5) print('[EventServer] Server started on port %d' % port) print('[EventServer] Log path: %s' % self.config.get_log_file_path()) print('[EventServer] Write mode: %d' % self.config.get_write_mode()) print('') # Æô¶¯°ü´¦ÀíÆ÷ self.packet_processor.start() # Æô¶¯½ÓÊÕÏß³Ì self.running = True self.accept_thread = threading.Thread(target=self._accept_loop) self.accept_thread.daemon = True self.accept_thread.start() return True except Exception as e: print('[EventServer] Failed to start server: %s' % str(e)) self.server_sock.close() return False def _accept_loop(self): """½ÓÊÜÁ¬½ÓÑ­»·""" while self.running: try: # ¼ì²éÊÇ·ñÓÐеÄÁ¬½Ó readable, _, _ = select.select([self.server_sock], [], [], 1.0) if readable: client_sock, addr = self.server_sock.accept() with self.lock: cid = self.next_cid self.next_cid += 1 connection = Connection(client_sock, addr, cid, self.packet_processor) with self.lock: self.connections[cid] = connection print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1])) except socket.error as e: if self.running: print('[EventServer] Accept error: %s' % str(e)) break except Exception as e: print('[EventServer] Error: %s' % str(e)) break def get_connection_count(self): """»ñÈ¡Á¬½ÓÊý""" with self.lock: return len(self.connections) def get_queue_size(self): """»ñÈ¡°ü¶ÓÁдóС""" return self.packet_processor.get_queue_size() def get_success_count(self): """»ñÈ¡³É¹¦Ð´Èë´ÎÊý""" return self.file_mgr.get_success_count() def get_fail_count(self): """»ñȡʧ°ÜдÈë´ÎÊý""" return self.file_mgr.get_fail_count() def stop(self): """Í£Ö¹·þÎñÆ÷""" if not self.running: return print('[EventServer] Stopping server...') self.running = False # ¹Ø±ÕËùÓÐÁ¬½Ó with self.lock: for conn in self.connections.values(): conn.close() self.connections.clear() # Í£Ö¹°ü´¦ÀíÆ÷ self.packet_processor.stop() # ¹Ø±Õ·þÎñÆ÷socket if self.server_sock: try: self.server_sock.close() except: pass # ¹Ø±ÕËùÓÐÎļþ self.file_mgr.close_all() print('[EventServer] Server stopped')