From ea8072ac6ed28ee2372b69693aa93347bc10767a Mon Sep 17 00:00:00 2001 From: "Gui.H" Date: Sun, 8 May 2022 21:09:08 +0800 Subject: [PATCH] 1 --- FastTunnel.Core/Client/FastTunnelServer.cs | 7 +- .../Extensions/ListenOptionsSwapExtensions.cs | 1 - .../Extensions/ServicesExtensions.cs | 36 ++--- .../Kestrel/FastTunnelConnectionContext.cs | 70 ++++++---- .../Kestrel/HandleHttpConnectionMiddleware.cs | 8 +- .../Kestrel/SwapConnectionMiddleware.cs | 127 +++++++++--------- .../Handlers/Client/SwapHandler.cs | 3 +- FastTunnel.Core/Models/WebInfo.cs | 6 + 8 files changed, 145 insertions(+), 113 deletions(-) diff --git a/FastTunnel.Core/Client/FastTunnelServer.cs b/FastTunnel.Core/Client/FastTunnelServer.cs index f794d73..0a94a94 100644 --- a/FastTunnel.Core/Client/FastTunnelServer.cs +++ b/FastTunnel.Core/Client/FastTunnelServer.cs @@ -1,4 +1,4 @@ -// Licensed under the Apache License, Version 2.0 (the "License"). +// Licensed under the Apache License, Version 2.0 (the "License"). // You may not use this file except in compliance with the License. // You may obtain a copy of the License at // https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE @@ -67,5 +67,10 @@ namespace FastTunnel.Core.Client Clients.Remove(client); client.Logout(); } + + internal bool TryGetWebProxyByHost(string host, out WebInfo web) + { + return WebList.TryGetValue(host, out web); + } } } diff --git a/FastTunnel.Core/Extensions/ListenOptionsSwapExtensions.cs b/FastTunnel.Core/Extensions/ListenOptionsSwapExtensions.cs index acfe404..0b35342 100644 --- a/FastTunnel.Core/Extensions/ListenOptionsSwapExtensions.cs +++ b/FastTunnel.Core/Extensions/ListenOptionsSwapExtensions.cs @@ -31,7 +31,6 @@ public static class ListenOptionsSwapExtensions listenOptions.Use(next => new SwapConnectionMiddleware(next, logger, fastTunnelServer).OnConnectionAsync); // 登录频率低,放在后面 - // listenOptions.Use(next => new ClientConnectionMiddleware(next, loggerClient, fastTunnelServer).OnConnectionAsync); return listenOptions; } } diff --git a/FastTunnel.Core/Extensions/ServicesExtensions.cs b/FastTunnel.Core/Extensions/ServicesExtensions.cs index 6eac2d4..3ba183e 100644 --- a/FastTunnel.Core/Extensions/ServicesExtensions.cs +++ b/FastTunnel.Core/Extensions/ServicesExtensions.cs @@ -73,28 +73,28 @@ namespace FastTunnel.Core.Extensions { app.UseWebSockets(); - var swapHandler = app.ApplicationServices.GetRequiredService(); + // var swapHandler = app.ApplicationServices.GetRequiredService(); var clientHandler = app.ApplicationServices.GetRequiredService(); app.Use(clientHandler.Handle); } - public static void MapFastTunnelServer(this IEndpointRouteBuilder endpoints) - { - endpoints.MapReverseProxy(); - endpoints.MapFallback(context => - { - var options = context.RequestServices.GetRequiredService>(); - var host = context.Request.Host.Host; - if (!host.EndsWith(options.CurrentValue.WebDomain) || host.Equals(options.CurrentValue.WebDomain)) - { - context.Response.StatusCode = 404; - return Task.CompletedTask; - } + //public static void MapFastTunnelServer(this IEndpointRouteBuilder endpoints) + //{ + // endpoints.MapReverseProxy(); + // endpoints.MapFallback(context => + // { + // var options = context.RequestServices.GetRequiredService>(); + // var host = context.Request.Host.Host; + // if (!host.EndsWith(options.CurrentValue.WebDomain) || host.Equals(options.CurrentValue.WebDomain)) + // { + // context.Response.StatusCode = 404; + // return Task.CompletedTask; + // } - context.Response.StatusCode = 200; - context.Response.WriteAsync(TunnelResource.Page_NotFound, CancellationToken.None); - return Task.CompletedTask; - }); - } + // context.Response.StatusCode = 200; + // context.Response.WriteAsync(TunnelResource.Page_NotFound, CancellationToken.None); + // return Task.CompletedTask; + // }); + //} } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/FastTunnelConnectionContext.cs b/FastTunnel.Core/Forwarder/Kestrel/FastTunnelConnectionContext.cs index 11374ac..393bc00 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/FastTunnelConnectionContext.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/FastTunnelConnectionContext.cs @@ -11,6 +11,8 @@ using System.IO.Pipelines; using System.Linq; using System.Text; using System.Threading.Tasks; +using FastTunnel.Core.Client; +using FastTunnel.Core.Models; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Features; using Microsoft.Extensions.Logging; @@ -19,10 +21,12 @@ namespace FastTunnel.Core.Forwarder.Kestrel; internal class FastTunnelConnectionContext : ConnectionContext { private ConnectionContext _inner; + FastTunnelServer fastTunnelServer; ILogger _logger; - public FastTunnelConnectionContext(ConnectionContext context, ILogger logger) + public FastTunnelConnectionContext(ConnectionContext context, FastTunnelServer fastTunnelServer, ILogger logger) { + this.fastTunnelServer = fastTunnelServer; this._inner = context; this._logger = logger; } @@ -35,6 +39,10 @@ internal class FastTunnelConnectionContext : ConnectionContext public override IDictionary Items { get => _inner.Items; set => _inner.Items = value; } + public bool IsFastTunnel => Method == "PROXY" || MatchWeb != null; + + public WebInfo MatchWeb { get; private set; } + public override ValueTask DisposeAsync() { return _inner.DisposeAsync(); @@ -49,45 +57,45 @@ internal class FastTunnelConnectionContext : ConnectionContext internal async Task TryAnalysisPipeAsync() { var reader = Transport.Input; + + ReadResult result; + ReadOnlySequence readableBuffer; + while (true) { - var result = await reader.ReadAsync(); - readableBuffer = result.Buffer; - SequencePosition? position = null; + result = await reader.ReadAsync(); + var tempBuffer = readableBuffer = result.Buffer; - var start = readableBuffer.Start; + SequencePosition? position = null; do { - position = readableBuffer.PositionOf((byte)'\n'); + position = tempBuffer.PositionOf((byte)'\n'); if (position != null) { - ProcessLine(readableBuffer.Slice(0, position.Value)); - if (HeaderEnd) + if (ProcessLine(tempBuffer.Slice(0, position.Value))) { - if (Method == "SWAP") + if (Method == "PROXY") { - reader.AdvanceTo(readableBuffer.End, readableBuffer.End); + reader.AdvanceTo(position.Value, position.Value); } else { - reader.AdvanceTo(start, start); + reader.AdvanceTo(readableBuffer.Start, readableBuffer.Start); } return false; } - // 剔除已处理的行 +\n - readableBuffer = readableBuffer.Slice(readableBuffer.GetPosition(1, position.Value)); + tempBuffer = tempBuffer.Slice(readableBuffer.GetPosition(1, position.Value)); } } - while (position != null); - - reader.AdvanceTo(start, start); + while (position != null && !readableBuffer.IsEmpty); if (result.IsCompleted) { + reader.AdvanceTo(readableBuffer.End, readableBuffer.End); break; } } @@ -99,7 +107,7 @@ internal class FastTunnelConnectionContext : ConnectionContext public string Host = null; public string MessageId; - bool HeaderEnd = false; + bool complete = false; bool isFirstLine = true; /// @@ -115,20 +123,20 @@ internal class FastTunnelConnectionContext : ConnectionContext /// /// /// - private void ProcessLine(ReadOnlySequence readOnlySequence) + private bool ProcessLine(ReadOnlySequence readOnlySequence) { var lineStr = Encoding.UTF8.GetString(readOnlySequence); - Console.WriteLine($"[Handle] {lineStr}"); + Console.WriteLine($"[HandleLien] {lineStr}"); if (isFirstLine) { Method = lineStr.Substring(0, lineStr.IndexOf(" ")).ToUpper(); switch (Method) { - case "SWAP": + case "PROXY": // 客户端发起消息互转 - var endIndex = lineStr.IndexOf(" ", 6); - MessageId = lineStr.Substring(6, endIndex - 6); + var endIndex = lineStr.IndexOf(" ", 7); + MessageId = lineStr.Substring(7, endIndex - 7); break; default: // 常规Http请求,需要检查Host决定是否进行代理 @@ -141,13 +149,23 @@ internal class FastTunnelConnectionContext : ConnectionContext { if (lineStr.Equals("\r")) { - HeaderEnd = true; - return; + complete = true; + + if (Method != "PROXY") + { + // 匹配Host, + if (fastTunnelServer.TryGetWebProxyByHost(Host, out WebInfo web)) + { + MatchWeb = web; + } + } + + return true; } switch (Method) { - case "SWAP": + case "PROXY": // 找msgid break; @@ -166,5 +184,7 @@ internal class FastTunnelConnectionContext : ConnectionContext break; } } + + return false; } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/HandleHttpConnectionMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/HandleHttpConnectionMiddleware.cs index a582fd3..457f5b5 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/HandleHttpConnectionMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/HandleHttpConnectionMiddleware.cs @@ -31,9 +31,11 @@ internal class HandleHttpConnectionMiddleware internal async Task OnConnectionAsync(ConnectionContext context) { - var ftContext = new FastTunnelConnectionContext(context, logger); - var fasttunnelHandle = await ftContext.TryAnalysisPipeAsync(); + logger.LogInformation("=========OnConnectionAsync==========="); + var ftContext = new FastTunnelConnectionContext(context, fastTunnelServer, logger); + await ftContext.TryAnalysisPipeAsync(); - await next(ftContext); + logger.LogInformation("=========TryAnalysisPipeAsync END==========="); + await next(ftContext.IsFastTunnel ? ftContext : context); } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/SwapConnectionMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/SwapConnectionMiddleware.cs index adb4229..c019e8c 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/SwapConnectionMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/SwapConnectionMiddleware.cs @@ -7,12 +7,16 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.IO; using System.IO.Pipelines; using System.Linq; +using System.Net.WebSockets; using System.Text; using System.Threading.Tasks; using FastTunnel.Core.Client; +using FastTunnel.Core.Extensions; using FastTunnel.Core.Forwarder.MiddleWare; +using FastTunnel.Core.Models; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; using Microsoft.Extensions.Logging; @@ -35,16 +39,22 @@ internal class SwapConnectionMiddleware internal async Task OnConnectionAsync(ConnectionContext context) { var ctx = context as FastTunnelConnectionContext; - if (ctx != null) + if (ctx != null && ctx.IsFastTunnel) { - if (ctx.Method == "SWAP") + if (ctx.Method == "PROXY") { await doSwap(ctx); } + else if (ctx.MatchWeb != null) + { + await waitSwap(ctx); + } else { - await next(context); + throw new NotSupportedException(); } + + } else { @@ -52,17 +62,64 @@ internal class SwapConnectionMiddleware } } + private async Task waitSwap(FastTunnelConnectionContext context) + { + var requestId = Guid.NewGuid().ToString().Replace("-", ""); + var web = context.MatchWeb; + + TaskCompletionSource tcs = new(); + logger.LogDebug($"[Http]Swap开始 {requestId}|{context.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}"); + tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); }); + + fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs); + + try + { + try + { + // 发送指令给客户端,等待建立隧道 + await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{requestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", default); + } + catch (WebSocketException) + { + web.LogOut(); + + // 通讯异常,返回客户端离线 + throw new Exception("客户端离线"); + } + + using var res = await tcs.Task; + using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); + + var t1 = res.CopyToAsync(reverseConnection); + var t2 = reverseConnection.CopyToAsync(res); + + await Task.WhenAll(t1, t2); + + logger.LogDebug("[Http]Swap结束"); + } + catch (Exception) + { + throw; + } + finally + { + fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); + context.Transport.Input.Complete(); + context.Transport.Output.Complete(); + } + } + private async Task doSwap(FastTunnelConnectionContext context) { var requestId = context.MessageId; - if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseForYarp)) + if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream)) { - logger.LogError($"[PROXY]:RequestId不存在 {requestId}"); - return; + throw new Exception($"[PROXY]:RequestId不存在 {requestId}"); }; using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); - responseForYarp.TrySetResult(reverseConnection); + responseStream.TrySetResult(reverseConnection); var lifetime = context.Features.Get(); @@ -89,52 +146,6 @@ internal class SwapConnectionMiddleware } } - /// - /// - /// - /// - /// is for FastTunnel - async Task ReadPipeAsync(ConnectionContext context) - { - var reader = context.Transport.Input; - - var isProxy = false; - while (true) - { - var result = await reader.ReadAsync(); - var buffer = result.Buffer; - SequencePosition? position = null; - - do - { - position = buffer.PositionOf((byte)'\n'); - - if (position != null) - { - isProxy = ProcessProxyLine(buffer.Slice(0, position.Value)); - if (isProxy) - { - await Swap(buffer, position.Value, context); - return true; - } - else - { - context.Transport.Input.AdvanceTo(buffer.Start, buffer.Start); - return false; - } - } - } - while (position != null); - - if (result.IsCompleted) - { - break; - } - } - - return false; - } - private async Task Swap(ReadOnlySequence buffer, SequencePosition position, ConnectionContext context) { var firstLineBuffer = buffer.Slice(0, position); @@ -180,14 +191,4 @@ internal class SwapConnectionMiddleware logger.LogInformation($"=====================Swap End:{requestId}================== "); } } - - /// - /// - /// - /// - private bool ProcessProxyLine(ReadOnlySequence readOnlySequence) - { - var str = Encoding.UTF8.GetString(readOnlySequence); - return str.StartsWith("SWAP"); - } } diff --git a/FastTunnel.Core/Handlers/Client/SwapHandler.cs b/FastTunnel.Core/Handlers/Client/SwapHandler.cs index ba7d121..2f5996e 100644 --- a/FastTunnel.Core/Handlers/Client/SwapHandler.cs +++ b/FastTunnel.Core/Handlers/Client/SwapHandler.cs @@ -76,8 +76,7 @@ namespace FastTunnel.Core.Handlers.Client serverStream = sslStream; } - var reverse = $"SWAP /{requestId} HTTP/1.1\r\n"; - //var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n"; + var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n"; var requestMsg = Encoding.UTF8.GetBytes(reverse); await serverStream.WriteAsync(requestMsg, cancellationToken); diff --git a/FastTunnel.Core/Models/WebInfo.cs b/FastTunnel.Core/Models/WebInfo.cs index 3df0346..cc7fe0f 100644 --- a/FastTunnel.Core/Models/WebInfo.cs +++ b/FastTunnel.Core/Models/WebInfo.cs @@ -17,5 +17,11 @@ namespace FastTunnel.Core.Models public WebSocket Socket { get; set; } public WebConfig WebConfig { get; set; } + + internal void LogOut() + { + // TODO:退出登录 + throw new NotImplementedException(); + } } }