using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using UnityEngine;

namespace TcpServer
{
    /// <summary>
    /// Incremental decoder: raw bytes go in through <see cref="input"/>,
    /// fully decoded messages come out through <see cref="swap_msgs"/>.
    /// </summary>
    public interface ProtoFilter
    {
        /// <summary>Feeds the next chunk of bytes read from the stream.</summary>
        /// <param name="data">Bytes received since the previous call.</param>
        void input(byte[] data);

        /// <summary>Returns the messages decoded so far and starts a new, empty batch.</summary>
        /// <returns>The decoded messages, oldest first.</returns>
        List<string> swap_msgs();
    }

    /// <summary>
    /// Simple protocol filter.
    /// Packets are packed and unpacked using the layout
    ///     [payload byte count][payload]
    /// where [payload byte count] is HEADER_SIZE bytes long (little-endian Int32)
    /// and [payload] is exactly that many bytes.
    /// Data is taken from the stream in order and stitched together; as soon as a
    /// complete packet has been received its payload is made available.
    /// The payload is decoded as UTF-8 because it is encoded as UTF-8 for transport.
    /// All encoding and decoding is done inside this class.
    /// </summary>
    public class SimpleProtocolFilter : ProtoFilter
    {
        // Size in bytes of the length prefix that precedes every payload.
        private const int HEADER_SIZE = 4;

        // The wire format is documented as UTF-8; Encoding.Default is platform dependent.
        private static readonly Encoding PayloadEncoding = Encoding.UTF8;

        // Guards msgs: input() is driven from socket callbacks in AsyncTcpServer, while
        // swap_msgs() is presumably called from a different (consumer) thread.
        private readonly object msgsLock = new object();

        // Bytes received so far that do not yet form a complete packet.
        private byte[] buf = new byte[0];

        // Messages decoded since the last swap_msgs() call.
        private List<string> msgs = new List<string>();

        /// <summary>
        /// Appends <paramref name="data"/> to the pending bytes and decodes every
        /// complete packet that is now available.
        /// </summary>
        /// <param name="data">Bytes received from the stream.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidDataException">
        /// A packet header announces a negative length. The pending bytes are discarded
        /// because the stream can no longer be re-synchronised; messages decoded before
        /// the bad header remain available.
        /// </exception>
        public void input(byte[] data)
        {
            if (data == null) throw new ArgumentNullException("data");
            if (data.Length == 0) return;

            buf = Combine(buf, data);

            // Walk the buffer with an offset and trim it once at the end instead of
            // re-allocating the remainder after every single message.
            int offset = 0;
            try
            {
                // ">=" so that a packet with an empty payload (header only) is delivered
                // immediately rather than waiting for the next chunk.
                while (buf.Length - offset >= HEADER_SIZE)
                {
                    int data_size = ReadLittleEndianInt32(buf, offset);
                    if (data_size < 0)
                    {
                        offset = buf.Length; // drop everything, the framing is lost
                        throw new InvalidDataException(
                            "Negative payload length in packet header: " + data_size);
                    }

                    // Written as a subtraction so a huge data_size cannot overflow.
                    if (buf.Length - offset - HEADER_SIZE < data_size)
                    {
                        break; // payload not complete yet
                    }

                    string content = PayloadEncoding.GetString(buf, offset + HEADER_SIZE, data_size);
                    lock (msgsLock)
                    {
                        msgs.Add(content);
                    }
                    offset += HEADER_SIZE + data_size;
                }
            }
            finally
            {
                if (offset > 0)
                {
                    buf = Slice(buf, offset, buf.Length);
                }
            }
        }

        /// <summary>
        /// Hands over all messages decoded so far and starts a new, empty batch.
        /// </summary>
        /// <returns>The decoded messages, oldest first (possibly empty, never null).</returns>
        public List<string> swap_msgs()
        {
            lock (msgsLock)
            {
                List<string> ret = msgs;
                msgs = new List<string>();
                return ret;
            }
        }

