增加客户端连接变化的监测

This commit is contained in:
SpringHgui 2020-10-30 00:12:16 +08:00
parent 92ba5cc825
commit 3981c89cd1
9 changed files with 83 additions and 13 deletions

View File

@ -1,6 +1,8 @@
using FastTunnel.Core.Handlers.Server;
using Microsoft.Extensions.Logging;
using Microsoft.VisualBasic;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
@ -10,6 +12,8 @@ using System.Threading.Tasks;
namespace FastTunnel.Core
{
public delegate void OnClientChangeLine(Socket socket, int count, bool is_offline);
public class AsyncListener : IListener
{
ILogger _logerr;
@ -20,9 +24,12 @@ namespace FastTunnel.Core
int m_numConnectedSockets;
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
IListenerDispatcher _requestDispatcher;
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
public AsyncListener(string ip, int port, ILogger logerr)
{
@ -37,6 +44,18 @@ namespace FastTunnel.Core
listenSocket.Bind(localEndPoint);
}
private void OnOffLine(Socket socket)
{
if (ConnectedSockets.Remove(socket))
OnClientsChange?.Invoke(socket, ConnectedSockets.Count, true);
}
private void OnAccept(Socket socket)
{
ConnectedSockets.Add(socket);
OnClientsChange.Invoke(socket, ConnectedSockets.Count, false);
}
public void Listen(IListenerDispatcher requestDispatcher)
{
shutdown = false;
@ -95,6 +114,7 @@ namespace FastTunnel.Core
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
OnAccept(accept);
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{IP}:{Port}】Accepted. There are {{0}} clients connected to the port",
@ -104,7 +124,10 @@ namespace FastTunnel.Core
StartAccept(e);
// 将此客户端交由Dispatcher进行管理
_requestDispatcher.Dispatch(accept);
_requestDispatcher.Dispatch(accept, this.OnOffLine);
// Only the sockets that contain a connection request
// will remain in listenList after Select returns.
}
else
{

View File

@ -35,11 +35,11 @@ namespace FastTunnel.Core
var rcv1 = new DataReciver(m_sockt1);
rcv1.OnComplete += Rcv1_OnComplete;
rcv1.ReciveOne();
rcv1.ReciveOneAsync();
var rcv2 = new DataReciver(m_sockt2);
rcv2.OnComplete += Rcv2_OnComplete;
rcv2.ReciveOne();
rcv2.ReciveOneAsync();
}
public void StartSwapAsync()
@ -61,13 +61,13 @@ namespace FastTunnel.Core
private void Rcv1_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
m_sockt2.Send(buffer, index, count, SocketFlags.None);
send.ReciveOne();
send.ReciveOneAsync();
}
private void Rcv2_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
m_sockt1.Send(buffer, index, count, SocketFlags.None);
send.ReciveOne();
send.ReciveOneAsync();
}
}
}

View File

