#!/usr/bin/python
|
# -*- coding: GBK -*-
|
|
"""
EventServer main server module.

Implements a TCP server that receives events from game clients and records
them to local files.
"""
|
|
import sys
|
import socket
|
import select
|
import threading
|
import datetime
|
from config import ConfigReader
|
from clients_manager import ClientsMgr
|
from file_manager import FileMgr
|
from packet_processor import PacketProcessor
|
from packet_logger import packet_logger
|
|
|
class Connection:
    """Wrap one accepted client socket and split its byte stream into packets.

    The constructor starts a daemon thread that polls the socket, appends the
    received bytes to ``recv_buffer`` and hands every complete payload to
    ``packet_processor.add_packet(cid, payload)``.

    Wire format, as parsed by this class: a 4-byte header made of the magic
    bytes ``FF CC`` (0xCCFF when read little-endian) and a little-endian
    unsigned 16-bit payload length that does not count the header itself,
    followed by the payload (Cmd/SubCmd and the data after them).
    """

    PACKET_HEADER_SIZE = 4
    PACKET_HEADER_MC = 0xCCFF
    MAX_PACKET_LEN = 8192
    # More than this many bytes discarded in a row drops the connection.
    MAX_CONSECUTIVE_DISCARDS = 1000
    # Largest frame is 4 + 8192 = 8196 bytes; 32768 holds three of them so a
    # burst can usually be drained with a single recv() call.
    RECV_CHUNK_SIZE = 32768

    def __init__(self, sock, addr, cid, packet_processor):
        """Store the connection state and start the receive thread.

        Args:
            sock: connected client socket.
            addr: peer address as returned by ``accept()``.
            cid: connection id, passed along with every packet.
            packet_processor: object exposing ``add_packet(cid, data)``.
        """
        self.sock = sock
        self.addr = addr
        self.cid = cid
        self.packet_processor = packet_processor
        self.running = True
        # Bytes literal: recv() returns bytes on Python 3, and b'' is a plain
        # str on Python 2, so the buffer works on both.
        self.recv_buffer = b''
        self.lock = threading.Lock()
        self.consecutive_discards = 0  # bytes dropped since the last good packet

        # Receive thread; daemon so it never blocks interpreter exit.
        self.recv_thread = threading.Thread(target=self._recv_loop)
        self.recv_thread.daemon = True
        self.recv_thread.start()

    def _recv_loop(self):
        """Receive data until the peer disconnects, an error occurs, the
        connection is closed, or too many invalid bytes were seen; then close.
        """
        while self.running:
            try:
                sock = self.sock
                if sock is None:
                    # close() ran on another thread.
                    break

                # Poll with a timeout so a cleared ``running`` flag is noticed.
                readable, _, _ = select.select([sock], [], [], 1.0)
                if not readable:
                    continue

                data = sock.recv(self.RECV_CHUNK_SIZE)
                if not data:
                    # Peer closed the connection.
                    break

                with self.lock:
                    self.recv_buffer += data

                # Drain every complete packet currently buffered.
                while self._try_parse_packet():
                    pass

            except socket.error as e:
                # Stay quiet when the error is just a side effect of close().
                if self.running:
                    print('[Connection] Socket error: %s' % str(e))
                break
            except Exception as e:
                if self.running:
                    print('[Connection] Error: %s' % str(e))
                break

            # The parser stops consuming once the limit is exceeded, so the
            # loop has to end here or the buffer would grow without bound.
            if self.consecutive_discards > self.MAX_CONSECUTIVE_DISCARDS:
                print('[Connection] Too many invalid packets, closing connection %d' % self.cid)
                break

        self.close()

    def _try_parse_packet(self):
        """Extract at most one packet from ``recv_buffer``.

        Bytes that cannot start a valid header (wrong magic code or a length
        above ``MAX_PACKET_LEN``) are dropped one at a time, and parsing is
        retried immediately on the remaining buffer so a valid packet that
        follows garbage is not held back until the next recv().

        Returns:
            True when a payload was handed to the packet processor; False
            when more data is needed or the discard limit was exceeded.
        """
        import struct  # function-scope: the module header does not import it

        header_size = self.PACKET_HEADER_SIZE

        while len(self.recv_buffer) >= header_size:
            magic_code, packet_len = struct.unpack_from('<HH', self.recv_buffer, 0)

            if magic_code != self.PACKET_HEADER_MC:
                problem = 'Invalid magic code: 0x%04X, discard byte (count: %d)' % (
                    magic_code, self.consecutive_discards + 1)
            elif packet_len > self.MAX_PACKET_LEN:
                problem = 'Packet length %d exceeds max %d, discard byte (count: %d)' % (
                    packet_len, self.MAX_PACKET_LEN, self.consecutive_discards + 1)
            else:
                problem = None

            if problem is not None:
                self.consecutive_discards += 1
                if self.consecutive_discards > self.MAX_CONSECUTIVE_DISCARDS:
                    packet_logger.log_text('Too many invalid packets, closing connection', 'ERROR')
                    return False
                packet_logger.log_text(problem, 'WARN')
                # Resynchronise: drop one byte and look for a header again.
                self.recv_buffer = self.recv_buffer[1:]
                continue

            total_len = header_size + packet_len
            if len(self.recv_buffer) < total_len:
                # Header is valid but the payload has not fully arrived yet.
                return False

            # Strip the 4-byte header; only the payload is passed on.
            payload = self.recv_buffer[header_size:total_len]
            self.recv_buffer = self.recv_buffer[total_len:]
            self.consecutive_discards = 0
            self.packet_processor.add_packet(self.cid, payload)
            return True

        return False

    def send(self, data):
        """Send all of ``data`` to the peer.

        Returns:
            True on success, False if sending raised (the error is printed).
        """
        try:
            self.sock.sendall(data)
            return True
        except Exception as e:
            print('[Connection] Send error: %s' % str(e))
            return False

    def close(self):
        """Close the socket and notify the clients manager exactly once.

        Safe to call repeatedly and from several threads: both the receive
        thread and the server's shutdown path end up here.
        """
        with self.lock:
            if not self.running:
                return
            self.running = False
            sock, self.sock = self.sock, None

        if sock is not None:
            try:
                sock.close()
            except socket.error:
                # Best effort: the socket is being discarded anyway.
                pass

        # Notify the clients manager outside the lock.
        ClientsMgr.instance().on_client_disconnect(self.cid)
