diff --git a/FastTunnel.Core/Client/FastTunnelClient.cs b/FastTunnel.Core/Client/FastTunnelClient.cs index e5145aa..b1ce625 100644 --- a/FastTunnel.Core/Client/FastTunnelClient.cs +++ b/FastTunnel.Core/Client/FastTunnelClient.cs @@ -5,6 +5,8 @@ // Copyright (c) 2019 Gui.H using System; +using System.Buffers; +using System.Net.Sockets; using System.Net.WebSockets; using System.Text; using System.Threading; @@ -12,26 +14,26 @@ using System.Threading.Tasks; using FastTunnel.Core.Config; using FastTunnel.Core.Extensions; using FastTunnel.Core.Handlers.Client; +using FastTunnel.Core.Models; using FastTunnel.Core.Models.Massage; -using FastTunnel.Core.Protocol; using FastTunnel.Core.Utilitys; +using Microsoft.AspNetCore.DataProtection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace FastTunnel.Core.Client; -/// -/// 客户端 -/// public class FastTunnelClient : IFastTunnelClient { private ClientWebSocket socket; protected readonly ILogger _logger; - private readonly SwapHandler _newCustomerHandler; - private readonly LogHandler _logHandler; + protected DefaultClientConfig ClientConfig { get; private set; } - public DefaultClientConfig ClientConfig { get; private set; } + private readonly SwapHandler swapHandler; + private readonly LogHandler logHandler; + + private static ReadOnlySpan EndSpan => new ReadOnlySpan(new byte[] { (byte)'\n' }); public SuiDaoServer Server { get; protected set; } @@ -41,9 +43,10 @@ public class FastTunnelClient : IFastTunnelClient LogHandler logHandler, IOptionsMonitor configuration) { + ReadOnlySpan span = new ReadOnlySpan(); _logger = logger; - _newCustomerHandler = newCustomerHandler; - _logHandler = logHandler; + swapHandler = newCustomerHandler; + this.logHandler = logHandler; ClientConfig = configuration.CurrentValue; Server = ClientConfig.Server; } @@ -51,8 +54,7 @@ public class FastTunnelClient : IFastTunnelClient /// /// 启动客户端 /// - /// - public async void StartAsync(CancellationToken cancellationToken) + public virtual async Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("===== FastTunnel Client Start ====="); @@ -75,32 +77,29 @@ public class FastTunnelClient : IFastTunnelClient private async Task loginAsync(CancellationToken cancellationToken) { - try + var logMsg = GetLoginMsg(cancellationToken); + if (socket != null) { - var logMsg = GetLoginMsg(cancellationToken); - - // 连接到的目标IP - socket = new ClientWebSocket(); - socket.Options.RemoteCertificateValidationCallback = delegate { return true; }; - socket.Options.SetRequestHeader(ProtocolConst.FASTTUNNEL_VERSION, AssemblyUtility.GetVersion().ToString()); - socket.Options.SetRequestHeader(ProtocolConst.FASTTUNNEL_TOKEN, ClientConfig.Token); - - _logger.LogInformation($"正在连接服务端 {Server.ServerAddr}:{Server.ServerPort}"); - await socket.ConnectAsync( - new Uri($"{Server.Protocol}://{Server.ServerAddr}:{Server.ServerPort}"), cancellationToken); - - _logger.LogDebug("连接服务端成功"); - - // 登录 - await socket.SendCmdAsync(MessageType.LogIn, logMsg, cancellationToken); - } - catch (Exception) - { - throw; + socket.Abort(); } + + // 连接到的目标IP + socket = new ClientWebSocket(); + socket.Options.RemoteCertificateValidationCallback = delegate { return true; }; + socket.Options.SetRequestHeader(FastTunnelConst.FASTTUNNEL_VERSION, AssemblyUtility.GetVersion().ToString()); + socket.Options.SetRequestHeader(FastTunnelConst.FASTTUNNEL_TOKEN, ClientConfig.Token); + + _logger.LogInformation($"正在连接服务端 {Server.ServerAddr}:{Server.ServerPort}"); + await socket.ConnectAsync( + new Uri($"{Server.Protocol}://{Server.ServerAddr}:{Server.ServerPort}"), cancellationToken); + + _logger.LogDebug("连接服务端成功"); + + // 登录 + await socket.SendCmdAsync(MessageType.LogIn, logMsg, cancellationToken); } - public virtual string GetLoginMsg(CancellationToken cancellationToken) + protected virtual string GetLoginMsg(CancellationToken cancellationToken) { Server = ClientConfig.Server; return new LogInMassage @@ -110,46 +109,53 @@ public class FastTunnelClient : IFastTunnelClient }.ToJson(); } - private async Task ReceiveServerAsync(CancellationToken cancellationToken) + + protected async Task ReceiveServerAsync(CancellationToken cancellationToken) { - var buffer = new byte[ProtocolConst.MAX_CMD_LENGTH]; - while (!cancellationToken.IsCancellationRequested) - { - var res = await socket.ReceiveAsync(buffer, cancellationToken); - var type = buffer[0]; - var content = Encoding.UTF8.GetString(buffer, 1, res.Count - 1); - HandleServerRequestAsync(type, content, cancellationToken); - } + var utility = new WebSocketUtility(socket, ProcessLine); + await utility.ProcessLinesAsync(cancellationToken); } - private async void HandleServerRequestAsync(byte cmd, string ctx, CancellationToken cancellationToken) + private void ProcessLine(ReadOnlySequence line, CancellationToken cancellationToken) + { + HandleServerRequestAsync(line, cancellationToken); + } + + private void HandleServerRequestAsync(ReadOnlySequence line, CancellationToken cancellationToken) { try { + var row = line.ToArray(); + var cmd = row[0]; IClientHandler handler; switch ((MessageType)cmd) { case MessageType.SwapMsg: case MessageType.Forward: - handler = _newCustomerHandler; + handler = swapHandler; break; case MessageType.Log: - handler = _logHandler; + handler = logHandler; break; default: throw new Exception($"未处理的消息:cmd={cmd}"); } - await handler.HandlerMsgAsync(this, ctx, cancellationToken); + var content = Encoding.UTF8.GetString(line.Slice(1)); + handler.HandlerMsgAsync(this, content, cancellationToken); } catch (Exception ex) { - _logger.LogError(ex, "HandleServerRequest Error"); + _logger.LogError(ex); } } - public void Stop(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("===== FastTunnel Client Stoping ====="); + if (socket != null) + { + socket.Abort(); + } } } diff --git a/FastTunnel.Core/Client/IFastTunnelClient.cs b/FastTunnel.Core/Client/IFastTunnelClient.cs index fb7af1c..377bf4d 100644 --- a/FastTunnel.Core/Client/IFastTunnelClient.cs +++ b/FastTunnel.Core/Client/IFastTunnelClient.cs @@ -5,12 +5,13 @@ // Copyright (c) 2019 Gui.H using System.Threading; +using System.Threading.Tasks; namespace FastTunnel.Core.Client; public interface IFastTunnelClient { - void StartAsync(CancellationToken cancellationToken); + Task StartAsync(CancellationToken cancellationToken); - void Stop(CancellationToken cancellationToken); + Task StopAsync(CancellationToken cancellationToken); } diff --git a/FastTunnel.Core/Client/ServiceFastTunnelClient.cs b/FastTunnel.Core/Client/ServiceFastTunnelClient.cs index ddbe0a9..f4eabd5 100644 --- a/FastTunnel.Core/Client/ServiceFastTunnelClient.cs +++ b/FastTunnel.Core/Client/ServiceFastTunnelClient.cs @@ -31,10 +31,9 @@ public class ServiceFastTunnelClient : IHostedService await Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { - _fastTunnelClient.Stop(cancellationToken); - return Task.CompletedTask; + await _fastTunnelClient.StopAsync(cancellationToken); } private void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) diff --git a/FastTunnel.Core/Extensions/LoggerExtentions.cs b/FastTunnel.Core/Extensions/LoggerExtentions.cs new file mode 100644 index 0000000..c6cc4df --- /dev/null +++ b/FastTunnel.Core/Extensions/LoggerExtentions.cs @@ -0,0 +1,19 @@ +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE +// Copyright (c) 2019 Gui.H + +using Microsoft.Extensions.Logging; +using System; + +namespace FastTunnel.Core.Extensions +{ + public static class LoggerExtentions + { + public static void LogError(this ILogger logger, Exception ex) + { + logger.LogError(ex, string.Empty); + } + } +} diff --git a/FastTunnel.Core/FastTunnelConst.cs b/FastTunnel.Core/FastTunnelConst.cs new file mode 100644 index 0000000..0210d76 --- /dev/null +++ b/FastTunnel.Core/FastTunnelConst.cs @@ -0,0 +1,21 @@ +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE +// Copyright (c) 2019 Gui.H + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FastTunnel.Core +{ + public class FastTunnelConst + { + public const string FASTTUNNEL_VERSION = "FT_VERSION"; + public const string FASTTUNNEL_MSGID = "FT_MSGID"; + public const string FASTTUNNEL_TOKEN = "FT_TOKEN"; + } +} diff --git a/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs b/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs index d73761f..1fc2f56 100644 --- a/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs +++ b/FastTunnel.Core/Handlers/Server/FastTunnelClientHandler.cs @@ -14,6 +14,7 @@ using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Protocol; using FastTunnel.Core.Server; using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace FastTunnel.Core.Handlers.Server; @@ -69,7 +70,9 @@ public class FastTunnelClientHandler return; } - var client = new TunnelClient(webSocket, fastTunnelServer, loginHandler, context.Connection.RemoteIpAddress); + var loggerFactory = context.RequestServices.GetRequiredService(); + var log = loggerFactory.CreateLogger(); + var client = new TunnelClient(webSocket, fastTunnelServer, loginHandler, context.Connection.RemoteIpAddress, log); client.ConnectionPort = context.Connection.LocalPort; try diff --git a/FastTunnel.Core/Handlers/Server/ILoginHandler.cs b/FastTunnel.Core/Handlers/Server/ILoginHandler.cs index 6c3a588..dbebd60 100644 --- a/FastTunnel.Core/Handlers/Server/ILoginHandler.cs +++ b/FastTunnel.Core/Handlers/Server/ILoginHandler.cs @@ -4,6 +4,7 @@ // https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE // Copyright (c) 2019 Gui.H +using System.Threading; using System.Threading.Tasks; using FastTunnel.Core.Models; using FastTunnel.Core.Server; @@ -12,5 +13,5 @@ namespace FastTunnel.Core.Handlers.Server; public interface ILoginHandler { - Task HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd); + Task HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken); } diff --git a/FastTunnel.Core/Handlers/Server/LoginHandler.cs b/FastTunnel.Core/Handlers/Server/LoginHandler.cs index bbe87a7..7b3db68 100644 --- a/FastTunnel.Core/Handlers/Server/LoginHandler.cs +++ b/FastTunnel.Core/Handlers/Server/LoginHandler.cs @@ -28,11 +28,11 @@ public class LoginHandler : ILoginHandler this.logger = logger; } - protected async Task HandleLoginAsync(FastTunnelServer server, TunnelClient client, LogInMassage requet) + protected async Task HandleLoginAsync(FastTunnelServer server, TunnelClient client, LogInMassage requet, CancellationToken cancellationToken) { var hasTunnel = false; - await client.webSocket.SendCmdAsync(MessageType.Log, $"穿透协议 | 映射关系(公网=>内网)", CancellationToken.None); + await client.webSocket.SendCmdAsync(MessageType.Log, $"穿透协议 | 映射关系(公网=>内网)", cancellationToken); Thread.Sleep(300); if (requet.Webs != null && requet.Webs.Any()) @@ -113,10 +113,10 @@ public class LoginHandler : ILoginHandler await client.webSocket.SendCmdAsync(MessageType.Log, TunnelResource.NoTunnel, CancellationToken.None); } - public virtual async Task HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd) + public virtual async Task HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken) { var msg = JsonSerializer.Deserialize(lineCmd); - await HandleLoginAsync(fastTunnelServer, tunnelClient, msg); + await HandleLoginAsync(fastTunnelServer, tunnelClient, msg, cancellationToken); return NeedRecive; } } diff --git a/FastTunnel.Core/Models/TunnelClient.cs b/FastTunnel.Core/Models/TunnelClient.cs index 894dd1e..418b102 100644 --- a/FastTunnel.Core/Models/TunnelClient.cs +++ b/FastTunnel.Core/Models/TunnelClient.cs @@ -5,14 +5,20 @@ // Copyright (c) 2019 Gui.H using System; +using System.Buffers; using System.Collections.Generic; +using System.IO.Pipelines; using System.Net; +using System.Net.Sockets; using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; +using FastTunnel.Core.Client; using FastTunnel.Core.Handlers.Server; -using FastTunnel.Core.Protocol; using FastTunnel.Core.Server; +using FastTunnel.Core.Utilitys; +using Microsoft.Extensions.Logging; namespace FastTunnel.Core.Models; @@ -27,14 +33,18 @@ public class TunnelClient private readonly FastTunnelServer fastTunnelServer; private readonly ILoginHandler loginHandler; + private readonly ILogger logger; public IPAddress RemoteIpAddress { get; private set; } private readonly IList webInfos = new List(); private readonly IList> forwardInfos = new List>(); - public TunnelClient(WebSocket webSocket, FastTunnelServer fastTunnelServer, ILoginHandler loginHandler, IPAddress remoteIpAddress) + public TunnelClient( + WebSocket webSocket, FastTunnelServer fastTunnelServer, + ILoginHandler loginHandler, IPAddress remoteIpAddress, ILogger logger) { + this.logger = logger; this.webSocket = webSocket; this.fastTunnelServer = fastTunnelServer; this.loginHandler = loginHandler; @@ -51,32 +61,29 @@ public class TunnelClient forwardInfos.Add(forwardInfo); } + /// + /// 接收客户端的消息 + /// + /// + /// public async Task ReviceAsync(CancellationToken cancellationToken) { - var buffer = new byte[ProtocolConst.MAX_CMD_LENGTH]; - var tunnelProtocol = new TunnelProtocol(); - - while (true) - { - var res = await webSocket.ReceiveAsync(buffer, cancellationToken); - var cmds = tunnelProtocol.HandleBuffer(buffer, 0, res.Count); - if (cmds == null) continue; - - foreach (var item in cmds) - { - if (!await HandleCmdAsync(this, item)) - { - return; - }; - } - } + var utility = new WebSocketUtility(webSocket, ProcessLine); + await utility.ProcessLinesAsync(cancellationToken); } - private async Task HandleCmdAsync(TunnelClient tunnelClient, string lineCmd) + + private async void ProcessLine(ReadOnlySequence line, CancellationToken cancellationToken) + { + var cmd = Encoding.UTF8.GetString(line); + await HandleCmdAsync(this, cmd, cancellationToken); + } + + private async Task HandleCmdAsync(TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken) { try { - return await loginHandler.HandlerMsg(fastTunnelServer, tunnelClient, lineCmd.Substring(1)); + return await loginHandler.HandlerMsg(fastTunnelServer, tunnelClient, lineCmd.Substring(1), cancellationToken); } catch (Exception ex) { diff --git a/FastTunnel.Core/Utilitys/WebSocketUtility.cs b/FastTunnel.Core/Utilitys/WebSocketUtility.cs new file mode 100644 index 0000000..89e599e --- /dev/null +++ b/FastTunnel.Core/Utilitys/WebSocketUtility.cs @@ -0,0 +1,132 @@ +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE +// Copyright (c) 2019 Gui.H + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace FastTunnel.Core.Utilitys; + +public class WebSocketUtility +{ + private readonly WebSocket webSocket; + + public WebSocketUtility(WebSocket webSocket, Action, CancellationToken> processLine) + { + this.webSocket = webSocket; + ProcessLine = processLine; + } + + public Action, CancellationToken> ProcessLine { get; } + + public async Task ProcessLinesAsync(CancellationToken cancellationToken) + { + var pipe = new Pipe(); + var writing = FillPipeAsync(webSocket, pipe.Writer, cancellationToken); + var reading = ReadPipeAsync(pipe.Reader, cancellationToken); + + await Task.WhenAll(reading, writing); + } + + /// + /// 读取socket收到的消息写入Pipe + /// + /// + /// + /// + /// + /// + private async Task FillPipeAsync(WebSocket socket, PipeWriter writer, CancellationToken cancellationToken) + { + const int minimumBufferSize = 512; + + while (true) + { + // Allocate at least 512 bytes from the PipeWriter. + var memory = writer.GetMemory(minimumBufferSize); + + try + { + var bytesRead = await socket.ReceiveAsync(memory, cancellationToken); + if (bytesRead.Count == 0 || bytesRead.EndOfMessage || bytesRead.MessageType == WebSocketMessageType.Close) + { + break; + } + // Tell the PipeWriter how much was read from the Socket. + writer.Advance(bytesRead.Count); + } + catch (Exception) + { + break; + } + + // Make the data available to the PipeReader. + var result = await writer.FlushAsync(cancellationToken); + + if (result.IsCompleted) + { + break; + } + } + + // By completing PipeWriter, tell the PipeReader that there's no more data coming. + await writer.CompleteAsync(); + } + + /// + /// 从Pipe中读取收到的消息 + /// + /// + /// + /// + private async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken) + { + while (true) + { + var result = await reader.ReadAsync(cancellationToken); + var buffer = result.Buffer; + + while (TryReadLine(ref buffer, out ReadOnlySequence line)) + { + // Process the line. + ProcessLine(line, cancellationToken); + } + + // Tell the PipeReader how much of the buffer has been consumed. + reader.AdvanceTo(buffer.Start, buffer.End); + + // Stop reading if there's no more data coming. + if (result.IsCompleted) + { + break; + } + } + + // Mark the PipeReader as complete. + await reader.CompleteAsync(); + } + + bool TryReadLine(ref ReadOnlySequence buffer, out ReadOnlySequence line) + { + // Look for a EOL in the buffer. + SequencePosition? position = buffer.PositionOf((byte)'\n'); + + if (position == null) + { + line = default; + return false; + } + + // Skip the line + the \n. + line = buffer.Slice(0, position.Value); + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); + return true; + } +}