@ -41,16 +41,30 @@ namespace FastTunnel.Core.Core
}
}
IListener client_listener;
IListener http_listener;
private void ListenClient()
{
IListener client_listener = new AsyncListener(ServerSettings.BindAddr, ServerSettings.BindPort, _logger);
client_listener = new AsyncListener(ServerSettings.BindAddr, ServerSettings.BindPort, _logger);
client_listener.OnClientsChange += Client_listener_OnClientsChange;
client_listener.Listen(new ClientDispatcher(this, _logger, ServerSettings));
_logger.LogDebug($"监听客户端 -> {ServerSettings.BindAddr}:{ServerSettings.BindPort}");
}
private void Client_listener_OnClientsChange(System.Net.Sockets.Socket socket, int count, bool is_oofline)
{
if (is_oofline)
_logger.LogInformation($"客户端 {socket.RemoteEndPoint} 已断开,当前连接数:{count}");
else
_logger.LogInformation($"客户端 {socket.RemoteEndPoint} 已连接,当前连接数:{count}");
}
private void ListenHttp()
{
IListener http_listener = new AsyncListener(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
http_listener = new AsyncListener(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
http_listener.Listen(new HttpDispatcher(this, _logger, ServerSettings));
_logger.LogDebug($"监听HTTP -> {ServerSettings.BindAddr}:{ServerSettings.WebProxyPort}");

View File

@ -7,6 +7,7 @@ namespace FastTunnel.Core
{
public delegate void OnCompleteHandler(DataReciver send, byte[] buffer, int index, int count);
public delegate void OnError(DataReciver send, SocketAsyncEventArgs e);
public delegate void OnConnectionReset(DataReciver send, Socket socket, SocketAsyncEventArgs e);
public class DataReciver
{
@ -14,6 +15,7 @@ namespace FastTunnel.Core
public event OnCompleteHandler OnComplete;
public event OnError OnError;
public event OnConnectionReset OnReset;
byte[] buffer = new byte[1024 * 1024];
SocketAsyncEventArgs rcv_event;
@ -29,7 +31,7 @@ namespace FastTunnel.Core
rcv_event.SetBuffer(buffer);
}
public void ReciveOne()
public void ReciveOneAsync()
{
var willRaise = m_client.ReceiveAsync(rcv_event);
if (!willRaise)
@ -56,6 +58,11 @@ namespace FastTunnel.Core
OnComplete?.Invoke(this, buffer, e.Offset, e.BytesTransferred);
}
}
else if (e.SocketError == SocketError.ConnectionReset)
{
// 断线
OnReset?.Invoke(this, m_client, e);
}
else
{
OnError?.Invoke(this, e);

View File

@ -20,6 +20,7 @@ namespace FastTunnel.Core.Handlers.Server
readonly LoginMessageHandler _loginHandler;
readonly HeartMessageHandler _heartHandler;
readonly SwapMessageHandler _swapMsgHandler;
Action<Socket> offLineAction;
public ClientDispatcher(FastTunnelServer fastTunnelServer, ILogger logger, IServerConfig serverSettings)
{
@ -39,13 +40,18 @@ namespace FastTunnel.Core.Handlers.Server
var reader = new DataReciver(client);
reader.OnComplete += Reader_OnComplete;
reader.OnError += Reader_OnError;
reader.OnReset += Reader_OnReset;
reader.ReciveOneAsync();
}
reader.ReciveOne();
private void Reader_OnReset(DataReciver send, Socket socket, SocketAsyncEventArgs e)
{
offLineAction(socket);
}
private void Reader_OnError(DataReciver send, SocketAsyncEventArgs e)
{
//
_logger.LogError("接收客户端数据异常 {0}", e.SocketError);
}
private void Reader_OnComplete(DataReciver reader, byte[] buffer, int offset, int count)
@ -67,7 +73,7 @@ namespace FastTunnel.Core.Handlers.Server
if (firstIndex < 0)
{
temp += words;
reader.ReciveOne();
reader.ReciveOneAsync();
break;
}
@ -84,7 +90,7 @@ namespace FastTunnel.Core.Handlers.Server
if (needRecive)
{
reader.ReciveOne();
reader.ReciveOneAsync();
}
}
catch (Exception ex)
@ -94,7 +100,7 @@ namespace FastTunnel.Core.Handlers.Server
// throw;
reader.Socket.Send(new Message<LogMassage>() { MessageType = MessageType.Log, Content = new LogMassage(LogMsgType.Error, ex.Message) });
reader.ReciveOne();
reader.ReciveOneAsync();
}
}
@ -122,5 +128,11 @@ namespace FastTunnel.Core.Handlers.Server
handler.HandlerMsg(this._fastTunnelServer, client, msg);
return handler;
}
public void Dispatch(Socket httpClient, Action<Socket> onOffLine)
{
offLineAction = onOffLine;
Dispatch(httpClient);
}
}
}

View File

@ -190,5 +190,10 @@ namespace FastTunnel.Core.Handlers.Server
client.Send(responseBody);
client.Close();
}
public void Dispatch(Socket httpClient, Action<Socket> onOffLine)
{
Dispatch(httpClient);
}
}
}

View File

@ -7,6 +7,8 @@ namespace FastTunnel.Core.Handlers.Server
{
public interface IListenerDispatcher
{
void Dispatch(Socket httpClient, Action<Socket> onOffLine);
void Dispatch(Socket httpClient);
}
}

View File

@ -30,5 +30,10 @@ namespace FastTunnel.Core.Handlers.Server
CustomerClient = _socket,
});
}
public void Dispatch(Socket httpClient, Action<Socket> onOffLine)
{
Dispatch(httpClient);
}
}
}

View File

@ -15,5 +15,7 @@ namespace FastTunnel.Core
void Listen(IListenerDispatcher requestDispatcher);
void ShutdownAndClose();
event OnClientChangeLine OnClientsChange;
}
}