        /// <summary>
        /// Builds a packet: a little-endian Int32 byte count followed by the UTF-8 payload.
        /// </summary>
        /// <param name="content">Message text to send.</param>
        /// <returns>The bytes to write to the stream.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="content"/> is null.</exception>
        public byte[] pack(String content)
        {
            if (content == null) throw new ArgumentNullException("content");

            // Encode first: the header must hold the number of BYTES, which differs
            // from content.Length (UTF-16 code units) for any non-ASCII text.
            byte[] body = PayloadEncoding.GetBytes(content);
            byte[] size = BitConverter.GetBytes(body.Length);
            if (!BitConverter.IsLittleEndian)
            {
                // reverse it so we get little endian.
                Array.Reverse(size);
            }
            return Combine(size, body);
        }

        /// <summary>Reads a little-endian Int32 regardless of host byte order.</summary>
        private static int ReadLittleEndianInt32(byte[] source, int offset)
        {
            return source[offset]
                | (source[offset + 1] << 8)
                | (source[offset + 2] << 16)
                | (source[offset + 3] << 24);
        }

        /// <summary>Returns a new array holding <paramref name="first"/> followed by <paramref name="second"/>.</summary>
        private static byte[] Combine(byte[] first, byte[] second)
        {
            byte[] ret = new byte[first.Length + second.Length];
            Buffer.BlockCopy(first, 0, ret, 0, first.Length);
            Buffer.BlockCopy(second, 0, ret, first.Length, second.Length);
            return ret;
        }

        /// <summary>
        /// Copies the half-open range [<paramref name="start"/>, <paramref name="end"/>)
        /// of <paramref name="source"/> into a new array.
        /// </summary>
        /// <param name="source">Array to copy from.</param>
        /// <param name="start">Index of the first byte to copy.</param>
        /// <param name="end">Index one past the last byte to copy.</param>
        /// <returns>A new array of length <c>end - start</c>.</returns>
        public byte[] Slice(byte[] source, int start, int end)
        {
            int length = end - start;
            byte[] ret = new byte[length];
            Array.Copy(source, start, ret, 0, length);
            return ret;
        }
    }

    /// <summary>
    /// Asynchronous TCP server.
    /// </summary>
    public class AsyncTcpServer : IDisposable
    {
        #region Fields

        private TcpListener _listener;

        // Connected clients keyed by their remote end point string.
        // NOTE(review): System.Collections.Concurrent is not imported by this file, so
        // ConcurrentDictionary is presumably a project-local implementation - confirm.
        private ConcurrentDictionary<string, TcpClientState> _clients;

        private bool _disposed = false;

        #endregion

        #region Ctors

        /// <summary>
        /// Asynchronous TCP server listening on every local address.
        /// </summary>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(int listenPort)
            : this(IPAddress.Any, listenPort)
        {
        }

        /// <summary>
        /// Asynchronous TCP server.
        /// </summary>
        /// <param name="localEP">End point to listen on.</param>
        public AsyncTcpServer(IPEndPoint localEP)
            : this(localEP.Address, localEP.Port)
        {
        }

