diff --git a/FastTunnel.Core/Dispatchers/HttpDispatcherV2.cs b/FastTunnel.Core/Dispatchers/HttpDispatcherV2.cs index 5fc22ec..b1adb37 100644 --- a/FastTunnel.Core/Dispatchers/HttpDispatcherV2.cs +++ b/FastTunnel.Core/Dispatchers/HttpDispatcherV2.cs @@ -73,7 +73,6 @@ namespace FastTunnel.Core.Dispatchers Host = collection[0].Value; } - _logger.LogDebug(Host.Replace("\r", "")); var domain = Host.Split(":")[1].Trim(); _logger.LogDebug($"=======Dispatch domain:{domain} {token.RequestId} ========"); @@ -94,34 +93,26 @@ namespace FastTunnel.Core.Dispatchers return; } - _logger.LogDebug($"=======找到映射的站点 {token.RequestId}========"); - _fastTunnelServer.RequestTemp.TryAdd(token.RequestId, new NewRequest - { - CustomerClient = token.Socket, - Buffer = token.Recived - }); - try { - sw.Stop(); - _logger.LogDebug($"[寻找路由耗时]:{sw.ElapsedMilliseconds}ms"); - - sw.Restart(); + // 发送指令给客户端,等待建立隧道 web.Socket.SendCmd(new Message { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = token.RequestId, WebConfig = web.WebConfig } }); - - sw.Stop(); - _logger.LogDebug($"[发送NewCustomer指令耗时]:{sw.ElapsedMilliseconds}"); - - _logger.LogDebug($"=======发送请求成功 {token.RequestId}========"); } catch (Exception) { - _logger.LogDebug($"=======客户端不在线 {token.RequestId}========"); + _logger.LogDebug($"[客户端不在线] {token.RequestId}"); HandlerClientNotOnLine(token.Socket, domain); // 移除 _fastTunnelServer.WebList.TryRemove(domain, out _); } + + // 将等待的http请求 + _fastTunnelServer.RequestTemp.TryAdd(token.RequestId, new NewRequest + { + CustomerClient = token.Socket, + Buffer = token.Recived + }); } public void Dispatch(Socket httpClient) diff --git a/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs b/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs index 56b99c3..adac8c3 100644 --- a/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs +++ b/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs @@ -29,11 +29,11 @@ namespace FastTunnel.Core.Handlers.Client var request = Msg.Content.ToObject(); var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(request.MsgId.Split('_')[0]); - _logger.LogDebug($"Start SwapMassage {request.MsgId} 延迟时间:{interval}ms"); + _logger.LogDebug($"Start SwapMassage {request.MsgId} 服务端耗时:{interval}ms"); var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort); - connecter.Connect(); + connecter.Send(new Message { MessageType = MessageType.C_SwapMsg, Content = new SwapMassage(request.MsgId) }); _logger.LogDebug($"连接server成功 {request.MsgId}"); diff --git a/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs b/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs index 5c14958..28ecce5 100644 --- a/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs +++ b/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs @@ -2,6 +2,7 @@ using FastTunnel.Core.Extensions; using FastTunnel.Core.Models; using FastTunnel.Core.Sockets; +using FastTunnel.Core.Utility.Extensions; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using System; @@ -27,8 +28,9 @@ namespace FastTunnel.Core.Handlers.Server { var SwapMsg = msg.Content.ToObject(); NewRequest request; + var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(SwapMsg.msgId.Split('_')[0]); - _logger.LogDebug($"响应NewCustomer:{SwapMsg.msgId}"); + _logger.LogDebug($"[开始转发HTTP]:{SwapMsg.msgId} 客户端耗时:{interval}ms"); if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request)) { server.RequestTemp.TryRemove(SwapMsg.msgId, out _); diff --git a/FastTunnel.Core/Listener/ClientListenerV2.cs b/FastTunnel.Core/Listener/ClientListenerV2.cs index 7914f98..b961122 100644 --- a/FastTunnel.Core/Listener/ClientListenerV2.cs +++ b/FastTunnel.Core/Listener/ClientListenerV2.cs @@ -42,7 +42,7 @@ namespace FastTunnel.Core.Listener _heartHandler = new HeartMessageHandler(); _swapMsgHandler = new SwapMessageHandler(_logger); - server = new Server.Server(2000, 100, _logger); + server = new Server.Server(2000, 100, false, _logger); } public void Start() @@ -86,7 +86,15 @@ namespace FastTunnel.Core.Listener throw new Exception($"未知的通讯指令 {msg.MessageType}"); } - handler.HandlerMsg(this._fastTunnelServer, token.Socket, msg); + try + { + handler.HandlerMsg(this._fastTunnelServer, token.Socket, msg); + } + catch (Exception ex) + { + _logger.LogError(ex, "处理客户端消息失败"); + } + return handler.NeedRecive; } diff --git a/FastTunnel.Core/Listener/HttpListenerV2.cs b/FastTunnel.Core/Listener/HttpListenerV2.cs index 1f72e40..7d60507 100644 --- a/FastTunnel.Core/Listener/HttpListenerV2.cs +++ b/FastTunnel.Core/Listener/HttpListenerV2.cs @@ -30,7 +30,7 @@ namespace FastTunnel.Core.Listener this.ListenIp = ip; this.ListenPort = port; - server = new Server.Server(500, 512, _logger); + server = new Server.Server(1000, 512, true, _logger); } public void Start(IListenerDispatcher requestDispatcher, int backlog = 100) diff --git a/FastTunnel.Core/Server/Server.cs b/FastTunnel.Core/Server/Server.cs index bb6add7..8bdf53b 100644 --- a/FastTunnel.Core/Server/Server.cs +++ b/FastTunnel.Core/Server/Server.cs @@ -31,6 +31,7 @@ namespace FastTunnel.Core.Server Func m_handller; string m_sectionFlag; IPEndPoint _localEndPoint; + bool m_isHttpServer; ILogger _logger; @@ -40,8 +41,9 @@ namespace FastTunnel.Core.Server // // the maximum number of connections the sample is designed to handle simultaneously // buffer size to use for each socket I/O operation - public Server(int numConnections, int receiveBufferSize, ILogger logger) + public Server(int numConnections, int receiveBufferSize, bool isHttpServer, ILogger logger) { + m_isHttpServer = isHttpServer; _logger = logger; //m_totalBytesRead = 0; m_numConnectedSockets = 0; @@ -146,15 +148,19 @@ namespace FastTunnel.Core.Server _logger.LogInformation($"[当前连接数]:{_localEndPoint.Port} | {m_numConnectedSockets}"); // Get the socket for the accepted client connection and put it into the - //ReadEventArg object user token + // ReadEventArg object user token SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop(); var token = readEventArgs.UserToken as AsyncUserToken; token.Socket = e.AcceptSocket; token.MassgeTemp = null; token.Recived = null; - token.RequestId = $"{DateTime.Now.GetChinaTicks()}_{Guid.NewGuid().ToString().Replace("-", string.Empty)}"; - _logger.LogDebug($"Accept {token.RequestId}"); + // 客户端请求不需要分配msgid + if (m_isHttpServer) + { + token.RequestId = $"{DateTime.Now.GetChinaTicks()}_{Guid.NewGuid().ToString().Replace("-", string.Empty)}"; + _logger.LogDebug($"Accept {token.RequestId}"); + } // As soon as the client is connected, post a receive to the connection bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs); @@ -196,25 +202,28 @@ namespace FastTunnel.Core.Server _logger.LogDebug($"[ProcessReceive]: {_localEndPoint.Port} | {token.RequestId}"); if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { - // increment the count of the total bytes receive by the server - // Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred); - if (token.Recived != null) - { - byte[] resArr = new byte[token.Recived.Length + e.BytesTransferred]; - token.Recived.CopyTo(resArr, 0); - Array.Copy(e.Buffer, e.Offset, resArr, token.Recived.Length, e.BytesTransferred); - token.Recived = resArr; - } - else - { - byte[] resArr = new byte[e.BytesTransferred]; - Array.Copy(e.Buffer, e.Offset, resArr, 0, e.BytesTransferred); - token.Recived = resArr; - } - bool needRecive = false; var words = e.Buffer.GetString(e.Offset, e.BytesTransferred); var sum = token.MassgeTemp + words; + + // 只有http请求需要对已发送字节进行存储 + if (m_isHttpServer) + { + if (token.Recived != null) + { + byte[] resArr = new byte[token.Recived.Length + e.BytesTransferred]; + token.Recived.CopyTo(resArr, 0); + Array.Copy(e.Buffer, e.Offset, resArr, token.Recived.Length, e.BytesTransferred); + token.Recived = resArr; + } + else + { + byte[] resArr = new byte[e.BytesTransferred]; + Array.Copy(e.Buffer, e.Offset, resArr, 0, e.BytesTransferred); + token.Recived = resArr; + } + } + if (sum.Contains(m_sectionFlag)) { var array = (sum).Split(m_sectionFlag); @@ -246,7 +255,6 @@ namespace FastTunnel.Core.Server token.MassgeTemp = sum; } - e.SetBuffer(e.Offset, m_receiveBufferSize); bool willRaiseEvent = token.Socket.ReceiveAsync(e); if (!willRaiseEvent) { diff --git a/FastTunnel.Core/Sockets/DnsSocket.cs b/FastTunnel.Core/Sockets/DnsSocket.cs index 9567248..c97729b 100644 --- a/FastTunnel.Core/Sockets/DnsSocket.cs +++ b/FastTunnel.Core/Sockets/DnsSocket.cs @@ -21,6 +21,7 @@ namespace FastTunnel.Core.Sockets this._port = v2; Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + Socket.NoDelay = true; } public void Connect() diff --git a/FastTunnel.Core/Sockets/SocketSwap.cs b/FastTunnel.Core/Sockets/SocketSwap.cs index 48ccf87..5a335ee 100644 --- a/FastTunnel.Core/Sockets/SocketSwap.cs +++ b/FastTunnel.Core/Sockets/SocketSwap.cs @@ -17,7 +17,7 @@ namespace FastTunnel.Core.Sockets private readonly string m_msgId = null; private readonly ILogger m_logger; - private bool Swaped = false; + private bool swapeStarted = false; private class Channel { @@ -28,6 +28,8 @@ namespace FastTunnel.Core.Sockets public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId) { + //sockt1.NoDelay = true; + //sockt2.NoDelay = true; m_sockt1 = sockt1; m_sockt2 = sockt2; m_msgId = msgId; @@ -36,9 +38,9 @@ namespace FastTunnel.Core.Sockets public void StartSwap() { - m_logger?.LogDebug($"StartSwap {m_msgId}"); + m_logger?.LogDebug($"[StartSwapStart] {m_msgId}"); + swapeStarted = true; - Swaped = true; ThreadPool.QueueUserWorkItem(swapCallback, new Channel { Send = m_sockt1, @@ -50,6 +52,8 @@ namespace FastTunnel.Core.Sockets Send = m_sockt2, Receive = m_sockt1 }); + + m_logger?.LogDebug($"[StartSwapEnd] {m_msgId}"); } private void swapCallback(object state) @@ -66,7 +70,7 @@ namespace FastTunnel.Core.Sockets { try { - num = chanel.Receive.Receive(result, result.Length, SocketFlags.None); + num = chanel.Receive.Receive(result, 0, result.Length, SocketFlags.None); } catch (Exception) { @@ -82,7 +86,7 @@ namespace FastTunnel.Core.Sockets try { - chanel.Send.Send(result, num, SocketFlags.None); + chanel.Send.Send(result, 0, num, SocketFlags.None); } catch (Exception) { @@ -136,7 +140,7 @@ namespace FastTunnel.Core.Sockets { m_logger?.LogDebug($"BeforeSwap {m_msgId}"); - if (Swaped) + if (swapeStarted) { throw new Exception("BeforeSwap must be invoked before StartSwap!"); }