|
|
|
class EventServer:
    """TCP event server.

    Listens on the configured port, wraps every accepted client in a
    ``Connection`` and shares one ``PacketProcessor`` (backed by a
    ``FileMgr``) between all of them.
    """

    def __init__(self):
        """Build the collaborators; no socket is opened until ``start()``."""
        self.config = ConfigReader.instance()
        self.file_mgr = FileMgr(self.config)
        self.packet_processor = PacketProcessor(self.file_mgr)

        self.server_sock = None
        self.connections = {}  # cid -> Connection
        self.next_cid = 1
        self.lock = threading.Lock()  # guards connections and next_cid
        self.running = False
        self.accept_thread = None

    def start(self):
        """Bind the listening socket and start the processor and accept thread.

        Returns:
            True when the server is up, False when startup failed (the error
            is printed and the listening socket is released).
        """
        port = self.config.get_listen_port()

        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            self.server_sock.bind(('0.0.0.0', port))
            self.server_sock.listen(5)

            print('[EventServer] Server started on port %d' % port)
            print('[EventServer] Log path: %s' % self.config.get_log_file_path())
            print('[EventServer] Write mode: %d' % self.config.get_write_mode())
            print('')

            # Start the packet processor before any client can deliver data.
            self.packet_processor.start()

            self.running = True
            self.accept_thread = threading.Thread(target=self._accept_loop)
            self.accept_thread.daemon = True
            self.accept_thread.start()

            return True

        except Exception as e:
            print('[EventServer] Failed to start server: %s' % str(e))
            self.server_sock.close()
            # Do not keep a reference to a closed socket around.
            self.server_sock = None
            return False

    def _drop_closed_locked(self):
        """Forget connections whose ``running`` flag is cleared.

        The caller must hold ``self.lock``.
        """
        closed = [cid for cid, conn in self.connections.items() if not conn.running]
        for cid in closed:
            del self.connections[cid]

    def _accept_loop(self):
        """Accept clients until the server stops or accepting fails."""
        while self.running:
            try:
                # Poll with a timeout so a cleared ``running`` flag is noticed.
                readable, _, _ = select.select([self.server_sock], [], [], 1.0)

                # Housekeeping on every poll: without it, entries for clients
                # that disconnected would pile up in the dict forever.
                with self.lock:
                    self._drop_closed_locked()

                if not readable:
                    continue

                client_sock, addr = self.server_sock.accept()

                with self.lock:
                    cid = self.next_cid
                    self.next_cid += 1

                connection = Connection(client_sock, addr, cid, self.packet_processor)

                with self.lock:
                    registered = self.running
                    if registered:
                        self.connections[cid] = connection

                if not registered:
                    # stop() began while this client was being set up; close
                    # it here because stop() will never see it.
                    connection.close()
                    break

                print('[EventServer] New connection %d from %s:%d' % (cid, addr[0], addr[1]))

            except socket.error as e:
                if self.running:
                    print('[EventServer] Accept error: %s' % str(e))
                break
            except Exception as e:
                print('[EventServer] Error: %s' % str(e))
                break

    def get_connection_count(self):
        """Return the number of connections that are still open."""
        with self.lock:
            self._drop_closed_locked()
            return len(self.connections)

    def get_queue_size(self):
        """Return the packet processor's queue size."""
        return self.packet_processor.get_queue_size()

    def get_success_count(self):
        """Return the file manager's count of successful writes."""
        return self.file_mgr.get_success_count()

    def get_fail_count(self):
        """Return the file manager's count of failed writes."""
        return self.file_mgr.get_fail_count()

    def stop(self):
        """Stop accepting, close every connection and release all resources.

        Does nothing when the server is not running.
        """
        if not self.running:
            return

        print('[EventServer] Stopping server...')
        self.running = False

        # Let the accept thread leave its poll before its socket is closed;
        # the poll timeout is 1 s, so a 2 s wait is enough.
        accept_thread = self.accept_thread
        if accept_thread is not None and accept_thread is not threading.current_thread():
            accept_thread.join(2.0)

        # Snapshot under the lock, close outside it: Connection.close() calls
        # into the clients manager and must not run while the lock is held.
        with self.lock:
            open_connections = list(self.connections.values())
            self.connections.clear()
        for conn in open_connections:
            conn.close()

        self.packet_processor.stop()

        if self.server_sock:
            try:
                self.server_sock.close()
            except socket.error:
                # Best effort: the socket is being discarded anyway.
                pass
            self.server_sock = None

        self.file_mgr.close_all()

        print('[EventServer] Server stopped')
|