use Pipelines io

This commit is contained in:
Gui.H 2022-07-14 18:43:09 +08:00
parent 75075cc0a0
commit 4af0ee0f23
10 changed files with 269 additions and 80 deletions

View File

@ -5,6 +5,8 @@
// Copyright (c) 2019 Gui.H // Copyright (c) 2019 Gui.H
using System; using System;
using System.Buffers;
using System.Net.Sockets;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
@ -12,26 +14,26 @@ using System.Threading.Tasks;
using FastTunnel.Core.Config; using FastTunnel.Core.Config;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers.Client; using FastTunnel.Core.Handlers.Client;
using FastTunnel.Core.Models;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol;
using FastTunnel.Core.Utilitys; using FastTunnel.Core.Utilitys;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace FastTunnel.Core.Client; namespace FastTunnel.Core.Client;
/// <summary>
/// 客户端
/// </summary>
public class FastTunnelClient : IFastTunnelClient public class FastTunnelClient : IFastTunnelClient
{ {
private ClientWebSocket socket; private ClientWebSocket socket;
protected readonly ILogger<FastTunnelClient> _logger; protected readonly ILogger<FastTunnelClient> _logger;
private readonly SwapHandler _newCustomerHandler; protected DefaultClientConfig ClientConfig { get; private set; }
private readonly LogHandler _logHandler;
public DefaultClientConfig ClientConfig { get; private set; } private readonly SwapHandler swapHandler;
private readonly LogHandler logHandler;
private static ReadOnlySpan<byte> EndSpan => new ReadOnlySpan<byte>(new byte[] { (byte)'\n' });
public SuiDaoServer Server { get; protected set; } public SuiDaoServer Server { get; protected set; }
@ -41,9 +43,10 @@ public class FastTunnelClient : IFastTunnelClient
LogHandler logHandler, LogHandler logHandler,
IOptionsMonitor<DefaultClientConfig> configuration) IOptionsMonitor<DefaultClientConfig> configuration)
{ {
ReadOnlySpan<int> span = new ReadOnlySpan<int>();
_logger = logger; _logger = logger;
_newCustomerHandler = newCustomerHandler; swapHandler = newCustomerHandler;
_logHandler = logHandler; this.logHandler = logHandler;
ClientConfig = configuration.CurrentValue; ClientConfig = configuration.CurrentValue;
Server = ClientConfig.Server; Server = ClientConfig.Server;
} }
@ -51,8 +54,7 @@ public class FastTunnelClient : IFastTunnelClient
/// <summary> /// <summary>
/// 启动客户端 /// 启动客户端
/// </summary> /// </summary>
/// <param name="cancellationToken"></param> public virtual async Task StartAsync(CancellationToken cancellationToken)
public async void StartAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("===== FastTunnel Client Start ====="); _logger.LogInformation("===== FastTunnel Client Start =====");
@ -75,32 +77,29 @@ public class FastTunnelClient : IFastTunnelClient
private async Task loginAsync(CancellationToken cancellationToken) private async Task loginAsync(CancellationToken cancellationToken)
{ {
try var logMsg = GetLoginMsg(cancellationToken);
if (socket != null)
{ {
var logMsg = GetLoginMsg(cancellationToken); socket.Abort();
// 连接到的目标IP
socket = new ClientWebSocket();
socket.Options.RemoteCertificateValidationCallback = delegate { return true; };
socket.Options.SetRequestHeader(ProtocolConst.FASTTUNNEL_VERSION, AssemblyUtility.GetVersion().ToString());
socket.Options.SetRequestHeader(ProtocolConst.FASTTUNNEL_TOKEN, ClientConfig.Token);
_logger.LogInformation($"正在连接服务端 {Server.ServerAddr}:{Server.ServerPort}");
await socket.ConnectAsync(
new Uri($"{Server.Protocol}://{Server.ServerAddr}:{Server.ServerPort}"), cancellationToken);
_logger.LogDebug("连接服务端成功");
// 登录
await socket.SendCmdAsync(MessageType.LogIn, logMsg, cancellationToken);
}
catch (Exception)
{
throw;
} }
// 连接到的目标IP
socket = new ClientWebSocket();
socket.Options.RemoteCertificateValidationCallback = delegate { return true; };
socket.Options.SetRequestHeader(FastTunnelConst.FASTTUNNEL_VERSION, AssemblyUtility.GetVersion().ToString());
socket.Options.SetRequestHeader(FastTunnelConst.FASTTUNNEL_TOKEN, ClientConfig.Token);
_logger.LogInformation($"正在连接服务端 {Server.ServerAddr}:{Server.ServerPort}");
await socket.ConnectAsync(
new Uri($"{Server.Protocol}://{Server.ServerAddr}:{Server.ServerPort}"), cancellationToken);
_logger.LogDebug("连接服务端成功");
// 登录
await socket.SendCmdAsync(MessageType.LogIn, logMsg, cancellationToken);
} }
public virtual string GetLoginMsg(CancellationToken cancellationToken) protected virtual string GetLoginMsg(CancellationToken cancellationToken)
{ {
Server = ClientConfig.Server; Server = ClientConfig.Server;
return new LogInMassage return new LogInMassage
@ -110,46 +109,53 @@ public class FastTunnelClient : IFastTunnelClient
}.ToJson(); }.ToJson();
} }
private async Task ReceiveServerAsync(CancellationToken cancellationToken)
protected async Task ReceiveServerAsync(CancellationToken cancellationToken)
{ {
var buffer = new byte[ProtocolConst.MAX_CMD_LENGTH]; var utility = new WebSocketUtility(socket, ProcessLine);
while (!cancellationToken.IsCancellationRequested) await utility.ProcessLinesAsync(cancellationToken);
{
var res = await socket.ReceiveAsync(buffer, cancellationToken);
var type = buffer[0];
var content = Encoding.UTF8.GetString(buffer, 1, res.Count - 1);
HandleServerRequestAsync(type, content, cancellationToken);
}
} }
private async void HandleServerRequestAsync(byte cmd, string ctx, CancellationToken cancellationToken) private void ProcessLine(ReadOnlySequence<byte> line, CancellationToken cancellationToken)
{
HandleServerRequestAsync(line, cancellationToken);
}
private void HandleServerRequestAsync(ReadOnlySequence<byte> line, CancellationToken cancellationToken)
{ {
try try
{ {
var row = line.ToArray();
var cmd = row[0];
IClientHandler handler; IClientHandler handler;
switch ((MessageType)cmd) switch ((MessageType)cmd)
{ {
case MessageType.SwapMsg: case MessageType.SwapMsg:
case MessageType.Forward: case MessageType.Forward:
handler = _newCustomerHandler; handler = swapHandler;
break; break;
case MessageType.Log: case MessageType.Log:
handler = _logHandler; handler = logHandler;
break; break;
default: default:
throw new Exception($"未处理的消息cmd={cmd}"); throw new Exception($"未处理的消息cmd={cmd}");
} }
await handler.HandlerMsgAsync(this, ctx, cancellationToken); var content = Encoding.UTF8.GetString(line.Slice(1));
handler.HandlerMsgAsync(this, content, cancellationToken);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "HandleServerRequest Error"); _logger.LogError(ex);
} }
} }
public void Stop(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("===== FastTunnel Client Stoping ====="); _logger.LogInformation("===== FastTunnel Client Stoping =====");
if (socket != null)
{
socket.Abort();
}
} }
} }

