From ab57c59ab33f5e4bf7e3bfd5514b04cb3d38dc2b Mon Sep 17 00:00:00 2001
From: hxp <ale99527@vip.qq.com>
Date: 星期三, 04 二月 2026 18:19:05 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.1.20:10010/r/Project_SG_ServerCode
---
ServerPython/EventServerPY/server.py | 264 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 264 insertions(+), 0 deletions(-)
diff --git a/ServerPython/EventServerPY/server.py b/ServerPython/EventServerPY/server.py
new file mode 100644
index 0000000..1643324
--- /dev/null
+++ b/ServerPython/EventServerPY/server.py
@@ -0,0 +1,264 @@
+#!/usr/bin/python
+# -*- coding: GBK -*-
+
+"""
+EventServer主服务器模块
+实现TCP服务器,接收游戏客户端事件并记录到本地文件
+"""
+
+import sys
+import socket
+import select
+import threading
+import datetime
+from config import ConfigReader
+from clients_manager import ClientsMgr
+from file_manager import FileMgr
+from packet_processor import PacketProcessor
+from packet_logger import packet_logger
+
+
+class Connection:
+ """连接封装类"""
+
+ def __init__(self, sock, addr, cid, packet_processor):
+ self.sock = sock
+ self.addr = addr
+ self.cid = cid
+ self.packet_processor = packet_processor
+ self.running = True
+ self.recv_buffer = ''
+ self.lock = threading.Lock()
+
+ # 创建接收线程
+ self.recv_thread = threading.Thread(target=self._recv_loop)
+ self.recv_thread.daemon = True
+ self.recv_thread.start()
+
+ def _recv_loop(self):
+ """接收数据循环"""
+ while self.running:
+ try:
+ # 检查socket是否有数据可读
+ readable, _, _ = select.select([self.sock], [], [], 1.0)
+
+ if readable:
+ # 接收缓冲区大小: 建议至少是最大包长度的2倍,避免频繁recv调用
+ # 最大包 = 4(header) + 8192(payload) = 8196 字节
+ # 使用32768可以容纳3-4个完整包,提高效率
+ data = self.sock.recv(32768)
+
+ if not data:
+ # 连接断开
+ break
+
+ # 累积数据到缓冲区
+ with self.lock:
+ self.recv_buffer += data
+
+ # 尝试解析完整的数据包
+ while self._try_parse_packet():
+ pass
+
+ except socket.error as e:
+ print('[Connection] Socket error: %s' % str(e))
+ break
+ except Exception as e:
+ print('[Connection] Error: %s' % str(e))
+ break
+
+ self.close()
+
+ def _try_parse_packet(self):
+ """尝试解析数据包"""
+ import struct
+
+ # PacketHeader: FF CC (2字节) + Length (2字节) = 4字节
+ PACKET_HEADER_SIZE = 4
+ PACKET_HEADER_MC = 0xCCFF
+ MAX_PACKET_LEN = 8192
+
+ if len(self.recv_buffer) < PACKET_HEADER_SIZE:
+ return False
+
+ # 检查 MagicCode (FF CC)
+ magic_code = struct.unpack('<H', self.recv_buffer[0:2])[0]
+ if magic_code != PACKET_HEADER_MC:
+ # 整个缓冲区数据无效,清空等待新数据
+ packet_logger.log_text('Invalid magic code: 0x%04X, clear buffer data: %s' % (magic_code, self.recv_buffer), 'WARN')
+ self.recv_buffer = ''
+ return False
+
+ # 读取包长度(不包含PacketHeader本身)
+ packet_len = struct.unpack('<H', self.recv_buffer[2:4])[0]
+
+ # 检查包长度是否合理
+ if packet_len > MAX_PACKET_LEN:
+ # 整个缓冲区数据无效,清空等待新数据
+ packet_logger.log_text('Packet length %d exceeds max %d, clear buffer. Buffer data: %s' % (packet_len, MAX_PACKET_LEN, self.recv_buffer), 'WARN')
+ self.recv_buffer = ''
+ return False
+
+ # 计算总长度(包含PacketHeader)
+ total_len = PACKET_HEADER_SIZE + packet_len
+
+ if len(self.recv_buffer) < total_len:
+ # 数据不完整,等待更多数据
+ return False
+
+ # 提取协议数据(跳过4字节PacketHeader,只传Cmd/SubCmd及之后的数据)
+ packet_data = self.recv_buffer[PACKET_HEADER_SIZE:total_len]
+ self.recv_buffer = self.recv_buffer[total_len:]
+
+ # 交给包处理器处理
+ self.packet_processor.add_packet(self.cid, packet_data)
+
+ return True
+
+ def send(self, data):
+ """发送数据"""
+ try:
+ self.sock.sendall(data)
+ return True
+ except Exception as e:
+ print('[Connection] Send error: %s' % str(e))
+ return False
+
+ def close(self):
+ """关闭连接"""
+ if self.running:
+ self.running = False
+
+ if self.sock:
+ try:
+ self.sock.close()
+ except:
+ pass
+ self.sock = None
+
+ # 通知客户端管理器
+ ClientsMgr.instance().on_client_disconnect(self.cid)
+
+
+class EventServer:
+ """事件服务器"""
+
+ def __init__(self):
+ self.config = ConfigReader.instance()
+ self.file_mgr = FileMgr(self.config)
+ self.packet_processor = PacketProcessor(self.file_mgr)
+
+ self.server_sock = None
+ self.connections = {} # cid -> Connection
+ self.next_cid = 1
+ self.lock = threading.Lock()
+ self.running = False
+ self.accept_thread = None
+
+ def start(self):
+ """启动服务器"""
+ port = self.config.get_listen_port()
+
+ # 创建服务器socket
+ self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ try:
+ self.server_sock.bind(('0.0.0.0', port))
+ self.server_sock.listen(5)
+
+ print('[EventServer] Server started on port %d' % port)
+ print('[EventServer] Log path: %s' % self.config.get_log_file_path())
+ print('[EventServer] Write mode: %d' % self.config.get_write_mode())
+ print('')
+
+ # 启动包处理器
+ self.packet_processor.start()
+
+ # 启动接收线程
+ self.running = True
+ self.accept_thread = threading.Thread(target=self._accept_loop)
+ self.accept_thread.daemon = True
+ self.accept_thread.start()
+
+ return True
+
+ except Exception as e:
+ print('[EventServer] Failed to start server: %s' % str(e))
+ self.server_sock.close()
+ return False
+
+ def _accept_loop(self):
+ """接受连接循环"""
+ while self.running:
+ try:
+ # 检查是否有新的连接
+ readable, _, _ = select.select([self.server_sock], [], [], 1.0)
+
+ if readable:
+ client_sock, addr = self.server_sock.accept()
+
+ with self.lock:
+ cid = self.next_cid
+ self.next_cid += 1
+
+ connection = Connection(client_sock, addr, cid, self.packet_processor)
+
+ with self.lock:
+ self.connections[cid] = connection
+
+ print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1]))
+
+ except socket.error as e:
+ if self.running:
+ print('[EventServer] Accept error: %s' % str(e))
+ break
+ except Exception as e:
+ print('[EventServer] Error: %s' % str(e))
+ break
+
+ def get_connection_count(self):
+ """获取连接数"""
+ with self.lock:
+ return len(self.connections)
+
+ def get_queue_size(self):
+ """获取包队列大小"""
+ return self.packet_processor.get_queue_size()
+
+ def get_success_count(self):
+ """获取成功写入次数"""
+ return self.file_mgr.get_success_count()
+
+ def get_fail_count(self):
+ """获取失败写入次数"""
+ return self.file_mgr.get_fail_count()
+
+ def stop(self):
+ """停止服务器"""
+ if not self.running:
+ return
+
+ print('[EventServer] Stopping server...')
+ self.running = False
+
+ # 关闭所有连接
+ with self.lock:
+ for conn in self.connections.values():
+ conn.close()
+ self.connections.clear()
+
+ # 停止包处理器
+ self.packet_processor.stop()
+
+ # 关闭服务器socket
+ if self.server_sock:
+ try:
+ self.server_sock.close()
+ except:
+ pass
+
+ # 关闭所有文件
+ self.file_mgr.close_all()
+
+ print('[EventServer] Server stopped')
--
Gitblit v1.8.0