From c3a8378f754faeccccb653afb444509f4300313a Mon Sep 17 00:00:00 2001 From: "Gui.H" Date: Sat, 2 Jul 2022 00:28:21 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FastTunnel.Client/Program.cs | 1 - .../Kestrel/MiddleWare/ForwarderMiddleware.cs | 72 ++++++++++++------- .../{Stream => Streams}/DuplexPipeStream.cs | 6 +- .../{Stream => Streams}/SocketDuplexPipe.cs | 2 +- .../Handlers/Client/SwapHandler.cs | 4 +- .../Handlers/Server/ForwardDispatcher.cs | 15 ++-- FastTunnel.Core/Server/FastTunnelServer.cs | 2 +- FastTunnel.Server/Program.cs | 1 - 8 files changed, 65 insertions(+), 38 deletions(-) rename FastTunnel.Core/Forwarder/{Stream => Streams}/DuplexPipeStream.cs (97%) rename FastTunnel.Core/Forwarder/{Stream => Streams}/SocketDuplexPipe.cs (95%) diff --git a/FastTunnel.Client/Program.cs b/FastTunnel.Client/Program.cs index 5560b5e..0a2de75 100644 --- a/FastTunnel.Client/Program.cs +++ b/FastTunnel.Client/Program.cs @@ -49,7 +49,6 @@ class Program Host.CreateDefaultBuilder(args) .UseWindowsService() .UseSerilog((context, services, configuration) => configuration - .MinimumLevel.Debug() .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.Console()) .ConfigureServices((hostContext, services) => diff --git a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs index 5758318..23c766d 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs @@ -10,11 +10,13 @@ using System.IO; using System.IO.Pipelines; using System.Net.WebSockets; using System.Text; +using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Exceptions; using FastTunnel.Core.Extensions; using FastTunnel.Core.Forwarder.Kestrel; using FastTunnel.Core.Forwarder.Kestrel.Features; +using FastTunnel.Core.Forwarder.Streams; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Protocol; using FastTunnel.Core.Server; @@ -43,19 +45,15 @@ internal class ForwarderMiddleware internal async Task OnConnectionAsync(ConnectionContext context) { - logger.LogInformation("=========ForwarderMiddleware SART==========="); - var feat = context.Features.Get(); if (feat == null) { - logger.LogInformation("=========ForwarderMiddleware END==========="); // not fasttunnel request await next(context); return; } else { - logger.LogInformation("=========Swap STRART==========="); if (feat.Method == ProtocolConst.HTTP_METHOD_SWAP) { await doSwap(context); @@ -68,25 +66,32 @@ internal class ForwarderMiddleware { throw new NotSupportedException(); } - - logger.LogInformation("=========Swap END==========="); - logger.LogInformation("=========ForwarderMiddleware END==========="); } } + /// + /// 用户发起的请求 + /// + /// + /// private async Task waitSwap(ConnectionContext context) { var feat = context.Features.Get(); var requestId = Guid.NewGuid().ToString().Replace("-", ""); + + logger.LogInformation($"=========USER START {requestId}==========="); var web = feat.MatchWeb; - TaskCompletionSource tcs = new(); + TaskCompletionSource<(Stream, CancellationTokenSource)> tcs = new(); logger.LogDebug($"[Http]Swap开始 {requestId}|{feat.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}"); - tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); }); - fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs); + if (!fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs)) + { + return; + } - IDuplexPipe res = null; + //IDuplexPipe res = null; + (Stream Stream, CancellationTokenSource TokenSource) res = (null, null); try { @@ -102,14 +107,19 @@ internal class ForwarderMiddleware // 通讯异常,返回客户端离线 throw new ClienOffLineException("客户端离线"); } - + res = await tcs.Task; - // using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(res.TokenSource.Token, context.ConnectionClosed); - var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed); - var t2 = context.Transport.Input.CopyToAsync(res.Output, context.ConnectionClosed); - await Task.WhenAny(t1, t2); + using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + + //var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed); + //var t2 = context.Transport.Input.CopyToAsync(res.Output, context.ConnectionClosed); + var t1 = res.Stream.CopyToAsync(reverseConnection, tokenSource.Token); + var t2 = reverseConnection.CopyToAsync(res.Stream, tokenSource.Token); + + await Task.WhenAny(t1, t2).WaitAsync(tokenSource.Token); } catch (Exception ex) { @@ -117,34 +127,49 @@ internal class ForwarderMiddleware } finally { - logger.LogInformation("[Http] waitSwap结束"); + logger.LogInformation($"=========USER END {requestId}==========="); fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); await context.Transport.Input.CompleteAsync(); await context.Transport.Output.CompleteAsync(); - await res.Input.CompleteAsync(); - await res.Output.CompleteAsync(); + res.TokenSource?.Cancel(); } } + /// + /// 内网发起的请求 + /// + /// + /// + /// private async Task doSwap(ConnectionContext context) { var feat = context.Features.Get(); var requestId = feat.MessageId; + + logger.LogInformation($"=========CLINET START {requestId}==========="); + if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream)) { throw new Exception($"[PROXY]:RequestId不存在 {requestId}"); }; - //using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); - responseStream.TrySetResult(context.Transport); + CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.ConnectionClosed); + + using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + responseStream.TrySetResult((reverseConnection, cancellationTokenSource)); var closedAwaiter = new TaskCompletionSource(); + cancellationTokenSource.Token.Register(() => + { + closedAwaiter.TrySetCanceled(); + }); + try { - closedAwaiter.Task.Wait(context.ConnectionClosed); + await closedAwaiter.Task; } catch (Exception ex) { @@ -152,10 +177,9 @@ internal class ForwarderMiddleware } finally { - logger.LogInformation($"=====================Swap End:{requestId}================== "); + logger.LogInformation($"=========CLINET END {requestId}==========="); await context.Transport.Input.CompleteAsync(); await context.Transport.Output.CompleteAsync(); - context.Abort(); } } } diff --git a/FastTunnel.Core/Forwarder/Stream/DuplexPipeStream.cs b/FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs similarity index 97% rename from FastTunnel.Core/Forwarder/Stream/DuplexPipeStream.cs rename to FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs index 9a313f3..825d535 100644 --- a/FastTunnel.Core/Forwarder/Stream/DuplexPipeStream.cs +++ b/FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs @@ -15,7 +15,7 @@ using System.Threading.Tasks; using FastTunnel.Core.Extensions; using FastTunnel.Core.Refs; -namespace FastTunnel.Core.Forwarder.Stream; +namespace FastTunnel.Core.Forwarder.Streams; internal class DuplexPipeStream : System.IO.Stream { @@ -143,7 +143,9 @@ internal class DuplexPipeStream : System.IO.Stream // buffer.Count is int var count = (int)Math.Min(readableBuffer.Length, destination.Length); readableBuffer = readableBuffer.Slice(0, count); - Console.WriteLine($"[{GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}"); + + //Console.WriteLine($"[{GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}"); + readableBuffer.CopyTo(destination.Span); return count; } diff --git a/FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs b/FastTunnel.Core/Forwarder/Streams/SocketDuplexPipe.cs similarity index 95% rename from FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs rename to FastTunnel.Core/Forwarder/Streams/SocketDuplexPipe.cs index af147ae..9e06c54 100644 --- a/FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs +++ b/FastTunnel.Core/Forwarder/Streams/SocketDuplexPipe.cs @@ -12,7 +12,7 @@ using System.Net.Sockets; using System.Text; using System.Threading.Tasks; -namespace FastTunnel.Core.Forwarder.Stream; +namespace FastTunnel.Core.Forwarder.Streams; internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable { diff --git a/FastTunnel.Core/Handlers/Client/SwapHandler.cs b/FastTunnel.Core/Handlers/Client/SwapHandler.cs index ef79b25..9aca19b 100644 --- a/FastTunnel.Core/Handlers/Client/SwapHandler.cs +++ b/FastTunnel.Core/Handlers/Client/SwapHandler.cs @@ -31,7 +31,7 @@ public class SwapHandler : IClientHandler var msgs = msg.Split('|'); var requestId = msgs[0]; var address = msgs[1]; - _logger.LogDebug($"[HandlerMsgAsync] start {requestId}"); + _logger.LogInformation($"========Swap Start:{requestId}=========="); try { @@ -52,7 +52,7 @@ public class SwapHandler : IClientHandler } finally { - _logger.LogDebug($"=====================Swap End:{requestId}================== "); + _logger.LogInformation($"========Swap End:{requestId}=========="); } } diff --git a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs index d9f82fc..5fa1392 100644 --- a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs +++ b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs @@ -13,7 +13,7 @@ using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Exceptions; using FastTunnel.Core.Extensions; -using FastTunnel.Core.Forwarder.Stream; +using FastTunnel.Core.Forwarder.Streams; using FastTunnel.Core.Models; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Server; @@ -44,12 +44,13 @@ public class ForwardDispatcher { var msgId = Guid.NewGuid().ToString().Replace("-", ""); + (Stream Stream, CancellationTokenSource TokenSource) res = default; + try { - await Task.Yield(); logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}"); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource<(Stream Stream, CancellationTokenSource TokenSource)>(); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); }); _server.ResponseTasks.TryAdd(msgId, tcs); @@ -75,10 +76,11 @@ public class ForwardDispatcher return; } - var stream1 = await tcs.Task; - await using var stream2 = new SocketDuplexPipe(_socket); + res = await tcs.Task; - await Task.WhenAny(stream1.Input.CopyToAsync(stream2.Output), stream2.Input.CopyToAsync(stream1.Output)); + //await using var stream2 = new SocketDuplexPipe(_socket); + using var stream2 = new NetworkStream(_socket); + await Task.WhenAny(res.Stream.CopyToAsync(stream2), stream2.CopyToAsync(res.Stream)); } catch (Exception ex) { @@ -86,6 +88,7 @@ public class ForwardDispatcher } finally { + res.TokenSource?.Cancel(); logger.LogDebug($"[Forward]Swap OK {msgId}"); _server.ResponseTasks.TryRemove(msgId, out _); } diff --git a/FastTunnel.Core/Server/FastTunnelServer.cs b/FastTunnel.Core/Server/FastTunnelServer.cs index 0a888c9..5a43743 100644 --- a/FastTunnel.Core/Server/FastTunnelServer.cs +++ b/FastTunnel.Core/Server/FastTunnelServer.cs @@ -24,7 +24,7 @@ public class FastTunnelServer public readonly IOptionsMonitor ServerOption; private readonly ILogger logger; - public ConcurrentDictionary> ResponseTasks { get; } = new(); + public ConcurrentDictionary> ResponseTasks { get; } = new(); public ConcurrentDictionary WebList { get; private set; } = new(); diff --git a/FastTunnel.Server/Program.cs b/FastTunnel.Server/Program.cs index 8bebf1d..d3d15c5 100644 --- a/FastTunnel.Server/Program.cs +++ b/FastTunnel.Server/Program.cs @@ -49,7 +49,6 @@ public class Program Host.CreateDefaultBuilder(args) .UseWindowsService() .UseSerilog((context, services, configuration) => configuration - .MinimumLevel.Debug() .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.Console()) .ConfigureWebHost(webHostBuilder =>