hxp
2026-02-04 ab57c59ab33f5e4bf7e3bfd5514b04cb3d38dc2b
ServerPython/EventServerPY/server.py
New file
@@ -0,0 +1,264 @@
#!/usr/bin/python
# -*- coding: GBK -*-
"""
EventServer主服务器模块
实现TCP服务器,接收游戏客户端事件并记录到本地文件
"""
import sys
import socket
import select
import threading
import datetime
from config import ConfigReader
from clients_manager import ClientsMgr
from file_manager import FileMgr
from packet_processor import PacketProcessor
from packet_logger import packet_logger
class Connection:
    """One accepted client socket plus the daemon thread that reads from it.

    The receive thread accumulates bytes in ``recv_buffer``, splits them into
    framed packets and hands each payload to
    ``packet_processor.add_packet(cid, payload)``.

    Wire format of a frame (all integers little-endian):
        bytes 0-1  magic code, bytes ``FF CC`` (0xCCFF)
        bytes 2-3  payload length, excluding this 4-byte header
        bytes 4..  payload (Cmd/SubCmd and whatever follows)
    """

    PACKET_HEADER_SIZE = 4
    PACKET_HEADER_MC = 0xCCFF
    MAX_PACKET_LEN = 8192
    # Largest frame is 4 + 8192 = 8196 bytes; reading 32768 bytes at a time
    # fits 3-4 complete frames per recv() call and avoids frequent syscalls.
    RECV_CHUNK_SIZE = 32768
    # Upper bound on how many buffer bytes are echoed into a warning log line.
    LOG_PREVIEW_BYTES = 64

    def __init__(self, sock, addr, cid, packet_processor):
        """Store the connection state and start the receive thread.

        Args:
            sock: connected client socket.
            addr: peer address as returned by ``accept()``.
            cid: connection id assigned by the server.
            packet_processor: object exposing ``add_packet(cid, data)``.
        """
        self.sock = sock
        self.addr = addr
        self.cid = cid
        self.packet_processor = packet_processor
        self.running = True
        # Bytes literal: identical to '' on Python 2, and the only type that
        # can be concatenated with recv() data on Python 3.
        self.recv_buffer = b''
        self.lock = threading.Lock()
        # close() can be reached from both the receive thread and the server;
        # these two make sure its body runs exactly once.
        self._closed = False
        self._close_lock = threading.Lock()
        # Start the thread last so it only ever sees a fully built object.
        self.recv_thread = threading.Thread(target=self._recv_loop)
        self.recv_thread.daemon = True
        self.recv_thread.start()

    def _recv_loop(self):
        """Read from the socket until it closes, errors out, or close() is called."""
        while self.running:
            # Snapshot the socket: close() on another thread sets self.sock
            # to None, and select() must never be handed None.
            sock = self.sock
            if sock is None:
                break
            try:
                # Poll with a timeout so a cleared ``running`` flag is noticed.
                readable, _, _ = select.select([sock], [], [], 1.0)
                if not readable:
                    continue
                data = sock.recv(self.RECV_CHUNK_SIZE)
                if not data:
                    # Peer closed the connection.
                    break
                with self.lock:
                    self.recv_buffer += data
                    # Drain every complete frame currently buffered.
                    while self._try_parse_packet():
                        pass
            except socket.error as e:
                # Errors caused by our own close() are expected; stay quiet.
                if self.running:
                    print('[Connection] Socket error: %s' % str(e))
                break
            except Exception as e:
                if self.running:
                    print('[Connection] Error: %s' % str(e))
                break
        self.close()

    def _buffer_preview(self):
        """Return a bounded, escaped rendering of the receive buffer for logs."""
        buf = self.recv_buffer
        return '%r (%d bytes total)' % (buf[:self.LOG_PREVIEW_BYTES], len(buf))

    def _try_parse_packet(self):
        """Extract at most one frame from the front of ``recv_buffer``.

        Returns:
            True if a complete frame was consumed and forwarded to the packet
            processor; False if more data is needed or the buffer was
            discarded because its header was invalid.
        """
        import struct

        buf = self.recv_buffer
        if len(buf) < self.PACKET_HEADER_SIZE:
            return False

        magic_code, packet_len = struct.unpack('<HH', buf[:self.PACKET_HEADER_SIZE])

        if magic_code != self.PACKET_HEADER_MC:
            # The stream is out of sync: drop everything and wait for new data.
            # Only an escaped, length-limited preview is logged because the
            # bytes come straight from the client.
            packet_logger.log_text('Invalid magic code: 0x%04X, clear buffer data: %s' % (magic_code, self._buffer_preview()), 'WARN')
            self.recv_buffer = b''
            return False

        if packet_len > self.MAX_PACKET_LEN:
            # Implausible length: treat the whole buffer as invalid.
            packet_logger.log_text('Packet length %d exceeds max %d, clear buffer. Buffer data: %s' % (packet_len, self.MAX_PACKET_LEN, self._buffer_preview()), 'WARN')
            self.recv_buffer = b''
            return False

        total_len = self.PACKET_HEADER_SIZE + packet_len
        if len(buf) < total_len:
            # Frame is incomplete; wait for more data.
            return False

        # Strip the 4-byte header and forward only the payload.
        packet_data = buf[self.PACKET_HEADER_SIZE:total_len]
        self.recv_buffer = buf[total_len:]
        self.packet_processor.add_packet(self.cid, packet_data)
        return True

    def send(self, data):
        """Send ``data`` in full.

        Returns:
            True on success, False if the connection is already closed or the
            send failed (the failure is printed, not raised).
        """
        sock = self.sock
        if sock is None:
            print('[Connection] Send error: %s' % 'connection already closed')
            return False
        try:
            sock.sendall(data)
            return True
        except Exception as e:
            print('[Connection] Send error: %s' % str(e))
            return False

    def close(self):
        """Close the socket and notify the clients manager, exactly once.

        Safe to call repeatedly and from any thread; only the first call
        closes the socket and reports the disconnect.
        """
        with self._close_lock:
            if self._closed:
                return
            self._closed = True

        self.running = False
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                # Best effort: the socket is being discarded anyway.
                pass
        # Tell the clients manager this connection id is gone.
        ClientsMgr.instance().on_client_disconnect(self.cid)