View File

@ -5,12 +5,13 @@
// Copyright (c) 2019 Gui.H // Copyright (c) 2019 Gui.H
using System.Threading; using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Client; namespace FastTunnel.Core.Client;
public interface IFastTunnelClient public interface IFastTunnelClient
{ {
void StartAsync(CancellationToken cancellationToken); Task StartAsync(CancellationToken cancellationToken);
void Stop(CancellationToken cancellationToken); Task StopAsync(CancellationToken cancellationToken);
} }

View File

@ -31,10 +31,9 @@ public class ServiceFastTunnelClient : IHostedService
await Task.CompletedTask; await Task.CompletedTask;
} }
public Task StopAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken)
{ {
_fastTunnelClient.Stop(cancellationToken); await _fastTunnelClient.StopAsync(cancellationToken);
return Task.CompletedTask;
} }
private void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) private void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)

View File

@ -0,0 +1,19 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using Microsoft.Extensions.Logging;
using System;
namespace FastTunnel.Core.Extensions
{
public static class LoggerExtentions
{
public static void LogError(this ILogger logger, Exception ex)
{
logger.LogError(ex, string.Empty);
}
}
}

View File

@ -0,0 +1,21 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core
{
public class FastTunnelConst
{
public const string FASTTUNNEL_VERSION = "FT_VERSION";
public const string FASTTUNNEL_MSGID = "FT_MSGID";
public const string FASTTUNNEL_TOKEN = "FT_TOKEN";
}
}

