优化部分逻辑

This commit is contained in:
SpringHgui 2021-07-11 00:18:30 +08:00
parent 00a38a190b
commit b5ae1f8dc6
8 changed files with 65 additions and 51 deletions

View File

@ -73,7 +73,6 @@ namespace FastTunnel.Core.Dispatchers
Host = collection[0].Value; Host = collection[0].Value;
} }
_logger.LogDebug(Host.Replace("\r", ""));
var domain = Host.Split(":")[1].Trim(); var domain = Host.Split(":")[1].Trim();
_logger.LogDebug($"=======Dispatch domain:{domain} {token.RequestId} ========"); _logger.LogDebug($"=======Dispatch domain:{domain} {token.RequestId} ========");
@ -94,34 +93,26 @@ namespace FastTunnel.Core.Dispatchers
return; return;
} }
_logger.LogDebug($"=======找到映射的站点 {token.RequestId}========");
_fastTunnelServer.RequestTemp.TryAdd(token.RequestId, new NewRequest
{
CustomerClient = token.Socket,
Buffer = token.Recived
});
try try
{ {
sw.Stop(); // 发送指令给客户端,等待建立隧道
_logger.LogDebug($"[寻找路由耗时]{sw.ElapsedMilliseconds}ms");
sw.Restart();
web.Socket.SendCmd(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = token.RequestId, WebConfig = web.WebConfig } }); web.Socket.SendCmd(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = token.RequestId, WebConfig = web.WebConfig } });
sw.Stop();
_logger.LogDebug($"[发送NewCustomer指令耗时]{sw.ElapsedMilliseconds}");
_logger.LogDebug($"=======发送请求成功 {token.RequestId}========");
} }
catch (Exception) catch (Exception)
{ {
_logger.LogDebug($"=======客户端不在线 {token.RequestId}========"); _logger.LogDebug($"[客户端不在线] {token.RequestId}");
HandlerClientNotOnLine(token.Socket, domain); HandlerClientNotOnLine(token.Socket, domain);
// 移除 // 移除
_fastTunnelServer.WebList.TryRemove(domain, out _); _fastTunnelServer.WebList.TryRemove(domain, out _);
} }
// 将等待的http请求
_fastTunnelServer.RequestTemp.TryAdd(token.RequestId, new NewRequest
{
CustomerClient = token.Socket,
Buffer = token.Recived
});
} }
public void Dispatch(Socket httpClient) public void Dispatch(Socket httpClient)

View File

@ -29,11 +29,11 @@ namespace FastTunnel.Core.Handlers.Client
var request = Msg.Content.ToObject<NewCustomerMassage>(); var request = Msg.Content.ToObject<NewCustomerMassage>();
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(request.MsgId.Split('_')[0]); var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(request.MsgId.Split('_')[0]);
_logger.LogDebug($"Start SwapMassage {request.MsgId} 延迟时间{interval}ms"); _logger.LogDebug($"Start SwapMassage {request.MsgId} 服务端耗时{interval}ms");
var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort); var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort);
connecter.Connect(); connecter.Connect();
connecter.Send(new Message<SwapMassage> { MessageType = MessageType.C_SwapMsg, Content = new SwapMassage(request.MsgId) }); connecter.Send(new Message<SwapMassage> { MessageType = MessageType.C_SwapMsg, Content = new SwapMassage(request.MsgId) });
_logger.LogDebug($"连接server成功 {request.MsgId}"); _logger.LogDebug($"连接server成功 {request.MsgId}");

View File