        /// <summary>
        /// Asynchronous TCP server.
        /// </summary>
        /// <param name="localIPAddress">IP address to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(IPAddress localIPAddress, int listenPort)
        {
            this.Address = localIPAddress;
            this.Port = listenPort;
            this.Encoding = Encoding.Default;

            _clients = new ConcurrentDictionary<string, TcpClientState>();

            _listener = new TcpListener(Address, Port);
            // _listener.AllowNatTraversal(true);
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether the server is currently running.
        /// </summary>
        public bool IsRunning { get; private set; }

        /// <summary>
        /// IP address being listened on.
        /// </summary>
        public IPAddress Address { get; private set; }

        /// <summary>
        /// Port being listened on.
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// Encoding used by the string overloads of the send methods.
        /// </summary>
        public Encoding Encoding { get; set; }

        #endregion

        #region Server

        /// <summary>
        /// Starts the server with a pending-connection backlog of 10.
        /// </summary>
        /// <returns>This server instance.</returns>
        public AsyncTcpServer Start()
        {
            Debug.Log("start server");
            return Start(10);
        }

        /// <summary>
        /// Starts the server.
        /// </summary>
        /// <param name="backlog">Maximum length of the pending-connection queue.</param>
        /// <returns>This server instance.</returns>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        public AsyncTcpServer Start(int backlog)
        {
            if (_disposed) throw new ObjectDisposedException(GetType().FullName);
            if (IsRunning) return this;

            // Flip the flag only once the listener is actually listening, so a failed
            // bind (e.g. port already in use) does not leave the server marked running.
            _listener.Start(backlog);
            IsRunning = true;

            ContinueAcceptTcpClient(_listener);
            return this;
        }

        /// <summary>
        /// Stops the server and closes every client connection.
        /// </summary>
        /// <returns>This server instance.</returns>
        public AsyncTcpServer Stop()
        {
            if (!IsRunning) return this;

            // Clear the flag first: stopping the listener and closing the clients
            // completes their pending async operations, and those callbacks must see
            // that the server is shutting down.
            IsRunning = false;

            try
            {
                _listener.Stop();
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
            catch (SocketException ex)
            {
                Debug.LogException(ex);
            }

            foreach (var client in _clients.Values)
            {
                // Handled per client so one broken socket cannot keep the others open.
                try
                {
                    client.TcpClient.Close();
                }
                catch (ObjectDisposedException ex)
                {
                    Debug.LogException(ex);
                }
                catch (SocketException ex)
                {
                    Debug.LogException(ex);
                }
            }
            _clients.Clear();

            return this;
        }

        /// <summary>Queues the next asynchronous accept, unless the server is stopped.</summary>
        private void ContinueAcceptTcpClient(TcpListener tcpListener)
        {
            if (!IsRunning) return;

            try
            {
                tcpListener.BeginAcceptTcpClient(new AsyncCallback(HandleTcpClientAccepted), tcpListener);
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
            catch (InvalidOperationException ex)
            {
                // Raised when the listener was stopped between the check above and the call.
                Debug.LogException(ex);
            }
            catch (SocketException ex)
            {
                Debug.LogException(ex);
            }
        }

        #endregion

        #region Receive

        /// <summary>Completes an accept, registers the client and starts reading from it.</summary>
        private void HandleTcpClientAccepted(IAsyncResult ar)
        {
            if (!IsRunning) return;

            TcpListener tcpListener = (TcpListener)ar.AsyncState;

            TcpClient tcpClient;
            try
            {
                tcpClient = tcpListener.EndAcceptTcpClient(ar);
            }
            catch (ObjectDisposedException ex)
            {
                // The listener was stopped while this accept was pending.
                Debug.LogException(ex);
                return;
            }
            catch (SocketException ex)
            {
                // This accept failed; keep serving the next connection.
                Debug.LogException(ex);
                ContinueAcceptTcpClient(tcpListener);
                return;
            }

            try
            {
                if (!tcpClient.Connected)
                {
                    tcpClient.Close();
                    return;
                }

                byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
                SimpleProtocolFilter prot = new SimpleProtocolFilter();
                TcpClientState internalClient = new TcpClientState(tcpClient, buffer, prot);

                // add client connection to cache
                string tcpClientKey = internalClient.TcpClient.Client.RemoteEndPoint.ToString();
                _clients.AddOrUpdate(tcpClientKey, internalClient, (n, o) => { return internalClient; });
                RaiseClientConnected(tcpClient);

                // begin to read data
                NetworkStream networkStream = internalClient.NetworkStream;
                ContinueReadBuffer(internalClient, networkStream);
            }
            finally
            {
                // keep listening to accept next connection, even when this client
                // turned out to be unusable or a handler threw.
                ContinueAcceptTcpClient(tcpListener);
            }
        }

        /// <summary>Completes a read, feeds the protocol filter and queues the next read.</summary>
        private void HandleDatagramReceived(IAsyncResult ar)
        {
            if (!IsRunning) return;

            try
            {
                TcpClientState internalClient = (TcpClientState)ar.AsyncState;
                NetworkStream networkStream = internalClient.NetworkStream;

                int numberOfReadBytes = 0;
                try
                {
                    // if the remote host has shutdown its connection,
                    // read will immediately return with zero bytes.
                    // EndRead is always called: an abrupt disconnect surfaces here as an
                    // exception and is then handled like an orderly close below.
                    numberOfReadBytes = networkStream.EndRead(ar);
                }
                catch (Exception ex)
                {
                    Debug.LogException(ex);
                    numberOfReadBytes = 0;
                }

                if (numberOfReadBytes == 0)
                {
                    // connection has been closed
                    DropClient(internalClient);
                    return;
                }

                // received byte and trigger event notification
                var receivedBytes = new byte[numberOfReadBytes];
                Array.Copy(internalClient.Buffer, 0, receivedBytes, 0, numberOfReadBytes);

                // input bytes into protofilter
                try
                {
                    internalClient.Prot.input(receivedBytes);
                }
                catch (InvalidDataException ex)
                {
                    // The peer sent a corrupt frame; the stream cannot be re-synchronised.
                    Debug.LogException(ex);
                    DropClient(internalClient);
                    return;
                }

                RaiseDatagramReceived(internalClient, receivedBytes);
                // RaisePlaintextReceived(internalClient.TcpClient, receivedBytes);

                // continue listening for tcp datagram packets
                ContinueReadBuffer(internalClient, networkStream);
            }
            catch (InvalidOperationException ex)
            {
                Debug.LogException(ex);
            }
        }

        /// <summary>Queues the next asynchronous read for a client.</summary>
        private void ContinueReadBuffer(TcpClientState internalClient, NetworkStream networkStream)
        {
            try
            {
                networkStream.BeginRead(internalClient.Buffer, 0, internalClient.Buffer.Length, HandleDatagramReceived, internalClient);
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
            catch (IOException ex)
            {
                // The socket failed before the read could be queued.
                Debug.LogException(ex);
                DropClient(internalClient);
            }
        }

        /// <summary>
        /// Removes a client from the cache, raises <see cref="ClientDisconnected"/> and
        /// releases its socket.
        /// </summary>
        private void DropClient(TcpClientState internalClient)
        {
            try
            {
                TcpClientState internalClientToBeThrowAway;
                string tcpClientKey = internalClient.TcpClient.Client.RemoteEndPoint.ToString();
                _clients.TryRemove(tcpClientKey, out internalClientToBeThrowAway);
            }
            catch (ObjectDisposedException ex)
            {
                // The socket is already disposed, so its end point (the cache key) is gone.
                Debug.LogException(ex);
            }
            catch (SocketException ex)
            {
                Debug.LogException(ex);
            }

            // Raised before closing so handlers still receive a usable TcpClient.
            RaiseClientDisconnected(internalClient.TcpClient);

            internalClient.TcpClient.Close();
        }

        #endregion

        #region Events

        /// <summary>
        /// Raised when raw bytes have been received from a client.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;

        /// <summary>
        /// Raised when received bytes have been decoded to text.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;

        private void RaiseDatagramReceived(TcpClientState sender, byte[] datagram)
        {
            // Copy to a local so an unsubscribe between the null check and the call
            // cannot cause a NullReferenceException.
            var handler = DatagramReceived;
            if (handler != null)
            {
                handler(this, new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
            }
        }

        private void RaisePlaintextReceived(TcpClientState sender, byte[] datagram)
        {
            var handler = PlaintextReceived;
            if (handler != null)
            {
                handler(this, new TcpDatagramReceivedEventArgs<string>(
                    sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
            }
        }

        /// <summary>
        /// Raised when a connection to a client has been established.
        /// </summary>
        public event EventHandler<TcpClientConnectedEventArgs> ClientConnected;

        /// <summary>
        /// Raised when a connection to a client has been closed.
        /// </summary>
        public event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;

        private void RaiseClientConnected(TcpClient tcpClient)
        {
            var handler = ClientConnected;
            if (handler != null)
            {
                handler(this, new TcpClientConnectedEventArgs(tcpClient));
            }
        }

        private void RaiseClientDisconnected(TcpClient tcpClient)
        {
            var handler = ClientDisconnected;
            if (handler != null)
            {
                handler(this, new TcpClientDisconnectedEventArgs(tcpClient));
            }
        }

        #endregion

        #region Send

        /// <summary>Throws when the server is not running.</summary>
        private void GuardRunning()
        {
            // NOTE(review): InvalidOperationException would be the conventional type;
            // kept as-is because callers may already catch InvalidProgramException.
            if (!IsRunning)
                throw new InvalidProgramException("This TCP server has not been started yet.");
        }

        /// <summary>
        /// Sends a datagram to the given client asynchronously.
        /// </summary>
        /// <param name="tcpClient">Target client.</param>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public void Send(TcpClient tcpClient, byte[] datagram)
        {
            GuardRunning();

            if (tcpClient == null)
                throw new ArgumentNullException("tcpClient");

            if (datagram == null)
                throw new ArgumentNullException("datagram");

            try
            {
                NetworkStream stream = tcpClient.GetStream();
                if (stream.CanWrite)
                {
                    stream.BeginWrite(datagram, 0, datagram.Length, HandleDatagramWritten, tcpClient);
                }
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
        }

        /// <summary>
        /// Sends text to the given client asynchronously, encoded with <see cref="Encoding"/>.
        /// </summary>
        /// <param name="tcpClient">Target client.</param>
        /// <param name="datagram">Text to send.</param>
        public void Send(TcpClient tcpClient, string datagram)
        {
            Send(tcpClient, this.Encoding.GetBytes(datagram));
        }

        /// <summary>
        /// Sends a datagram to every connected client asynchronously.
        /// </summary>
        /// <param name="datagram">Bytes to send.</param>
        public void SendToAll(byte[] datagram)
        {
            GuardRunning();

            foreach (var client in _clients.Values)
            {
                // A client that dropped mid-broadcast must not stop delivery to the rest.
                try
                {
                    Send(client.TcpClient, datagram);
                }
                catch (InvalidOperationException ex)
                {
                    Debug.LogException(ex);
                }
                catch (IOException ex)
                {
                    Debug.LogException(ex);
                }
            }
        }

        /// <summary>
        /// Sends text to every connected client asynchronously, encoded with <see cref="Encoding"/>.
        /// </summary>
        /// <param name="datagram">Text to send.</param>
        public void SendToAll(string datagram)
        {
            GuardRunning();

            SendToAll(this.Encoding.GetBytes(datagram));
        }

        /// <summary>Completes an asynchronous write started by <see cref="Send(TcpClient, byte[])"/>.</summary>
        private void HandleDatagramWritten(IAsyncResult ar)
        {
            try
            {
                ((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
            catch (InvalidOperationException ex)
            {
                Debug.LogException(ex);
            }
            catch (IOException ex)
            {
                Debug.LogException(ex);
            }
        }

        /// <summary>
        /// Sends a datagram to the given client, blocking until it has been written.
        /// </summary>
        /// <param name="tcpClient">Target client.</param>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public void SyncSend(TcpClient tcpClient, byte[] datagram)
        {
            GuardRunning();

            if (tcpClient == null)
                throw new ArgumentNullException("tcpClient");

            if (datagram == null)
                throw new ArgumentNullException("datagram");

            try
            {
                NetworkStream stream = tcpClient.GetStream();
                if (stream.CanWrite)
                {
                    stream.Write(datagram, 0, datagram.Length);
                }
            }
            catch (ObjectDisposedException ex)
            {
                Debug.LogException(ex);
            }
        }

        /// <summary>
        /// Sends text to the given client synchronously, encoded with <see cref="Encoding"/>.
        /// </summary>
        /// <param name="tcpClient">Target client.</param>
        /// <param name="datagram">Text to send.</param>
        public void SyncSend(TcpClient tcpClient, string datagram)
        {
            SyncSend(tcpClient, this.Encoding.GetBytes(datagram));
        }

        /// <summary>
        /// Sends a datagram to every connected client synchronously.
        /// </summary>
        /// <param name="datagram">Bytes to send.</param>
        public void SyncSendToAll(byte[] datagram)
        {
            GuardRunning();

            foreach (var client in _clients.Values)
            {
                // A client that dropped mid-broadcast must not stop delivery to the rest.
                try
                {
                    SyncSend(client.TcpClient, datagram);
                }
                catch (InvalidOperationException ex)
                {
                    Debug.LogException(ex);
                }
                catch (IOException ex)
                {
                    Debug.LogException(ex);
                }
            }
        }

        /// <summary>
        /// Sends text to every connected client synchronously, encoded with <see cref="Encoding"/>.
        /// </summary>
        /// <param name="datagram">Text to send.</param>
        public void SyncSendToAll(string datagram)
        {
            GuardRunning();

            SyncSendToAll(this.Encoding.GetBytes(datagram));
        }

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Performs application-defined tasks associated with freeing,
        /// releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources
        /// </summary>
        /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources;
        /// <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!this._disposed)
            {
                if (disposing)
                {
                    try
                    {
                        Stop();
                        _listener = null;
                    }
                    catch (SocketException ex)
                    {
                        Debug.LogException(ex);
                    }
                }

                _disposed = true;
            }
        }

        #endregion
    }
}