View File

@ -14,6 +14,7 @@ using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Handlers.Server; namespace FastTunnel.Core.Handlers.Server;
@ -69,7 +70,9 @@ public class FastTunnelClientHandler
return; return;
} }
var client = new TunnelClient(webSocket, fastTunnelServer, loginHandler, context.Connection.RemoteIpAddress); var loggerFactory = context.RequestServices.GetRequiredService<ILoggerFactory>();
var log = loggerFactory.CreateLogger<TunnelClient>();
var client = new TunnelClient(webSocket, fastTunnelServer, loginHandler, context.Connection.RemoteIpAddress, log);
client.ConnectionPort = context.Connection.LocalPort; client.ConnectionPort = context.Connection.LocalPort;
try try

View File

@ -4,6 +4,7 @@
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE // https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H // Copyright (c) 2019 Gui.H
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
@ -12,5 +13,5 @@ namespace FastTunnel.Core.Handlers.Server;
public interface ILoginHandler public interface ILoginHandler
{ {
Task<bool> HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd); Task<bool> HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken);
} }

View File

@ -28,11 +28,11 @@ public class LoginHandler : ILoginHandler
this.logger = logger; this.logger = logger;
} }
protected async Task HandleLoginAsync(FastTunnelServer server, TunnelClient client, LogInMassage requet) protected async Task HandleLoginAsync(FastTunnelServer server, TunnelClient client, LogInMassage requet, CancellationToken cancellationToken)
{ {
var hasTunnel = false; var hasTunnel = false;
await client.webSocket.SendCmdAsync(MessageType.Log, $"穿透协议 | 映射关系(公网=>内网)", CancellationToken.None); await client.webSocket.SendCmdAsync(MessageType.Log, $"穿透协议 | 映射关系(公网=>内网)", cancellationToken);
Thread.Sleep(300); Thread.Sleep(300);
if (requet.Webs != null && requet.Webs.Any()) if (requet.Webs != null && requet.Webs.Any())
@ -113,10 +113,10 @@ public class LoginHandler : ILoginHandler
await client.webSocket.SendCmdAsync(MessageType.Log, TunnelResource.NoTunnel, CancellationToken.None); await client.webSocket.SendCmdAsync(MessageType.Log, TunnelResource.NoTunnel, CancellationToken.None);
} }
public virtual async Task<bool> HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd) public virtual async Task<bool> HandlerMsg(FastTunnelServer fastTunnelServer, TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken)
{ {
var msg = JsonSerializer.Deserialize<LogInMassage>(lineCmd); var msg = JsonSerializer.Deserialize<LogInMassage>(lineCmd);
await HandleLoginAsync(fastTunnelServer, tunnelClient, msg); await HandleLoginAsync(fastTunnelServer, tunnelClient, msg, cancellationToken);
return NeedRecive; return NeedRecive;
} }
} }

View File