@ -2,6 +2,7 @@
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Sockets; using FastTunnel.Core.Sockets;
using FastTunnel.Core.Utility.Extensions;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using System; using System;
@ -27,8 +28,9 @@ namespace FastTunnel.Core.Handlers.Server
{ {
var SwapMsg = msg.Content.ToObject<SwapMassage>(); var SwapMsg = msg.Content.ToObject<SwapMassage>();
NewRequest request; NewRequest request;
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(SwapMsg.msgId.Split('_')[0]);
_logger.LogDebug($"响应NewCustomer{SwapMsg.msgId}"); _logger.LogDebug($"[开始转发HTTP]{SwapMsg.msgId} 客户端耗时:{interval}ms");
if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request)) if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request))
{ {
server.RequestTemp.TryRemove(SwapMsg.msgId, out _); server.RequestTemp.TryRemove(SwapMsg.msgId, out _);

View File

@ -42,7 +42,7 @@ namespace FastTunnel.Core.Listener
_heartHandler = new HeartMessageHandler(); _heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger); _swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(2000, 100, _logger); server = new Server.Server(2000, 100, false, _logger);
} }
public void Start() public void Start()
@ -86,7 +86,15 @@ namespace FastTunnel.Core.Listener
throw new Exception($"未知的通讯指令 {msg.MessageType}"); throw new Exception($"未知的通讯指令 {msg.MessageType}");
} }
handler.HandlerMsg(this._fastTunnelServer, token.Socket, msg); try
{
handler.HandlerMsg(this._fastTunnelServer, token.Socket, msg);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理客户端消息失败");
}
return handler.NeedRecive; return handler.NeedRecive;
} }

View File

@ -30,7 +30,7 @@ namespace FastTunnel.Core.Listener
this.ListenIp = ip; this.ListenIp = ip;
this.ListenPort = port; this.ListenPort = port;
server = new Server.Server(500, 512, _logger); server = new Server.Server(1000, 512, true, _logger);
} }
public void Start(IListenerDispatcher requestDispatcher, int backlog = 100) public void Start(IListenerDispatcher requestDispatcher, int backlog = 100)

View File

