diff --git a/FastTunnel.Core/Client/FastTunnelClient.cs b/FastTunnel.Core/Client/FastTunnelClient.cs index e60b77e..bdb01ae 100644 --- a/FastTunnel.Core/Client/FastTunnelClient.cs +++ b/FastTunnel.Core/Client/FastTunnelClient.cs @@ -122,8 +122,6 @@ public class FastTunnelClient : IFastTunnelClient private async void HandleServerRequestAsync(byte cmd, string ctx, CancellationToken cancellationToken) { - await Task.Yield(); - try { IClientHandler handler; diff --git a/FastTunnel.Core/Forwarder/FastTunelProtocol.cs b/FastTunnel.Core/Forwarder/FastTunelProtocol.cs index 6e84cc0..d666598 100644 --- a/FastTunnel.Core/Forwarder/FastTunelProtocol.cs +++ b/FastTunnel.Core/Forwarder/FastTunelProtocol.cs @@ -37,27 +37,36 @@ public class FastTunelProtocol IDuplexPipe Transport => context.Transport; + const char CharR = '\r'; + + private const byte ByteCR = (byte)'\r'; + private const byte ByteLF = (byte)'\n'; + private const byte ByteColon = (byte)':'; + private const byte ByteSpace = (byte)' '; + internal async Task TryAnalysisPipeAsync() { var _input = Transport.Input; ReadResult result; ReadOnlySequence readableBuffer; + while (true) { result = await _input.ReadAsync(context.ConnectionClosed); - var tempBuffer = readableBuffer = result.Buffer; + readableBuffer = result.Buffer; + SequencePosition start = readableBuffer.Start; SequencePosition? position = null; do { - position = tempBuffer.PositionOf((byte)'\n'); + position = readableBuffer.PositionOf(ByteLF); if (position != null) { var readedPosition = readableBuffer.GetPosition(1, position.Value); - if (ProcessLine(tempBuffer.Slice(0, position.Value), out var line)) + if (ProcessHeaderLine(readableBuffer.Slice(0, position.Value), out var _)) { if (Method == ProtocolConst.HTTP_METHOD_SWAP) { @@ -65,7 +74,7 @@ public class FastTunelProtocol } else { - _input.AdvanceTo(readableBuffer.Start, readableBuffer.Start); + _input.AdvanceTo(start, start); } if (IsFastTunnel) @@ -73,7 +82,6 @@ public class FastTunelProtocol context.Features.Set(new FastTunnelFeature() { MatchWeb = MatchWeb, - HasReadLInes = HasReadLInes, Method = Method, Host = Host, MessageId = MessageId, @@ -82,7 +90,7 @@ public class FastTunelProtocol return; } - tempBuffer = tempBuffer.Slice(readedPosition); + readableBuffer = readableBuffer.Slice(readedPosition); } } while (position != null && !readableBuffer.IsEmpty); @@ -100,7 +108,6 @@ public class FastTunelProtocol public string MessageId; private bool isFirstLine = true; - public IList HasReadLInes { get; private set; } = new List(); public FastTunnelServer fastTunnelServer { get; } /// @@ -115,23 +122,22 @@ public class FastTunelProtocol /// Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 /// /// - /// + /// /// Header读取完毕? - private bool ProcessLine(ReadOnlySequence readOnlySequence, out string lineStr) + private bool ProcessHeaderLine(ReadOnlySequence headerLine, out string headerLineStr) { - lineStr = Encoding.UTF8.GetString(readOnlySequence); - HasReadLInes.Add(lineStr); + headerLineStr = Encoding.UTF8.GetString(headerLine); if (isFirstLine) { - Method = lineStr.Substring(0, lineStr.IndexOf(" ")).ToUpper(); + Method = headerLineStr.Substring(0, headerLineStr.IndexOf(" ")).ToUpper(); switch (Method) { case ProtocolConst.HTTP_METHOD_SWAP: // 客户端发起消息互转 - var endIndex = lineStr.IndexOf(" ", 7); - MessageId = lineStr.Substring(7, endIndex - 7); + var endIndex = headerLineStr.IndexOf(" ", 7); + MessageId = headerLineStr.Substring(7, endIndex - 7); break; default: // 常规Http请求,需要检查Host决定是否进行代理 @@ -142,21 +148,9 @@ public class FastTunelProtocol } else { - if (lineStr.Equals("\r")) + // TrailerHeader + if (headerLineStr.Equals("\r")) { - if (Method == ProtocolConst.HTTP_METHOD_SWAP) - { - // do nothing - } - else - { - // 匹配Host, - if (fastTunnelServer.TryGetWebProxyByHost(Host, out var web)) - { - MatchWeb = web; - } - } - return true; } @@ -168,15 +162,22 @@ public class FastTunelProtocol default: // 检查Host决定是否进行代理 // Host: test.test.cc:1270 - var lower = lineStr.Trim('\r').ToLower(); - if (lower.StartsWith("host:")) + var lower = headerLineStr.Trim(CharR).ToLower(); + if (lower.StartsWith("host:", StringComparison.OrdinalIgnoreCase)) { Host = lower.Split(" ")[1]; if (Host.Contains(":")) { Host = Host.Split(":")[0]; } + + // 匹配Host, + if (fastTunnelServer.TryGetWebProxyByHost(Host, out var web)) + { + MatchWeb = web; + } } + break; } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs b/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs index e9f2ddc..d38cdfe 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs @@ -13,7 +13,7 @@ using FastTunnel.Core.Models; namespace FastTunnel.Core.Forwarder.Kestrel.Features; -public class FastTunnelFeature : IFastTunnelFeature +public struct FastTunnelFeature : IFastTunnelFeature { public WebInfo MatchWeb { get; set; } diff --git a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs index 811ca39..a13e548 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs @@ -69,6 +69,9 @@ internal class ForwarderMiddleware } } + public int UserCount = 0; + public int ClientCount = 0; + /// /// 用户向服务端发起的请求 /// @@ -79,6 +82,8 @@ internal class ForwarderMiddleware var feat = context.Features.Get(); var requestId = Guid.NewGuid().ToString().Replace("-", ""); + Interlocked.Increment(ref UserCount); + logger.LogDebug($"=========USER START {requestId}==========="); var web = feat.MatchWeb; @@ -90,12 +95,12 @@ internal class ForwarderMiddleware return; } - await Task.Yield(); - (Stream Stream, CancellationTokenSource TokenSource) res = (null, null); try { + var ss = context.LocalEndPoint; + try { // 发送指令给客户端,等待建立隧道 @@ -112,7 +117,6 @@ internal class ForwarderMiddleware res = await tcs.Task; var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(res.TokenSource.Token, context.ConnectionClosed); - using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); //var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed); @@ -127,6 +131,7 @@ internal class ForwarderMiddleware } finally { + Interlocked.Decrement(ref UserCount); logger.LogDebug($"=========USER END {requestId}==========="); fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); @@ -145,6 +150,7 @@ internal class ForwarderMiddleware /// private async Task doSwap(ConnectionContext context) { + Interlocked.Increment(ref ClientCount); var feat = context.Features.Get(); var requestId = feat.MessageId; @@ -176,6 +182,7 @@ internal class ForwarderMiddleware } finally { + Interlocked.Decrement(ref ClientCount); logger.LogDebug($"=========CLINET END {requestId}==========="); await context.Transport.Input.CompleteAsync(); await context.Transport.Output.CompleteAsync(); diff --git a/FastTunnel.Core/Handlers/Client/SwapHandler.cs b/FastTunnel.Core/Handlers/Client/SwapHandler.cs index 26b4a4c..283b5f1 100644 --- a/FastTunnel.Core/Handlers/Client/SwapHandler.cs +++ b/FastTunnel.Core/Handlers/Client/SwapHandler.cs @@ -26,25 +26,28 @@ public class SwapHandler : IClientHandler _logger = logger; } + public int SwapCount = 0; + public async Task HandlerMsgAsync(FastTunnelClient cleint, string msg, CancellationToken cancellationToken) { - var msgs = msg.Split('|'); - var requestId = msgs[0]; - var address = msgs[1]; - _logger.LogDebug($"========Swap Start:{requestId}=========="); - + string requestId = null; try { - using (var serverStream = await createRemote(requestId, cleint, cancellationToken)) - using (var localStream = await createLocal(requestId, address, cancellationToken)) - { - var taskX = serverStream.CopyToAsync(localStream, cancellationToken); - var taskY = localStream.CopyToAsync(serverStream, cancellationToken); + Interlocked.Increment(ref SwapCount); + var msgs = msg.Split('|'); + requestId = msgs[0]; + var address = msgs[1]; + _logger.LogDebug($"========Swap Start:{requestId}=========="); - await Task.WhenAny(taskX, taskY); + using var serverStream = await createRemote(requestId, cleint, cancellationToken); + using var localStream = await createLocal(requestId, address, cancellationToken); - _logger.LogDebug($"[HandlerMsgAsync] success {requestId}"); - } + var taskX = serverStream.CopyToAsync(localStream, cancellationToken); + var taskY = localStream.CopyToAsync(serverStream, cancellationToken); + + await Task.WhenAny(taskX, taskY).WaitAsync(cancellationToken); + + _logger.LogDebug($"[HandlerMsgAsync] success {requestId}"); } catch (Exception ex) { @@ -52,6 +55,7 @@ public class SwapHandler : IClientHandler } finally { + Interlocked.Decrement(ref SwapCount); _logger.LogDebug($"========Swap End:{requestId}=========="); } } diff --git a/FastTunnel.Core/Listener/PortProxyListener.cs b/FastTunnel.Core/Listener/PortProxyListener.cs index 40e2509..eabb7cd 100644 --- a/FastTunnel.Core/Listener/PortProxyListener.cs +++ b/FastTunnel.Core/Listener/PortProxyListener.cs @@ -81,7 +81,7 @@ public class PortProxyListener Interlocked.Increment(ref m_numConnectedSockets); - _logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port", + _logerr.LogDebug($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port", m_numConnectedSockets); // 将此客户端交由Dispatcher进行管理