| New file |
| | |
| | | #!/usr/bin/python |
| | | # -*- coding: GBK -*- |
| | | |
| | | """ |
| | | EventServer主服务器模块 |
| | | 实现TCP服务器,接收游戏客户端事件并记录到本地文件 |
| | | """ |
| | | |
| | | import sys |
| | | import socket |
| | | import select |
| | | import threading |
| | | import datetime |
| | | from config import ConfigReader |
| | | from clients_manager import ClientsMgr |
| | | from file_manager import FileMgr |
| | | from packet_processor import PacketProcessor |
| | | from packet_logger import packet_logger |
| | | |
| | | |
class Connection:
    """One accepted client socket plus the thread that reads packets from it.

    The constructor starts a daemon thread. That thread polls the socket,
    appends whatever arrives to ``recv_buffer`` and splits the buffer into
    framed packets. Every packet payload is handed to
    ``packet_processor.add_packet(cid, payload)``.

    Frame layout: magic bytes ``FF CC``, a 2-byte little-endian payload
    length (header not included), then the payload.
    """

    # Wire-format constants (header = 2-byte magic + 2-byte length).
    PACKET_HEADER_SIZE = 4
    PACKET_HEADER_MC = 0xCCFF
    MAX_PACKET_LEN = 8192
    # Largest frame is 4 + 8192 bytes; 32768 lets one recv() call pick up
    # several complete frames at once.
    RECV_CHUNK_SIZE = 32768

    def __init__(self, sock, addr, cid, packet_processor):
        """Store the connection state and start the receive thread.

        Args:
            sock: connected client socket.
            addr: peer address as returned by ``accept()``.
            cid: connection id assigned by the server.
            packet_processor: object exposing ``add_packet(cid, payload)``.
        """
        self.sock = sock
        self.addr = addr
        self.cid = cid
        self.packet_processor = packet_processor
        self.running = True
        # Must be a bytes literal: recv() returns bytes on Python 3, and
        # '' + bytes raises TypeError there. On Python 2 b'' is plain str,
        # so behaviour is unchanged.
        self.recv_buffer = b''
        self.lock = threading.Lock()

        # Receive thread; daemon so it never blocks interpreter exit.
        self.recv_thread = threading.Thread(target=self._recv_loop)
        self.recv_thread.daemon = True
        self.recv_thread.start()

    def _recv_loop(self):
        """Read from the socket until it closes, fails, or close() is called."""
        while self.running:
            # Snapshot the socket: close() on another thread sets self.sock
            # to None, and polling None would only produce a bogus error.
            sock = self.sock
            if sock is None:
                break

            try:
                # Poll with a timeout so a cleared ``running`` flag is
                # noticed within about one second.
                readable, _, _ = select.select([sock], [], [], 1.0)
                if not readable:
                    continue

                data = sock.recv(self.RECV_CHUNK_SIZE)
                if not data:
                    # Empty read: the peer closed the connection.
                    break

                with self.lock:
                    self.recv_buffer += data

                # Drain every complete packet currently in the buffer.
                while self._try_parse_packet():
                    pass

            except socket.error as e:
                # Errors caused by our own close() are expected; stay quiet.
                if self.running:
                    print('[Connection] Socket error: %s' % str(e))
                break
            except Exception as e:
                if self.running:
                    print('[Connection] Error: %s' % str(e))
                break

        self.close()

    def _try_parse_packet(self):
        """Extract one packet from the front of ``recv_buffer`` if possible.

        Returns:
            True when a complete packet was removed from the buffer and
            passed to the packet processor; False when more data is needed
            or the buffer was discarded as invalid.
        """
        import struct

        buf = self.recv_buffer
        if len(buf) < self.PACKET_HEADER_SIZE:
            return False

        # Header: magic code, then payload length (header not included).
        magic_code, packet_len = struct.unpack_from('<HH', buf, 0)

        if magic_code != self.PACKET_HEADER_MC:
            # Buffer content is treated as invalid as a whole: drop it and
            # wait for new data.
            packet_logger.log_text('Invalid magic code: 0x%04X, clear buffer data: %s' % (magic_code, buf), 'WARN')
            self.recv_buffer = b''
            return False

        if packet_len > self.MAX_PACKET_LEN:
            # Implausible length: drop the whole buffer and wait for new data.
            packet_logger.log_text('Packet length %d exceeds max %d, clear buffer. Buffer data: %s' % (packet_len, self.MAX_PACKET_LEN, buf), 'WARN')
            self.recv_buffer = b''
            return False

        total_len = self.PACKET_HEADER_SIZE + packet_len
        if len(buf) < total_len:
            # Incomplete frame; wait for more data.
            return False

        # Strip the 4-byte header; only the protocol data that follows it
        # is forwarded.
        packet_data = buf[self.PACKET_HEADER_SIZE:total_len]
        self.recv_buffer = buf[total_len:]

        self.packet_processor.add_packet(self.cid, packet_data)
        return True

    def send(self, data):
        """Send ``data`` on the socket.

        Returns:
            True if everything was sent, False if sending raised (the error
            is printed, not propagated).
        """
        try:
            self.sock.sendall(data)
            return True
        except Exception as e:
            print('[Connection] Send error: %s' % str(e))
            return False

    def close(self):
        """Close the socket and notify the clients manager exactly once.

        Safe to call repeatedly and from several threads: only the first
        call does any work.
        """
        # Flip the flag under the lock so that a server shutdown racing with
        # a peer disconnect cannot both run the teardown.
        with self.lock:
            if not self.running:
                return
            self.running = False
            sock = self.sock
            self.sock = None

        if sock is not None:
            try:
                sock.close()
            except Exception:
                # Best effort: nothing useful can be done if close fails.
                pass

        # Notify the clients manager.
        ClientsMgr.instance().on_client_disconnect(self.cid)