@ -31,6 +31,7 @@ namespace FastTunnel.Core.Server
Func<AsyncUserToken, string, bool> m_handller; Func<AsyncUserToken, string, bool> m_handller;
string m_sectionFlag; string m_sectionFlag;
IPEndPoint _localEndPoint; IPEndPoint _localEndPoint;
bool m_isHttpServer;
ILogger _logger; ILogger _logger;
@ -40,8 +41,9 @@ namespace FastTunnel.Core.Server
// //
// <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param> // <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
// <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param> // <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
public Server(int numConnections, int receiveBufferSize, ILogger logger) public Server(int numConnections, int receiveBufferSize, bool isHttpServer, ILogger logger)
{ {
m_isHttpServer = isHttpServer;
_logger = logger; _logger = logger;
//m_totalBytesRead = 0; //m_totalBytesRead = 0;
m_numConnectedSockets = 0; m_numConnectedSockets = 0;
@ -146,15 +148,19 @@ namespace FastTunnel.Core.Server
_logger.LogInformation($"[当前连接数]:{_localEndPoint.Port} | {m_numConnectedSockets}"); _logger.LogInformation($"[当前连接数]:{_localEndPoint.Port} | {m_numConnectedSockets}");
// Get the socket for the accepted client connection and put it into the // Get the socket for the accepted client connection and put it into the
//ReadEventArg object user token // ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop(); SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
var token = readEventArgs.UserToken as AsyncUserToken; var token = readEventArgs.UserToken as AsyncUserToken;
token.Socket = e.AcceptSocket; token.Socket = e.AcceptSocket;
token.MassgeTemp = null; token.MassgeTemp = null;
token.Recived = null; token.Recived = null;
token.RequestId = $"{DateTime.Now.GetChinaTicks()}_{Guid.NewGuid().ToString().Replace("-", string.Empty)}";
_logger.LogDebug($"Accept {token.RequestId}"); // 客户端请求不需要分配msgid
if (m_isHttpServer)
{
token.RequestId = $"{DateTime.Now.GetChinaTicks()}_{Guid.NewGuid().ToString().Replace("-", string.Empty)}";
_logger.LogDebug($"Accept {token.RequestId}");
}
// As soon as the client is connected, post a receive to the connection // As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs); bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
@ -196,25 +202,28 @@ namespace FastTunnel.Core.Server
_logger.LogDebug($"[ProcessReceive]: {_localEndPoint.Port} | {token.RequestId}"); _logger.LogDebug($"[ProcessReceive]: {_localEndPoint.Port} | {token.RequestId}");
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{ {
// increment the count of the total bytes receive by the server
// Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
if (token.Recived != null)
{
byte[] resArr = new byte[token.Recived.Length + e.BytesTransferred];
token.Recived.CopyTo(resArr, 0);
Array.Copy(e.Buffer, e.Offset, resArr, token.Recived.Length, e.BytesTransferred);
token.Recived = resArr;
}
else
{
byte[] resArr = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, resArr, 0, e.BytesTransferred);
token.Recived = resArr;
}
bool needRecive = false; bool needRecive = false;
var words = e.Buffer.GetString(e.Offset, e.BytesTransferred); var words = e.Buffer.GetString(e.Offset, e.BytesTransferred);
var sum = token.MassgeTemp + words; var sum = token.MassgeTemp + words;
// 只有http请求需要对已发送字节进行存储
if (m_isHttpServer)
{
if (token.Recived != null)
{
byte[] resArr = new byte[token.Recived.Length + e.BytesTransferred];
token.Recived.CopyTo(resArr, 0);
Array.Copy(e.Buffer, e.Offset, resArr, token.Recived.Length, e.BytesTransferred);
token.Recived = resArr;
}
else
{
byte[] resArr = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, resArr, 0, e.BytesTransferred);
token.Recived = resArr;
}
}
if (sum.Contains(m_sectionFlag)) if (sum.Contains(m_sectionFlag))
{ {
var array = (sum).Split(m_sectionFlag); var array = (sum).Split(m_sectionFlag);
@ -246,7 +255,6 @@ namespace FastTunnel.Core.Server
token.MassgeTemp = sum; token.MassgeTemp = sum;
} }
e.SetBuffer(e.Offset, m_receiveBufferSize);
bool willRaiseEvent = token.Socket.ReceiveAsync(e); bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent) if (!willRaiseEvent)
{ {

View File

@ -21,6 +21,7 @@ namespace FastTunnel.Core.Sockets
this._port = v2; this._port = v2;
Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
Socket.NoDelay = true;
} }
public void Connect() public void Connect()

View File

@ -17,7 +17,7 @@ namespace FastTunnel.Core.Sockets
private readonly string m_msgId = null; private readonly string m_msgId = null;
private readonly ILogger m_logger; private readonly ILogger m_logger;
private bool Swaped = false; private bool swapeStarted = false;
private class Channel private class Channel
{ {
@ -28,6 +28,8 @@ namespace FastTunnel.Core.Sockets
public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId) public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId)
{ {
//sockt1.NoDelay = true;
//sockt2.NoDelay = true;
m_sockt1 = sockt1; m_sockt1 = sockt1;
m_sockt2 = sockt2; m_sockt2 = sockt2;
m_msgId = msgId; m_msgId = msgId;
@ -36,9 +38,9 @@ namespace FastTunnel.Core.Sockets
public void StartSwap() public void StartSwap()
{ {
m_logger?.LogDebug($"StartSwap {m_msgId}"); m_logger?.LogDebug($"[StartSwapStart] {m_msgId}");
swapeStarted = true;
Swaped = true;
ThreadPool.QueueUserWorkItem(swapCallback, new Channel ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{ {
Send = m_sockt1, Send = m_sockt1,
@ -50,6 +52,8 @@ namespace FastTunnel.Core.Sockets
Send = m_sockt2, Send = m_sockt2,
Receive = m_sockt1 Receive = m_sockt1
}); });
m_logger?.LogDebug($"[StartSwapEnd] {m_msgId}");
} }
private void swapCallback(object state) private void swapCallback(object state)
@ -66,7 +70,7 @@ namespace FastTunnel.Core.Sockets
{ {
try try
{ {
num = chanel.Receive.Receive(result, result.Length, SocketFlags.None); num = chanel.Receive.Receive(result, 0, result.Length, SocketFlags.None);
} }
catch (Exception) catch (Exception)
{ {
@ -82,7 +86,7 @@ namespace FastTunnel.Core.Sockets
try try
{ {
chanel.Send.Send(result, num, SocketFlags.None); chanel.Send.Send(result, 0, num, SocketFlags.None);
} }
catch (Exception) catch (Exception)
{ {
@ -136,7 +140,7 @@ namespace FastTunnel.Core.Sockets
{ {
m_logger?.LogDebug($"BeforeSwap {m_msgId}"); m_logger?.LogDebug($"BeforeSwap {m_msgId}");
if (Swaped) if (swapeStarted)
{ {
throw new Exception("BeforeSwap must be invoked before StartSwap!"); throw new Exception("BeforeSwap must be invoked before StartSwap!");
} }