class EventServer:
    """TCP event server: accepts clients and wires them to the packet processor.

    Each accepted socket is wrapped in a ``Connection`` and tracked in
    ``self.connections`` (cid -> Connection). Entries whose connection has
    stopped running are pruned, so the dict reflects live connections only.
    """

    # Must exceed the 1.0s select() timeout used by the accept loop.
    ACCEPT_JOIN_TIMEOUT = 2.0

    def __init__(self):
        """Build the collaborators; no socket is opened until start()."""
        self.config = ConfigReader.instance()
        self.file_mgr = FileMgr(self.config)
        self.packet_processor = PacketProcessor(self.file_mgr)
        self.server_sock = None
        self.connections = {}  # cid -> Connection
        self.next_cid = 1
        self.lock = threading.Lock()  # guards connections and next_cid
        self.running = False
        self.accept_thread = None

    def start(self):
        """Bind the listening socket and start the processor and accept thread.

        Returns:
            True if the server is listening, False if startup failed (the
            error is printed and the listening socket is released).
        """
        port = self.config.get_listen_port()
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.server_sock.bind(('0.0.0.0', port))
            self.server_sock.listen(5)
            print('[EventServer] Server started on port %d' % port)
            print('[EventServer] Log path: %s' % self.config.get_log_file_path())
            print('[EventServer] Write mode: %d' % self.config.get_write_mode())
            print('')
            # Start the packet processor before any client can deliver data.
            self.packet_processor.start()
            self.running = True
            self.accept_thread = threading.Thread(target=self._accept_loop)
            self.accept_thread.daemon = True
            self.accept_thread.start()
            return True
        except Exception as e:
            print('[EventServer] Failed to start server: %s' % str(e))
            self.server_sock.close()
            # Do not leave a dead socket behind for stop() or a retry.
            self.server_sock = None
            return False

    def _drop_closed_locked(self):
        """Remove connections that are no longer running. Caller holds self.lock."""
        closed_ids = [cid for cid, conn in self.connections.items() if not conn.running]
        for cid in closed_ids:
            del self.connections[cid]

    def _accept_loop(self):
        """Accept clients until the server stops or the listening socket fails."""
        # Keep a local reference so a concurrent stop() cannot swap it for None.
        server_sock = self.server_sock
        while self.running:
            try:
                # Poll with a timeout so a cleared ``running`` flag is noticed.
                readable, _, _ = select.select([server_sock], [], [], 1.0)
                if readable:
                    client_sock, addr = server_sock.accept()
                    with self.lock:
                        cid = self.next_cid
                        self.next_cid += 1
                    try:
                        connection = Connection(client_sock, addr, cid, self.packet_processor)
                    except Exception:
                        # Do not leak the accepted socket if wrapping it fails.
                        client_sock.close()
                        raise
                    with self.lock:
                        self.connections[cid] = connection
                    print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1]))
                # Forget connections that closed since the last pass.
                with self.lock:
                    self._drop_closed_locked()
            except socket.error as e:
                # A socket error after stop() is expected; only report real ones.
                if self.running:
                    print('[EventServer] Accept error: %s' % str(e))
                break
            except Exception as e:
                print('[EventServer] Error: %s' % str(e))
                break

    def get_connection_count(self):
        """Return the number of connections that are still running."""
        with self.lock:
            self._drop_closed_locked()
            return len(self.connections)

    def get_queue_size(self):
        """Return the packet processor's current queue size."""
        return self.packet_processor.get_queue_size()

    def get_success_count(self):
        """Return the file manager's count of successful writes."""
        return self.file_mgr.get_success_count()

    def get_fail_count(self):
        """Return the file manager's count of failed writes."""
        return self.file_mgr.get_fail_count()

    def stop(self):
        """Shut the server down; a no-op if it is not running.

        Order: stop accepting, close client connections, stop the packet
        processor, close the listening socket, close all files.
        """
        if not self.running:
            return
        print('[EventServer] Stopping server...')
        self.running = False

        # Wait for the accept loop to exit so that no connection is registered
        # after the table has been cleared below.
        accept_thread = self.accept_thread
        if accept_thread is not None and accept_thread is not threading.current_thread():
            accept_thread.join(self.ACCEPT_JOIN_TIMEOUT)

        # Detach the connections under the lock but close them outside it:
        # close() calls out to other components and must not run with the
        # server lock held.
        with self.lock:
            pending = list(self.connections.values())
            self.connections.clear()
        for conn in pending:
            try:
                conn.close()
            except Exception as e:
                # One failing connection must not abort the rest of shutdown.
                print('[EventServer] Error: %s' % str(e))

        self.packet_processor.stop()

        server_sock, self.server_sock = self.server_sock, None
        if server_sock is not None:
            try:
                server_sock.close()
            except socket.error:
                # Best effort: the socket is being discarded anyway.
                pass

        self.file_mgr.close_all()
        print('[EventServer] Server stopped')