| | | |
| | | |
class EventServer:
    """TCP server that accepts client connections and feeds a packet processor.

    Each accepted socket is wrapped in a ``Connection`` (which reads on its
    own thread) and registered under a numeric connection id. Packets go to
    a ``PacketProcessor`` built on top of a ``FileMgr``.
    """

    def __init__(self):
        """Build the collaborators; no socket is opened until start()."""
        self.config = ConfigReader.instance()
        self.file_mgr = FileMgr(self.config)
        self.packet_processor = PacketProcessor(self.file_mgr)

        self.server_sock = None
        self.connections = {}  # cid -> Connection
        self.next_cid = 1
        self.lock = threading.Lock()  # guards connections and next_cid
        self.running = False
        self.accept_thread = None

    def start(self):
        """Bind the listening socket and start the accept thread.

        Returns:
            True on success. False if anything failed; the error is printed
            and the listening socket is released.
        """
        port = self.config.get_listen_port()

        # Create the listening socket.
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_sock.bind(('0.0.0.0', port))
            self.server_sock.listen(5)

            print('[EventServer] Server started on port %d' % port)
            print('[EventServer] Log path: %s' % self.config.get_log_file_path())
            print('[EventServer] Write mode: %d' % self.config.get_write_mode())
            print('')

            # Start the packet processor before any client can connect.
            self.packet_processor.start()

            # Start the accept thread.
            self.running = True
            self.accept_thread = threading.Thread(target=self._accept_loop)
            self.accept_thread.daemon = True
            self.accept_thread.start()

            return True

        except Exception as e:
            print('[EventServer] Failed to start server: %s' % str(e))
            # Leave the object in a clean "not started" state.
            self.running = False
            try:
                self.server_sock.close()
            except Exception:
                pass
            self.server_sock = None
            return False

    def _prune_closed_connections(self):
        """Drop connections that have already shut down.

        The caller must hold ``self.lock``.
        """
        closed = [cid for cid, conn in list(self.connections.items())
                  if not conn.running]
        for cid in closed:
            del self.connections[cid]

    def _accept_loop(self):
        """Accept clients until the server stops or accepting fails."""
        while self.running:
            try:
                # Poll with a timeout so stop() is noticed within ~1 second.
                readable, _, _ = select.select([self.server_sock], [], [], 1.0)

                if readable:
                    client_sock, addr = self.server_sock.accept()

                    with self.lock:
                        cid = self.next_cid
                        self.next_cid += 1

                    connection = Connection(client_sock, addr, cid, self.packet_processor)

                    with self.lock:
                        self.connections[cid] = connection

                    print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1]))

                # Forget clients that have disconnected; otherwise the
                # registry grows for the whole lifetime of the server.
                with self.lock:
                    self._prune_closed_connections()

            except socket.error as e:
                if self.running:
                    print('[EventServer] Accept error: %s' % str(e))
                break
            except Exception as e:
                # Errors raised while shutting down are expected; stay quiet.
                if self.running:
                    print('[EventServer] Error: %s' % str(e))
                break

    def get_connection_count(self):
        """Return the number of connections that are still open."""
        with self.lock:
            self._prune_closed_connections()
            return len(self.connections)

    def get_queue_size(self):
        """Return the packet processor's queue size."""
        return self.packet_processor.get_queue_size()

    def get_success_count(self):
        """Return the file manager's successful-write count."""
        return self.file_mgr.get_success_count()

    def get_fail_count(self):
        """Return the file manager's failed-write count."""
        return self.file_mgr.get_fail_count()

    def stop(self):
        """Stop accepting, close every connection and release resources.

        Does nothing if the server is not running.
        """
        if not self.running:
            return

        print('[EventServer] Stopping server...')
        self.running = False

        # Let the accept loop finish first so it cannot register a new
        # connection after the registry has been cleared. The loop polls
        # with a 1 second timeout, so 2 seconds is enough.
        accept_thread = self.accept_thread
        if accept_thread is not None and accept_thread is not threading.current_thread():
            accept_thread.join(2.0)

        # Close all connections. Take them out under the lock, close them
        # outside it so disconnect callbacks do not run with the lock held.
        with self.lock:
            open_connections = list(self.connections.values())
            self.connections.clear()
        for conn in open_connections:
            conn.close()

        # Stop the packet processor.
        self.packet_processor.stop()

        # Close the listening socket.
        server_sock = self.server_sock
        self.server_sock = None
        if server_sock:
            try:
                server_sock.close()
            except Exception:
                # Best effort during shutdown.
                pass

        # Close all files.
        self.file_mgr.close_all()

        print('[EventServer] Server stopped')