From 4627a1f705704dd500177dfdb730afee0d6f8a0a Mon Sep 17 00:00:00 2001 From: "Gui.H" Date: Fri, 15 Jul 2022 14:35:50 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=B1=E2=80=8D=F0=9F=8F=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FastTunnel.Core/Client/FastTunnelClient.cs | 2 +- .../Extensions/LoggerExtentions.cs | 19 -------- .../TaskCompletionSourceExtensions.cs | 27 ------------ .../Extensions/WebSocketExtensions.cs | 3 -- .../Forwarder/FastTunelProtocol.cs | 11 +++-- .../Kestrel/Features/FastTunnelFeature.cs | 6 ++- .../Kestrel/Features/IFastTunnelFeature.cs | 5 ++- .../Kestrel/MiddleWare/ForwarderMiddleware.cs | 10 ++--- .../Handlers/Server/ForwardDispatcher.cs | 9 ++-- FastTunnel.Core/Protocol/ProtocolConst.cs | 3 -- FastTunnel.Core/Protocol/TunnelProtocol.cs | 43 ------------------- .../Streams => Refs}/DuplexPipeStream.cs | 6 +-- .../ValueTaskExtensions.cs | 2 +- FastTunnel.Core/Server/FastTunnelServer.cs | 5 ++- FastTunnel.Core/Utilitys/SwapUtility.cs | 42 ------------------ 15 files changed, 28 insertions(+), 165 deletions(-) delete mode 100644 FastTunnel.Core/Extensions/LoggerExtentions.cs delete mode 100644 FastTunnel.Core/Extensions/TaskCompletionSourceExtensions.cs delete mode 100644 FastTunnel.Core/Protocol/TunnelProtocol.cs rename FastTunnel.Core/{Forwarder/Streams => Refs}/DuplexPipeStream.cs (97%) rename FastTunnel.Core/{Extensions => Refs}/ValueTaskExtensions.cs (97%) delete mode 100644 FastTunnel.Core/Utilitys/SwapUtility.cs diff --git a/FastTunnel.Core/Client/FastTunnelClient.cs b/FastTunnel.Core/Client/FastTunnelClient.cs index b1ce625..11a6813 100644 --- a/FastTunnel.Core/Client/FastTunnelClient.cs +++ b/FastTunnel.Core/Client/FastTunnelClient.cs @@ -146,7 +146,7 @@ public class FastTunnelClient : IFastTunnelClient } catch (Exception ex) { - _logger.LogError(ex); + _logger.LogError(ex, "[HandleServerRequest Error]"); } } diff --git a/FastTunnel.Core/Extensions/LoggerExtentions.cs b/FastTunnel.Core/Extensions/LoggerExtentions.cs deleted file mode 100644 index c6cc4df..0000000 --- a/FastTunnel.Core/Extensions/LoggerExtentions.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"). -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE -// Copyright (c) 2019 Gui.H - -using Microsoft.Extensions.Logging; -using System; - -namespace FastTunnel.Core.Extensions -{ - public static class LoggerExtentions - { - public static void LogError(this ILogger logger, Exception ex) - { - logger.LogError(ex, string.Empty); - } - } -} diff --git a/FastTunnel.Core/Extensions/TaskCompletionSourceExtensions.cs b/FastTunnel.Core/Extensions/TaskCompletionSourceExtensions.cs deleted file mode 100644 index 1f890c4..0000000 --- a/FastTunnel.Core/Extensions/TaskCompletionSourceExtensions.cs +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"). -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE -// Copyright (c) 2019 Gui.H - -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace FastTunnel.Core.Extensions; - -public static class TaskCompletionSourceExtensions -{ - public static void SetTimeOut(this TaskCompletionSource tcs, int timeoutMs, Action? action) - { - var ct = new CancellationTokenSource(timeoutMs); - ct.Token.Register(() => - { - if (tcs.Task.IsCompleted) - return; - - tcs.TrySetCanceled(); - action?.Invoke(); - }, useSynchronizationContext: false); - } -} diff --git a/FastTunnel.Core/Extensions/WebSocketExtensions.cs b/FastTunnel.Core/Extensions/WebSocketExtensions.cs index 7b1caf8..572da5a 100644 --- a/FastTunnel.Core/Extensions/WebSocketExtensions.cs +++ b/FastTunnel.Core/Extensions/WebSocketExtensions.cs @@ -25,9 +25,6 @@ public static class WebSocketExtensions } var buffer = Encoding.UTF8.GetBytes($"{(char)type}{content}\n"); - if (type != MessageType.LogIn && buffer.Length > ProtocolConst.MAX_CMD_LENGTH) - throw new ArgumentOutOfRangeException(nameof(content)); - await socket.SendAsync(buffer, WebSocketMessageType.Binary, false, cancellationToken); } } diff --git a/FastTunnel.Core/Forwarder/FastTunelProtocol.cs b/FastTunnel.Core/Forwarder/FastTunelProtocol.cs index b050dde..c735cbf 100644 --- a/FastTunnel.Core/Forwarder/FastTunelProtocol.cs +++ b/FastTunnel.Core/Forwarder/FastTunelProtocol.cs @@ -53,8 +53,7 @@ public class FastTunelProtocol readableBuffer = result.Buffer; SequencePosition start = readableBuffer.Start; - SequencePosition? position = null; - + SequencePosition? position; do { position = readableBuffer.PositionOf(ByteLF); @@ -62,7 +61,7 @@ public class FastTunelProtocol if (position != null) { var readedPosition = readableBuffer.GetPosition(1, position.Value); - if (ProcessHeaderLine(readableBuffer.Slice(0, position.Value), out var _)) + if (ProcessHeaderLine(readableBuffer.Slice(0, position.Value), out _)) { if (Method == ProtocolConst.HTTP_METHOD_SWAP) { @@ -101,10 +100,10 @@ public class FastTunelProtocol public string Method; public string Host = null; - public string MessageId; + public Guid MessageId; private bool isFirstLine = true; - public FastTunnelServer fastTunnelServer { get; } + private FastTunnelServer fastTunnelServer { get; } /// /// @@ -133,7 +132,7 @@ public class FastTunelProtocol case ProtocolConst.HTTP_METHOD_SWAP: // 客户端发起消息互转 var endIndex = headerLineStr.IndexOf(" ", 7); - MessageId = headerLineStr.Substring(7, endIndex - 7); + MessageId = Guid.Parse(headerLineStr.Substring(7, endIndex - 7)); break; default: // 常规Http请求,需要检查Host决定是否进行代理 diff --git a/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs b/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs index f242a6c..34eeb72 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/Features/FastTunnelFeature.cs @@ -4,6 +4,7 @@ // https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE // Copyright (c) 2019 Gui.H +using System; using System.Collections.Generic; using FastTunnel.Core.Models; @@ -14,7 +15,10 @@ public struct FastTunnelFeature : IFastTunnelFeature public WebInfo MatchWeb { get; set; } public IList HasReadLInes { get; set; } + public string Method { get; set; } + public string Host { get; set; } - public string MessageId { get; set; } + + public Guid MessageId { get; set; } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/Features/IFastTunnelFeature.cs b/FastTunnel.Core/Forwarder/Kestrel/Features/IFastTunnelFeature.cs index 2d2bdb4..0ac0e35 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/Features/IFastTunnelFeature.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/Features/IFastTunnelFeature.cs @@ -4,6 +4,7 @@ // https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE // Copyright (c) 2019 Gui.H +using System; using System.Collections.Generic; using FastTunnel.Core.Models; @@ -16,6 +17,8 @@ internal interface IFastTunnelFeature public IList HasReadLInes { get; set; } public string Method { get; set; } + public string Host { get; set; } - public string MessageId { get; set; } + + public Guid MessageId { get; set; } } diff --git a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs index b8d8d81..f7089a4 100644 --- a/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs +++ b/FastTunnel.Core/Forwarder/Kestrel/MiddleWare/ForwarderMiddleware.cs @@ -15,6 +15,7 @@ using FastTunnel.Core.Forwarder.Kestrel.Features; using FastTunnel.Core.Forwarder.Streams; using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Protocol; +using FastTunnel.Core.Refs; using FastTunnel.Core.Server; using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; @@ -74,7 +75,7 @@ internal class ForwarderMiddleware private async Task waitSwap(ConnectionContext context) { var feat = context.Features.Get(); - var requestId = Guid.NewGuid().ToString().Replace("-", ""); + var requestId = Guid.NewGuid(); Interlocked.Increment(ref UserCount); @@ -162,14 +163,9 @@ internal class ForwarderMiddleware var closedAwaiter = new TaskCompletionSource(); - cancellationTokenSource.Token.Register(() => - { - closedAwaiter.TrySetCanceled(); - }); - try { - await closedAwaiter.Task; + await closedAwaiter.Task.WaitAsync(cancellationTokenSource.Token); } catch (Exception) { diff --git a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs index 7325b6e..6005771 100644 --- a/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs +++ b/FastTunnel.Core/Handlers/Server/ForwardDispatcher.cs @@ -42,7 +42,7 @@ public class ForwardDispatcher /// public async Task DispatchAsync(Socket _socket, WebSocket client) { - var msgId = Guid.NewGuid().ToString().Replace("-", ""); + var msgId = Guid.NewGuid(); (Stream Stream, CancellationTokenSource TokenSource) res = default; @@ -51,10 +51,7 @@ public class ForwardDispatcher try { logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}"); - var tcs = new TaskCompletionSource<(Stream Stream, CancellationTokenSource TokenSource)>(); - tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); }); - if (!_server.ResponseTasks.TryAdd(msgId, tcs)) { return; @@ -81,11 +78,11 @@ public class ForwardDispatcher return; } - res = await tcs.Task; + res = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); //await using var stream2 = new SocketDuplexPipe(_socket); using var stream2 = new NetworkStream(_socket); - await Task.WhenAny(res.Stream.CopyToAsync(stream2), stream2.CopyToAsync(res.Stream)); + await Task.WhenAny(res.Stream.CopyToAsync(stream2), stream2.CopyToAsync(res.Stream)).WaitAsync(res.TokenSource.Token); } catch (Exception ex) { diff --git a/FastTunnel.Core/Protocol/ProtocolConst.cs b/FastTunnel.Core/Protocol/ProtocolConst.cs index e407c0c..dbe4c96 100644 --- a/FastTunnel.Core/Protocol/ProtocolConst.cs +++ b/FastTunnel.Core/Protocol/ProtocolConst.cs @@ -11,8 +11,5 @@ public class ProtocolConst public const string FASTTUNNEL_VERSION = "FT_VERSION"; public const string FASTTUNNEL_MSGID = "FT_MSGID"; public const string FASTTUNNEL_TOKEN = "FT_TOKEN"; - - public const int MAX_CMD_LENGTH = 100; - public const string HTTP_METHOD_SWAP = "PROXY"; } diff --git a/FastTunnel.Core/Protocol/TunnelProtocol.cs b/FastTunnel.Core/Protocol/TunnelProtocol.cs deleted file mode 100644 index ea59212..0000000 --- a/FastTunnel.Core/Protocol/TunnelProtocol.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"). -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE -// Copyright (c) 2019 Gui.H - -using System.Collections.Generic; -using System.Linq; -using System.Text; -using FastTunnel.Core.Extensions; - -namespace FastTunnel.Core.Protocol; - -public class TunnelProtocol -{ - private string massgeTemp; - private readonly string m_sectionFlag = "\n"; - - public IEnumerable HandleBuffer(byte[] buffer, int offset, int count) - { - var words = buffer.GetString(offset, count); - var sum = massgeTemp + words; - - if (sum.Contains(m_sectionFlag)) - { - var array = (sum).Split(m_sectionFlag); - massgeTemp = null; - var fullMsg = words.EndsWith(m_sectionFlag); - - if (!fullMsg) - { - massgeTemp = array[array.Length - 1]; - } - - return array.Take(array.Length - 1); - } - else - { - massgeTemp = sum; - return null; - } - } -} diff --git a/FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs b/FastTunnel.Core/Refs/DuplexPipeStream.cs similarity index 97% rename from FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs rename to FastTunnel.Core/Refs/DuplexPipeStream.cs index 79a5a13..fb75437 100644 --- a/FastTunnel.Core/Forwarder/Streams/DuplexPipeStream.cs +++ b/FastTunnel.Core/Refs/DuplexPipeStream.cs @@ -11,12 +11,10 @@ using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -using FastTunnel.Core.Extensions; -using FastTunnel.Core.Refs; -namespace FastTunnel.Core.Forwarder.Streams; +namespace FastTunnel.Core.Refs; -internal class DuplexPipeStream : System.IO.Stream +internal class DuplexPipeStream : Stream { private readonly PipeReader _input; private readonly PipeWriter _output; diff --git a/FastTunnel.Core/Extensions/ValueTaskExtensions.cs b/FastTunnel.Core/Refs/ValueTaskExtensions.cs similarity index 97% rename from FastTunnel.Core/Extensions/ValueTaskExtensions.cs rename to FastTunnel.Core/Refs/ValueTaskExtensions.cs index 3c30760..b47cb1e 100644 --- a/FastTunnel.Core/Extensions/ValueTaskExtensions.cs +++ b/FastTunnel.Core/Refs/ValueTaskExtensions.cs @@ -8,7 +8,7 @@ using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Threading.Tasks; -namespace FastTunnel.Core.Extensions; +namespace FastTunnel.Core.Refs; internal static class ValueTaskExtensions { diff --git a/FastTunnel.Core/Server/FastTunnelServer.cs b/FastTunnel.Core/Server/FastTunnelServer.cs index 84ac39f..397ff82 100644 --- a/FastTunnel.Core/Server/FastTunnelServer.cs +++ b/FastTunnel.Core/Server/FastTunnelServer.cs @@ -23,7 +23,10 @@ public class FastTunnelServer public readonly IOptionsMonitor ServerOption; private readonly ILogger logger; - public ConcurrentDictionary> ResponseTasks { get; } = new(); + /// + /// 待转发列表 + /// + public ConcurrentDictionary> ResponseTasks { get; } = new(); public ConcurrentDictionary WebList { get; private set; } = new(); diff --git a/FastTunnel.Core/Utilitys/SwapUtility.cs b/FastTunnel.Core/Utilitys/SwapUtility.cs deleted file mode 100644 index 27d4ce8..0000000 --- a/FastTunnel.Core/Utilitys/SwapUtility.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"). -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE -// Copyright (c) 2019 Gui.H - -using System.Buffers; -using System.IO.Pipelines; -using System.Threading; -using System.Threading.Tasks; - -namespace FastTunnel.Core.Utilitys; - -internal class SwapUtility -{ - IDuplexPipe pipe1; - IDuplexPipe pipe2; - - public SwapUtility(IDuplexPipe pipe1, IDuplexPipe pipe2) - { - this.pipe1 = pipe1; - this.pipe2 = pipe2; - } - - internal async Task SwapAsync(CancellationToken cancellationToken = default) - { - - } - - private async Task T1(IDuplexPipe pipe, IDuplexPipe pipe1, CancellationToken cancellationToken = default) - { - while (true) - { - ReadResult result; - ReadOnlySequence readableBuffer; - - result = await pipe.Input.ReadAsync(cancellationToken); - readableBuffer = result.Buffer; - - } - } -}