优化server端部分代码

This commit is contained in:
SpringHgui 2021-06-15 00:00:25 +08:00
parent 3d7ff85aee
commit 1a3662b407
17 changed files with 702 additions and 43 deletions

View File

@ -100,6 +100,7 @@ namespace FastTunnel.Core.Client
Close();
try
{
_logger.LogInformation("登录重试...");
_client = lastLogin.Invoke();
}
catch (Exception ex)

View File

@ -20,26 +20,31 @@ namespace FastTunnel.Core.Client
public readonly IServerConfig ServerSettings;
readonly ILogger _logger;
ClientListener client_listener;
ClientListenerV2 clientListener;
HttpListener http_listener;
public FastTunnelServer(ILogger<FastTunnelServer> logger, IConfiguration configuration)
{
_logger = logger;
ServerSettings = configuration.Get<AppSettings>().ServerSettings;
clientListener = new ClientListenerV2(this, ServerSettings.BindAddr, ServerSettings.BindPort, _logger);
http_listener = new HttpListener(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
clientListener.OnClientsChange += Client_listener_OnClientsChange;
}
public void Run()
{
_logger.LogInformation("===== FastTunnel Server Starting =====");
CheckSettins();
checkSettins();
ListenClient();
ListenHttp();
listenClient();
listenHttp();
}
private void CheckSettins()
private void checkSettins()
{
if (string.IsNullOrEmpty(ServerSettings.WebDomain))
{
@ -47,13 +52,14 @@ namespace FastTunnel.Core.Client
}
}
private void ListenClient()
private void listenClient()
{
client_listener = new ClientListener(this, ServerSettings.BindAddr, ServerSettings.BindPort, _logger);
client_listener.OnClientsChange += Client_listener_OnClientsChange;
client_listener.Start();
clientListener.Start();
}
_logger.LogInformation($"监听客户端 -> {ServerSettings.BindAddr}:{ServerSettings.BindPort}");
private void listenHttp()
{
http_listener.Start(new HttpDispatcher(this, _logger, ServerSettings));
}
private void Client_listener_OnClientsChange(System.Net.Sockets.Socket socket, int count, bool is_oofline)
@ -64,14 +70,6 @@ namespace FastTunnel.Core.Client
_logger.LogDebug($"客户端 {socket.RemoteEndPoint} 已连接,当前连接数:{count}");
}
private void ListenHttp()
{
http_listener = new HttpListener(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
http_listener.Start(new HttpDispatcher(this, _logger, ServerSettings));
_logger.LogInformation($"监听HTTP请求 -> {ServerSettings.BindAddr}:{ServerSettings.WebProxyPort}");
}
public void Stop(CancellationToken cancellationToken)
{
_logger.LogInformation("===== FastTunnel Server Stoping =====");

View File

@ -1,13 +1,16 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Text;
namespace FastTunnel.Core.Config
{
public class DefaultServerConfig : IServerConfig
{
[Required]
public string BindAddr { get; set; }
[Required]
public int BindPort { get; set; }
public string WebDomain { get; set; }

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Extensions
{
public static class ByteArrayExtension
{
public static string GetString(this byte[] buffer, int offset, int count)
{
return Encoding.UTF8.GetString(buffer, offset, count);
}
}
}

View File

@ -42,6 +42,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="NLog" Version="4.7.10" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.2" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
<PackageReference Include="System.Private.ServiceModel" Version="4.8.1" />
</ItemGroup>

View File

@ -85,7 +85,7 @@ namespace FastTunnel.Core.Handlers
foreach (var www in item.WWW)
{
// TODO:validateDomain
_logger.LogError($"WWW {www}");
_logger.LogInformation($"WWW {www}");
server.WebList.AddOrUpdate(www, info, (key, oldInfo) => { return info; });
sb.Append($"{Environment.NewLine} http://{www}{(server.ServerSettings.WebHasNginxProxy ? string.Empty : ":" + server.ServerSettings.WebProxyPort)} => {item.LocalIp}:{item.LocalPort}");

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Helper
{
public static class ValidateHelper
{
public static string[] ValidateObject(object instance)
{
throw new NotImplementedException();
}
}
}

View File

@ -13,7 +13,7 @@ namespace FastTunnel.Core.Listener
{
public class ClientListener : IListener
{
ILogger _logerr;
ILogger _logger;
public string ListenIp { get; set; }
@ -22,7 +22,6 @@ namespace FastTunnel.Core.Listener
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
//IListenerDispatcher _requestDispatcher;
Socket listenSocket;
public IList<ClientConnection> ConnectedSockets = new List<ClientConnection>();
FastTunnelServer _fastTunnelServer;
@ -30,7 +29,7 @@ namespace FastTunnel.Core.Listener
public ClientListener(FastTunnelServer fastTunnelServer, string ip, int port, ILogger logerr)
{
_fastTunnelServer = fastTunnelServer;
_logerr = logerr;
_logger = logerr;
this.ListenIp = ip;
this.ListenPort = port;
@ -44,21 +43,22 @@ namespace FastTunnel.Core.Listener
private void HandleNewClient(Socket socket)
{
// 此时的客户端可能有两种 1.登录的客户端 2.交换请求的客户端
var client = new ClientConnection(_fastTunnelServer, socket, _logerr);
var client = new ClientConnection(_fastTunnelServer, socket, _logger);
ConnectedSockets.Add(client);
// 接收客户端消息
client.StartRecive();
}
public void Start(IListenerDispatcher requestDispatcher, int backlog = 100)
public void Start(int backlog = 100)
{
shutdown = false;
// _requestDispatcher = requestDispatcher;
listenSocket.Listen(backlog);
StartAccept(null);
_logger.LogInformation($"监听客户端 -> {ListenIp}:{ListenPort}");
}
public void Stop()
@ -85,7 +85,7 @@ namespace FastTunnel.Core.Listener
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
_logger.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
@ -120,7 +120,7 @@ namespace FastTunnel.Core.Listener
}
else
{
_logerr.LogError($"监听客户端异常 this={this.ToJson()} e={e.ToJson()}");
_logger.LogError($"监听客户端异常 this={this.ToJson()} e={e.ToJson()}");
Stop();
}
}
@ -128,10 +128,5 @@ namespace FastTunnel.Core.Listener
public void Close()
{
}
public void Start(int backlog = 100)
{
Start(null, backlog);
}
}
}

View File

@ -0,0 +1,105 @@
using FastTunnel.Core.Client;
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace FastTunnel.Core.Listener
{
public class ClientListenerV2 : IListener
{
ILogger _logger;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
Socket listenSocket;
public IList<ClientConnection> ConnectedSockets = new List<ClientConnection>();
FastTunnelServer _fastTunnelServer;
Server.Server server;
readonly LoginHandler _loginHandler;
readonly HeartMessageHandler _heartHandler;
readonly SwapMessageHandler _swapMsgHandler;
public ClientListenerV2(FastTunnelServer fastTunnelServer, string ip, int port, ILogger logerr)
{
_fastTunnelServer = fastTunnelServer;
_logger = logerr;
this.ListenIp = ip;
this.ListenPort = port;
_loginHandler = new LoginHandler(_logger);
_heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(1000, 1024);
}
public void Start(int backlog = 100)
{
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
server.Init();
server.Start(localEndPoint, handle);
_logger.LogInformation($"监听客户端 -> {ListenIp}:{ListenPort}");
}
private bool handle(Socket client, string words)
{
Message<JObject> msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
IClientMessageHandler handler = null;
switch (msg.MessageType)
{
case MessageType.C_LogIn: // 登录
handler = _loginHandler;
break;
case MessageType.Heart: // 心跳
handler = _heartHandler;
break;
case MessageType.C_SwapMsg: // 交换数据
handler = _swapMsgHandler;
break;
default:
throw new Exception($"未知的通讯指令 {msg.MessageType}");
}
handler.HandlerMsg(this._fastTunnelServer, client, msg);
return handler.NeedRecive;
}
public void Stop()
{
}
private void HandleNewClient(Socket socket)
{
// 此时的客户端可能有两种 1.登录的客户端 2.交换请求的客户端
var client = new ClientConnection(_fastTunnelServer, socket, _logger);
ConnectedSockets.Add(client);
// 接收客户端消息
client.StartRecive();
}
public void Close()
{
}
}
}

View File

@ -11,7 +11,7 @@ namespace FastTunnel.Core.Listener
{
public class HttpListener : IListener
{
ILogger _logerr;
ILogger _logger;
public string ListenIp { get; set; }
@ -26,9 +26,9 @@ namespace FastTunnel.Core.Listener
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
public HttpListener(string ip, int port, ILogger logerr)
public HttpListener(string ip, int port, ILogger logger)
{
_logerr = logerr;
_logger = logger;
this.ListenIp = ip;
this.ListenPort = port;
@ -59,6 +59,7 @@ namespace FastTunnel.Core.Listener
listenSocket.Listen(backlog);
StartAccept(null);
_logger.LogInformation($"监听HTTP请求 -> {ListenIp}:{ListenPort}");
}
public void Stop()
@ -86,7 +87,7 @@ namespace FastTunnel.Core.Listener
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
_logger.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
@ -114,7 +115,7 @@ namespace FastTunnel.Core.Listener
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
_logger.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// Accept the next connection request

View File

@ -15,8 +15,6 @@ namespace FastTunnel.Core.Listener
int ListenPort { get; }
void Start(IListenerDispatcher requestDispatcher, int backlog = 100);
void Start(int backlog = 100);
void Stop();

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Server
{
public class AsyncUserToken
{
public Socket Socket { get; set; }
public string MassgeTemp { get; set; }
}
}

View File

@ -0,0 +1,69 @@
// This class creates a single large buffer which can be divided up
// and assigned to SocketAsyncEventArgs objects for use with each
// socket I/O operation.
// This enables bufffers to be easily reused and guards against
// fragmenting heap memory.
//
// The operations exposed on the BufferManager class are not thread safe.
using System.Collections.Generic;
using System.Net.Sockets;
namespace FastTunnel.Core.Server
{
class BufferManager
{
int m_numBytes; // the total number of bytes controlled by the buffer pool
byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager
Stack<int> m_freeIndexPool; //
int m_currentIndex;
int m_bufferSize;
public BufferManager(int totalBytes, int bufferSize)
{
m_numBytes = totalBytes;
m_currentIndex = 0;
m_bufferSize = bufferSize;
m_freeIndexPool = new Stack<int>();
}
// Allocates buffer space used by the buffer pool
public void InitBuffer()
{
// create one big large buffer and divide that
// out to each SocketAsyncEventArg object
m_buffer = new byte[m_numBytes];
}
// Assigns a buffer from the buffer pool to the
// specified SocketAsyncEventArgs object
//
// <returns>true if the buffer was successfully set, else false</returns>
public bool SetBuffer(SocketAsyncEventArgs args)
{
if (m_freeIndexPool.Count > 0)
{
args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
}
else
{
if ((m_numBytes - m_bufferSize) < m_currentIndex)
{
return false;
}
args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
m_currentIndex += m_bufferSize;
}
return true;
}
// Removes the buffer from a SocketAsyncEventArg object.
// This frees the buffer back to the buffer pool
public void FreeBuffer(SocketAsyncEventArgs args)
{
m_freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
}
}

View File

@ -0,0 +1,116 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Server
{
public class PipeHepler
{
Socket m_socket;
Func<Socket, byte[], bool> processLine;
public PipeHepler(Socket socket, Func<Socket, byte[], bool> processLine)
{
this.processLine = processLine;
m_socket = socket;
}
public async Task ProcessLinesAsync()
{
var pipe = new Pipe();
Task writing = FillPipeAsync(pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
public async Task FillPipeAsync(PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await m_socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
public async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
if (!processLine(m_socket, line.ToArray()))
{
// 停止继续监听
}
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
}
}

View File

@ -0,0 +1,275 @@
// Implements the connection logic for the socket server.
// After accepting a connection, all data read from the client
// is sent back to the client. The read and echo back to the client pattern
// is continued until the client disconnects.
using FastTunnel.Core.Extensions;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace FastTunnel.Core.Server
{
public class Server
{
private int m_numConnections; // the maximum number of connections the sample is designed to handle simultaneously
private int m_receiveBufferSize;// buffer size to use for each socket I/O operation
BufferManager m_bufferManager; // represents a large reusable set of buffers for all socket operations
const int opsToPreAlloc = 2; // read, write (don't alloc buffer space for accepts)
Socket listenSocket; // the socket used to listen for incoming connection requests
// pool of reusable SocketAsyncEventArgs objects for write, read and accept socket operations
SocketAsyncEventArgsPool m_readWritePool;
//int m_totalBytesRead; // counter of the total # bytes received by the server
int m_numConnectedSockets; // the total number of clients connected to the server
Semaphore m_maxNumberAcceptedClients;
Func<Socket, string, bool> m_handller;
// Create an uninitialized server instance.
// To start the server listening for connection requests
// call the Init method followed by Start method
//
// <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
// <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
public Server(int numConnections, int receiveBufferSize)
{
//m_totalBytesRead = 0;
m_numConnectedSockets = 0;
m_numConnections = numConnections;
m_receiveBufferSize = receiveBufferSize;
// allocate buffers such that the maximum number of sockets can have one outstanding read and
//write posted to the socket simultaneously
m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToPreAlloc,
receiveBufferSize);
m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
}
// Initializes the server by preallocating reusable buffers and
// context objects. These objects do not need to be preallocated
// or reused, but it is done this way to illustrate how the API can
// easily be used to create reusable objects to increase server performance.
//
public void Init()
{
// Allocates one large byte buffer which all I/O operations use a piece of. This gaurds
// against memory fragmentation
m_bufferManager.InitBuffer();
// preallocate pool of SocketAsyncEventArgs objects
SocketAsyncEventArgs readWriteEventArg;
for (int i = 0; i < m_numConnections; i++)
{
//Pre-allocate a set of reusable SocketAsyncEventArgs
readWriteEventArg = new SocketAsyncEventArgs();
readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
readWriteEventArg.UserToken = new AsyncUserToken();
// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
m_bufferManager.SetBuffer(readWriteEventArg);
// add SocketAsyncEventArg to the pool
m_readWritePool.Push(readWriteEventArg);
}
}
// Starts the server such that it is listening for
// incoming connection requests.
//
// <param name="localEndPoint">The endpoint which the server will listening
// for connection requests on</param>
public void Start(IPEndPoint localEndPoint, Func<Socket, string, bool> handller)
{
m_handller = handller;
// create the socket which listens for incoming connections
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
// start the server with a listen backlog of 100 connections
listenSocket.Listen(100);
// post accepts on the listening socket
StartAccept(null);
//Console.WriteLine("{0} connected sockets with one outstanding receive posted to each....press any key", m_outstandingReadCount);
}
// Begins an operation to accept a connection request from the client
//
// <param name="acceptEventArg">The context object to use when issuing
// the accept operation on the server's listening socket</param>
public void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
m_maxNumberAcceptedClients.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
// This method is the callback method associated with Socket.AcceptAsync
// operations and is invoked when an accept operation is complete
//
void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
Interlocked.Increment(ref m_numConnectedSockets);
Console.WriteLine("Client connection accepted. There are {0} clients connected to the server",
m_numConnectedSockets);
// new PipeHepler(e.AcceptSocket, processLine).ProcessLinesAsync();
// Get the socket for the accepted client connection and put it into the
//ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).MassgeTemp = null;
// As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(readEventArgs);
}
// Accept the next connection request
StartAccept(e);
}
private bool processLine(Socket socket, byte[] line)
{
return m_handller(socket, Encoding.UTF8.GetString(line));
}
// This method is called whenever a receive or send operation is completed on a socket
//
// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
// This method is invoked when an asynchronous receive operation completes.
// If the remote host closed the connection, then the socket is closed.
// If data was received then the data is echoed back to the client.
//
private void ProcessReceive(SocketAsyncEventArgs e)
{
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
//increment the count of the total bytes receive by the server
//Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
var words = e.Buffer.GetString(e.Offset, e.BytesTransferred);
if (words.IndexOf("\n") != -1)
{
var array = words.Split("\n");
// handle msg
var msg = token.MassgeTemp + array[0];
var needRecive = m_handller(token.Socket, msg);
token.MassgeTemp = array.Length > 1 ? array[1] : null;
if (!needRecive)
{
return;
}
}
else
{
token.MassgeTemp += words;
}
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
}
else
{
CloseClientSocket(e);
}
}
// This method is invoked when an asynchronous send operation completes.
// The method issues another receive on the socket to read any additional
// data sent from the client
//
// <param name="e"></param>
private void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
// done echoing data back to the client
AsyncUserToken token = (AsyncUserToken)e.UserToken;
// read the next block of data send from the client
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
}
else
{
CloseClientSocket(e);
}
}
private void CloseClientSocket(SocketAsyncEventArgs e)
{
AsyncUserToken token = e.UserToken as AsyncUserToken;
// close the socket associated with the client
try
{
token.Socket.Shutdown(SocketShutdown.Send);
}
// throws if client process has already closed
catch (Exception) { }
token.Socket.Close();
// decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref m_numConnectedSockets);
// Free the SocketAsyncEventArg so they can be reused by another client
m_readWritePool.Push(e);
m_maxNumberAcceptedClients.Release();
Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
}
}
}

View File

@ -0,0 +1,49 @@
// Represents a collection of reusable SocketAsyncEventArgs objects.
using System;
using System.Collections.Generic;
using System.Net.Sockets;
class SocketAsyncEventArgsPool
{
Stack<SocketAsyncEventArgs> m_pool;
// Initializes the object pool to the specified size
//
// The "capacity" parameter is the maximum number of
// SocketAsyncEventArgs objects the pool can hold
public SocketAsyncEventArgsPool(int capacity)
{
m_pool = new Stack<SocketAsyncEventArgs>(capacity);
}
// Add a SocketAsyncEventArg instance to the pool
//
//The "item" parameter is the SocketAsyncEventArgs instance
// to add to the pool
public void Push(SocketAsyncEventArgs item)
{
if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
lock (m_pool)
{
m_pool.Push(item);
}
}
// Removes a SocketAsyncEventArgs instance from the pool
// and returns the object removed from the pool
public SocketAsyncEventArgs Pop()
{
lock (m_pool)
{
return m_pool.Pop();
}
}
// The number of SocketAsyncEventArgs instances in the pool
public int Count
{
get { return m_pool.Count; }
}
}

View File

@ -21,7 +21,7 @@ namespace FastTunnel.Core.Services
{
readonly ILogger<ServiceFastTunnelServer> _logger;
readonly IAuthenticationFilter _authenticationFilter;
FastTunnelServer _Server;
FastTunnelServer _server;
public ServiceFastTunnelServer(
ILogger<ServiceFastTunnelServer> logger,
@ -30,7 +30,7 @@ namespace FastTunnel.Core.Services
{
_logger = logger;
_authenticationFilter = authenticationFilter;
_Server = fastTunnelServer;
_server = fastTunnelServer;
AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;
AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;
}
@ -72,7 +72,7 @@ namespace FastTunnel.Core.Services
try
{
_Server.Run();
_server.Run();
}
catch (Exception ex)
{
@ -88,7 +88,7 @@ namespace FastTunnel.Core.Services
{
return Task.Run(() =>
{
_Server.Stop(cancellationToken);
_server.Stop(cancellationToken);
}, cancellationToken);
}
}