@ -5,14 +5,20 @@
// Copyright (c) 2019 Gui.H // Copyright (c) 2019 Gui.H
using System; using System;
using System.Buffers;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net; using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Client;
using FastTunnel.Core.Handlers.Server; using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
using FastTunnel.Core.Utilitys;
using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Models; namespace FastTunnel.Core.Models;
@ -27,14 +33,18 @@ public class TunnelClient
private readonly FastTunnelServer fastTunnelServer; private readonly FastTunnelServer fastTunnelServer;
private readonly ILoginHandler loginHandler; private readonly ILoginHandler loginHandler;
private readonly ILogger<TunnelClient> logger;
public IPAddress RemoteIpAddress { get; private set; } public IPAddress RemoteIpAddress { get; private set; }
private readonly IList<WebInfo> webInfos = new List<WebInfo>(); private readonly IList<WebInfo> webInfos = new List<WebInfo>();
private readonly IList<ForwardInfo<ForwardHandlerArg>> forwardInfos = new List<ForwardInfo<ForwardHandlerArg>>(); private readonly IList<ForwardInfo<ForwardHandlerArg>> forwardInfos = new List<ForwardInfo<ForwardHandlerArg>>();
public TunnelClient(WebSocket webSocket, FastTunnelServer fastTunnelServer, ILoginHandler loginHandler, IPAddress remoteIpAddress) public TunnelClient(
WebSocket webSocket, FastTunnelServer fastTunnelServer,
ILoginHandler loginHandler, IPAddress remoteIpAddress, ILogger<TunnelClient> logger)
{ {
this.logger = logger;
this.webSocket = webSocket; this.webSocket = webSocket;
this.fastTunnelServer = fastTunnelServer; this.fastTunnelServer = fastTunnelServer;
this.loginHandler = loginHandler; this.loginHandler = loginHandler;
@ -51,32 +61,29 @@ public class TunnelClient
forwardInfos.Add(forwardInfo); forwardInfos.Add(forwardInfo);
} }
/// <summary>
/// 接收客户端的消息
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ReviceAsync(CancellationToken cancellationToken) public async Task ReviceAsync(CancellationToken cancellationToken)
{ {
var buffer = new byte[ProtocolConst.MAX_CMD_LENGTH]; var utility = new WebSocketUtility(webSocket, ProcessLine);
var tunnelProtocol = new TunnelProtocol(); await utility.ProcessLinesAsync(cancellationToken);
while (true)
{
var res = await webSocket.ReceiveAsync(buffer, cancellationToken);
var cmds = tunnelProtocol.HandleBuffer(buffer, 0, res.Count);
if (cmds == null) continue;
foreach (var item in cmds)
{
if (!await HandleCmdAsync(this, item))
{
return;
};
}
}
} }
private async Task<bool> HandleCmdAsync(TunnelClient tunnelClient, string lineCmd)
private async void ProcessLine(ReadOnlySequence<byte> line, CancellationToken cancellationToken)
{
var cmd = Encoding.UTF8.GetString(line);
await HandleCmdAsync(this, cmd, cancellationToken);
}
private async Task<bool> HandleCmdAsync(TunnelClient tunnelClient, string lineCmd, CancellationToken cancellationToken)
{ {
try try
{ {
return await loginHandler.HandlerMsg(fastTunnelServer, tunnelClient, lineCmd.Substring(1)); return await loginHandler.HandlerMsg(fastTunnelServer, tunnelClient, lineCmd.Substring(1), cancellationToken);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -0,0 +1,132 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Utilitys;
public class WebSocketUtility
{
private readonly WebSocket webSocket;
public WebSocketUtility(WebSocket webSocket, Action<ReadOnlySequence<byte>, CancellationToken> processLine)
{
this.webSocket = webSocket;
ProcessLine = processLine;
}
public Action<ReadOnlySequence<byte>, CancellationToken> ProcessLine { get; }
public async Task ProcessLinesAsync(CancellationToken cancellationToken)
{
var pipe = new Pipe();
var writing = FillPipeAsync(webSocket, pipe.Writer, cancellationToken);
var reading = ReadPipeAsync(pipe.Reader, cancellationToken);
await Task.WhenAll(reading, writing);
}
/// <summary>
/// 读取socket收到的消息写入Pipe
/// </summary>
/// <param name="socket"></param>
/// <param name="writer"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private async Task FillPipeAsync(WebSocket socket, PipeWriter writer, CancellationToken cancellationToken)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
var memory = writer.GetMemory(minimumBufferSize);
try
{
var bytesRead = await socket.ReceiveAsync(memory, cancellationToken);
if (bytesRead.Count == 0 || bytesRead.EndOfMessage || bytesRead.MessageType == WebSocketMessageType.Close)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead.Count);
}
catch (Exception)
{
break;
}
// Make the data available to the PipeReader.
var result = await writer.FlushAsync(cancellationToken);
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
/// <summary>
/// 从Pipe中读取收到的消息
/// </summary>
/// <param name="reader"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
private async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken)
{
while (true)
{
var result = await reader.ReadAsync(cancellationToken);
var buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line, cancellationToken);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
}