From 8fbf17d7755e5417b5a4d78773352923742a4c66 Mon Sep 17 00:00:00 2001 From: "Gui.H" Date: Wed, 11 May 2022 23:30:22 +0800 Subject: [PATCH] 1 --- FastTunnel.Client/FastTunnel.Client.csproj | 1 - FastTunnel.Client/Program.cs | 1 + .../Extensions/ServicesExtensions.cs | 1 - .../HandleHttpConnectionMiddleware.cs | 9 ++- .../MiddleWare/SwapConnectionMiddleware.cs | 58 ++++++++++--------- .../DuplexPipeStream.cs | 0 .../Forwarder/Stream/SocketDuplexPipe.cs | 37 ++++++++++++ .../Handlers/Client/SwapHandler.cs | 18 +++--- .../Server}/FastTunnelClientHandler.cs | 3 +- .../Handlers/Server/ForwardDispatcher.cs | 14 ++--- FastTunnel.Core/Server/FastTunnelServer.cs | 3 +- FastTunnel.Core/Utilitys/SwapUtility.cs | 46 +++++++++++++++ FastTunnel.Server/Program.cs | 1 + 13 files changed, 141 insertions(+), 51 deletions(-) rename FastTunnel.Core/Forwarder/{MiddleWare => Stream}/DuplexPipeStream.cs (100%) create mode 100644 FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs rename FastTunnel.Core/{Forwarder/MiddleWare => Handlers/Server}/FastTunnelClientHandler.cs (97%) create mode 100644 FastTunnel.Core/Utilitys/SwapUtility.cs diff --git a/FastTunnel.Client/FastTunnel.Client.csproj b/FastTunnel.Client/FastTunnel.Client.csproj index 9d20edd..8a4cd72 100644 --- a/FastTunnel.Client/FastTunnel.Client.csproj +++ b/FastTunnel.Client/FastTunnel.Client.csproj @@ -5,7 +5,6 @@ - diff --git a/FastTunnel.Client/Program.cs b/FastTunnel.Client/Program.cs index 0a2de75..5560b5e 100644 --- a/FastTunnel.Client/Program.cs +++ b/FastTunnel.Client/Program.cs @@ -49,6 +49,7 @@ class Program Host.CreateDefaultBuilder(args) .UseWindowsService() .UseSerilog((context, services, configuration) => configuration + .MinimumLevel.Debug() .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.Console()) .ConfigureServices((hostContext, services) => diff --git a/FastTunnel.Core/Extensions/ServicesExtensions.cs b/FastTunnel.Core/Extensions/ServicesExtensions.cs index 5d1e0a5..ec43b9b 100644 --- a/FastTunnel.Core/Extensions/ServicesExtensions.cs +++ b/FastTunnel.Core/Extensions/ServicesExtensions.cs @@ -6,7 +6,6 @@ using FastTunnel.Core.Client; using FastTunnel.Core.Config; -using FastTunnel.Core.Forwarder.MiddleWare; using FastTunnel.Core.Handlers.Client; using FastTunnel.Core.Handlers.Server; using FastTunnel.Core.Server; diff --git a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/HandleHttpConnectionMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/HandleHttpConnectionMiddleware.cs index ed38482..772dab0 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/HandleHttpConnectionMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/HandleHttpConnectionMiddleware.cs @@ -35,6 +35,13 @@ internal class FastTunnelConnectionMiddleware await ftContext.TryAnalysisPipeAsync(); logger.LogInformation("=========TryAnalysisPipeAsync END==========="); - await next(ftContext.IsFastTunnel ? ftContext : context); + if (ftContext.IsFastTunnel) + { + await next(ftContext.IsFastTunnel ? ftContext : context); + } + else + { + await next(context); + } } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/SwapConnectionMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/SwapConnectionMiddleware.cs index e7bd940..414d343 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/SwapConnectionMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/SwapConnectionMiddleware.cs @@ -7,6 +7,7 @@ using System; using System.Buffers; using System.IO; +using System.IO.Pipelines; using System.Net.WebSockets; using System.Text; using System.Threading.Tasks; @@ -17,6 +18,7 @@ using FastTunnel.Core.Forwarder.MiddleWare; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Protocol; using FastTunnel.Core.Server; +using FastTunnel.Core.Utilitys; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; using Microsoft.Extensions.Logging; @@ -46,11 +48,11 @@ internal class SwapConnectionMiddleware { if (ctx.Method == ProtocolConst.HTTP_METHOD_SWAP) { - await doSwap(ctx); + await setResponse(ctx); } else if (ctx.MatchWeb != null) { - await waitSwap(ctx); + await waitResponse(ctx); } else { @@ -63,17 +65,19 @@ internal class SwapConnectionMiddleware } } - private async Task waitSwap(FastTunnelConnectionContext context) + private async Task waitResponse(FastTunnelConnectionContext context) { var requestId = Guid.NewGuid().ToString().Replace("-", ""); var web = context.MatchWeb; - TaskCompletionSource tcs = new(); + TaskCompletionSource tcs = new(); logger.LogDebug($"[Http]Swap开始 {requestId}|{context.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}"); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); }); fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs); + IDuplexPipe res = null; + try { try @@ -89,29 +93,34 @@ internal class SwapConnectionMiddleware throw new ClienOffLineException("客户端离线"); } - using var res = await tcs.Task; - using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + var lifetime = context.Features.Get(); - var t1 = res.CopyToAsync(reverseConnection); - var t2 = reverseConnection.CopyToAsync(res); + res = await tcs.Task; - await Task.WhenAll(t1, t2); + // using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); - logger.LogDebug("[Http]Swap结束"); + var t1 = res.Input.CopyToAsync(context.Transport.Output, lifetime.ConnectionClosed); + var t2 = context.Transport.Input.CopyToAsync(res.Output, lifetime.ConnectionClosed); + await Task.WhenAny(t1, t2); } - catch (Exception) + catch (Exception ex) { - throw; + logger.LogError(ex, "[waitResponse]"); } finally { + logger.LogInformation("[Http] waitSwap结束"); fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); - context.Transport.Input.Complete(); - context.Transport.Output.Complete(); + + await context.Transport.Input.CompleteAsync(); + await context.Transport.Output.CompleteAsync(); + + await res.Input.CompleteAsync(); + await res.Output.CompleteAsync(); } } - private async Task doSwap(FastTunnelConnectionContext context) + private async Task setResponse(FastTunnelConnectionContext context) { var requestId = context.MessageId; if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream)) @@ -119,31 +128,26 @@ internal class SwapConnectionMiddleware throw new Exception($"[PROXY]:RequestId不存在 {requestId}"); }; - using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); - responseStream.TrySetResult(reverseConnection); + //using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + responseStream.TrySetResult(context.Transport); var lifetime = context.Features.Get(); - var closedAwaiter = new TaskCompletionSource(); - lifetime.ConnectionClosed.Register((task) => - { - (task as TaskCompletionSource).SetResult(null); - }, closedAwaiter); - try { - await closedAwaiter.Task; + closedAwaiter.Task.Wait(lifetime.ConnectionClosed); } catch (Exception ex) { - logger.LogError(ex, ""); + logger.LogError(ex, "[setResponse]"); } finally { - context.Transport.Input.Complete(); - context.Transport.Output.Complete(); logger.LogInformation($"=====================Swap End:{requestId}================== "); + await context.Transport.Input.CompleteAsync(); + await context.Transport.Output.CompleteAsync(); + context.Abort(); } } } diff --git a/FastTunnel.Core/Forwarder/MiddleWare/DuplexPipeStream.cs b/FastTunnel.Core/Forwarder/Stream/DuplexPipeStream.cs similarity index 100% rename from FastTunnel.Core/Forwarder/MiddleWare/DuplexPipeStream.cs rename to FastTunnel.Core/Forwarder/Stream/DuplexPipeStream.cs diff --git a/FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs b/FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs new file mode 100644 index 0000000..35ec3f2 --- /dev/null +++ b/FastTunnel.Core/Forwarder/Stream/SocketDuplexPipe.cs @@ -0,0 +1,37 @@ +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE +// Copyright (c) 2019 Gui.H + +using System; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace FastTunnel.Core.Forwarder.MiddleWare; + +internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable +{ + NetworkStream stream; + + public SocketDuplexPipe(Socket socket) + { + stream = new NetworkStream(socket, true); + + Input = PipeReader.Create(stream); + Output = PipeWriter.Create(stream); + } + + public PipeReader Input { get; private set; } + + public PipeWriter Output { get; private set; } + + public async ValueTask DisposeAsync() + { + await stream.DisposeAsync(); + } +} diff --git a/FastTunnel.Core/Handlers/Client/SwapHandler.cs b/FastTunnel.Core/Handlers/Client/SwapHandler.cs index 822f234..ef79b25 100644 --- a/FastTunnel.Core/Handlers/Client/SwapHandler.cs +++ b/FastTunnel.Core/Handlers/Client/SwapHandler.cs @@ -31,6 +31,7 @@ public class SwapHandler : IClientHandler var msgs = msg.Split('|'); var requestId = msgs[0]; var address = msgs[1]; + _logger.LogDebug($"[HandlerMsgAsync] start {requestId}"); try { @@ -40,24 +41,19 @@ public class SwapHandler : IClientHandler var taskX = serverStream.CopyToAsync(localStream, cancellationToken); var taskY = localStream.CopyToAsync(serverStream, cancellationToken); - try - { - await Task.WhenAll(taskX, taskY); - } - catch (Exception) - { + await Task.WhenAny(taskX, taskY); - } - finally - { - _logger.LogError($"=====================Swap End:{requestId}================== "); - } + _logger.LogDebug($"[HandlerMsgAsync] success {requestId}"); } } catch (Exception ex) { _logger.LogError(ex, $"Swap error {requestId}"); } + finally + { + _logger.LogDebug($"=====================Swap End:{requestId}================== "); + } } private async Task createLocal(string requestId, string localhost, CancellationToken cancellationToken) diff --git a/FastTunnel.Core/Forwarder/MiddleWare/FastTunnelClientHandler.cs b/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs similarity index 97% rename from FastTunnel.Core/Forwarder/MiddleWare/FastTunnelClientHandler.cs rename to FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs index 86cc793..d73761f 100644 --- a/FastTunnel.Core/Forwarder/MiddleWare/FastTunnelClientHandler.cs +++ b/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs @@ -9,7 +9,6 @@ using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Extensions; -using FastTunnel.Core.Handlers.Server; using FastTunnel.Core.Models; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Protocol; @@ -17,7 +16,7 @@ using FastTunnel.Core.Server; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -namespace FastTunnel.Core.Forwarder.MiddleWare; +namespace FastTunnel.Core.Handlers.Server; public class FastTunnelClientHandler { diff --git a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs index ab6b7ce..0c8dd85 100644 --- a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs +++ b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs @@ -6,12 +6,14 @@ using System; using System.IO; +using System.IO.Pipelines; using System.Net.Sockets; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Exceptions; using FastTunnel.Core.Extensions; +using FastTunnel.Core.Forwarder.MiddleWare; using FastTunnel.Core.Models; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Server; @@ -47,7 +49,7 @@ public class ForwardDispatcher await Task.Yield(); logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}"); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); }); _server.ResponseTasks.TryAdd(msgId, tcs); @@ -73,13 +75,10 @@ public class ForwardDispatcher return; } - using (var stream1 = await tcs.Task) - using (var stream2 = new NetworkStream(_socket, true) { ReadTimeout = 1000 * 60 * 10 }) - { - await Task.WhenAll(stream1.CopyToAsync(stream2), stream2.CopyToAsync(stream1)); - } + var stream1 = await tcs.Task; + await using var stream2 = new SocketDuplexPipe(_socket); - logger.LogDebug($"[Forward]Swap OK {msgId}"); + await Task.WhenAny(stream1.Input.CopyToAsync(stream2.Output), stream2.Input.CopyToAsync(stream1.Output)); } catch (Exception ex) { @@ -87,6 +86,7 @@ public class ForwardDispatcher } finally { + logger.LogDebug($"[Forward]Swap OK {msgId}"); _server.ResponseTasks.TryRemove(msgId, out _); } } diff --git a/FastTunnel.Core/Server/FastTunnelServer.cs b/FastTunnel.Core/Server/FastTunnelServer.cs index f14f89f..0a888c9 100644 --- a/FastTunnel.Core/Server/FastTunnelServer.cs +++ b/FastTunnel.Core/Server/FastTunnelServer.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; +using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Config; @@ -23,7 +24,7 @@ public class FastTunnelServer public readonly IOptionsMonitor ServerOption; private readonly ILogger logger; - public ConcurrentDictionary> ResponseTasks { get; } = new(); + public ConcurrentDictionary> ResponseTasks { get; } = new(); public ConcurrentDictionary WebList { get; private set; } = new(); diff --git a/FastTunnel.Core/Utilitys/SwapUtility.cs b/FastTunnel.Core/Utilitys/SwapUtility.cs new file mode 100644 index 0000000..c491869 --- /dev/null +++ b/FastTunnel.Core/Utilitys/SwapUtility.cs @@ -0,0 +1,46 @@ +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE +// Copyright (c) 2019 Gui.H + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace FastTunnel.Core.Utilitys; + +internal class SwapUtility +{ + IDuplexPipe pipe1; + IDuplexPipe pipe2; + + public SwapUtility(IDuplexPipe pipe1, IDuplexPipe pipe2) + { + this.pipe1 = pipe1; + this.pipe2 = pipe2; + } + + internal async Task SwapAsync(CancellationToken cancellationToken = default) + { + + } + + private async Task T1(IDuplexPipe pipe, IDuplexPipe pipe1, CancellationToken cancellationToken = default) + { + while (true) + { + ReadResult result; + ReadOnlySequence readableBuffer; + + result = await pipe.Input.ReadAsync(cancellationToken); + readableBuffer = result.Buffer; + + } + } +} diff --git a/FastTunnel.Server/Program.cs b/FastTunnel.Server/Program.cs index 4c08d96..a16fc72 100644 --- a/FastTunnel.Server/Program.cs +++ b/FastTunnel.Server/Program.cs @@ -49,6 +49,7 @@ public class Program Host.CreateDefaultBuilder(args) .UseWindowsService() .UseSerilog((context, services, configuration) => configuration + .MinimumLevel.Debug() .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.Console()) .ConfigureWebHost(webHostBuilder =>