提取clienttunnel对象

This commit is contained in:
springhgui 2021-11-13 22:42:13 +08:00
parent 8357d0bb99
commit 4c11c0b321
15 changed files with 212 additions and 58 deletions

View File

@ -53,7 +53,9 @@
// [] windows3389
"LocalPort": 3389,
// [] 访 ip:1274 window
"RemotePort": 1274
"RemotePort": 1274,
// [] TCP,UDP,TCP
"Protocol": "TCP"
},
{
"LocalIp": "127.0.0.1",

View File

@ -12,27 +12,58 @@ using Microsoft.Extensions.Options;
using System.IO;
using Yarp.Sample;
using Yarp.ReverseProxy.Configuration;
using System.Collections.Generic;
namespace FastTunnel.Core.Client
{
public class FastTunnelServer
{
public int ConnectedClientCount = 0;
public readonly IOptionsMonitor<DefaultServerConfig> ServerOption;
public IProxyConfigProvider proxyConfig;
ILogger<FastTunnelServer> logger;
public ConcurrentDictionary<string, TaskCompletionSource<Stream>> ResponseTasks { get; } = new();
public ConcurrentDictionary<string, WebInfo> WebList { get; private set; } = new();
public int ConnectedClientCount = 0;
public ConcurrentDictionary<int, ForwardInfo<ForwardHandlerArg>> ForwardList { get; private set; }
= new ConcurrentDictionary<int, ForwardInfo<ForwardHandlerArg>>();
public readonly IOptionsMonitor<DefaultServerConfig> ServerOption;
public IProxyConfigProvider proxyConfig;
/// <summary>
/// 在线客户端列表
/// </summary>
public IList<TunnelClient> Clients = new List<TunnelClient>();
public FastTunnelServer(IProxyConfigProvider proxyConfig, IOptionsMonitor<DefaultServerConfig> serverSettings)
public FastTunnelServer(ILogger<FastTunnelServer> logger, IProxyConfigProvider proxyConfig, IOptionsMonitor<DefaultServerConfig> serverSettings)
{
ServerOption = serverSettings;
this.logger = logger;
this.ServerOption = serverSettings;
this.proxyConfig = proxyConfig;
}
/// <summary>
/// 客户端登录
/// </summary>
/// <param name="client"></param>
internal void OnClientLogin(TunnelClient client)
{
Interlocked.Increment(ref ConnectedClientCount);
logger.LogInformation($"客户端连接 {client.RemoteIpAddress} 当前在线数:{ConnectedClientCount}");
Clients.Add(client);
}
/// <summary>
/// 客户端退出
/// </summary>
/// <param name="client"></param>
/// <exception cref="NotImplementedException"></exception>
internal void OnClientLogout(TunnelClient client)
{
Interlocked.Decrement(ref ConnectedClientCount);
logger.LogInformation($"客户端关闭 {client.RemoteIpAddress} 当前在线数:{ConnectedClientCount}");
Clients.Remove(client);
client.Logout();
}
}
}

View File

@ -13,8 +13,11 @@ namespace FastTunnel.Core.Config
public bool EnableForward { get; set; } = false;
[Obsolete("由Tokens替换")]
public string Token { get; set; }
public string[] Tokens { get; set; }
public ApiOptions Api { get; set; }
public class ApiOptions

View File

@ -56,7 +56,6 @@ namespace FastTunnel.Core
services.Configure<DefaultServerConfig>(configurationSection)
.AddSingleton<IExceptionFilter, FastTunnelExceptionFilter>()
.AddTransient<ILoginHandler, LoginHandler>()
.AddSingleton<TunnelClientHandler>()
.AddSingleton<FastTunnelClientHandler>()
.AddSingleton<FastTunnelSwapHandler>()
.AddSingleton<FastTunnelServer>();

View File

@ -9,7 +9,7 @@ namespace FastTunnel.Core.Extensions
{
public static class TaskCompletionSourceExtensions
{
public static void SetTimeOut<T>(this TaskCompletionSource<T> tcs, int timeoutMs, Action action)
public static void SetTimeOut<T>(this TaskCompletionSource<T> tcs, int timeoutMs, Action? action)
{
var ct = new CancellationTokenSource(timeoutMs);
ct.Token.Register(() =>

View File

@ -64,10 +64,7 @@ namespace FastTunnel.Core.Forwarder
TaskCompletionSource<Stream> tcs = new(cancellation);
_logger.LogDebug($"[Http]Swap开始 {msgId}|{host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
tcs.SetTimeOut(20000, () =>
{
_logger.LogError($"[Http]建立Swap超时 {msgId}");
});
tcs.SetTimeOut(20000, null);
_fastTunnelServer.ResponseTasks.TryAdd(msgId, tcs);

View File

@ -2,6 +2,7 @@
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder;
using FastTunnel.Core.Forwarder.MiddleWare;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Models;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Connections.Features;
@ -25,13 +26,14 @@ namespace FastTunnel.Core.MiddleWares
ILogger<FastTunnelClientHandler> logger;
FastTunnelServer fastTunnelServer;
Version serverVersion;
TunnelClientHandler tunnelClient;
ILoginHandler loginHandler;
public FastTunnelClientHandler(ILogger<FastTunnelClientHandler> logger, FastTunnelServer fastTunnelServer, TunnelClientHandler tunnelClient)
public FastTunnelClientHandler(
ILogger<FastTunnelClientHandler> logger, FastTunnelServer fastTunnelServer, ILoginHandler loginHandler)
{
this.logger = logger;
this.fastTunnelServer = fastTunnelServer;
this.tunnelClient = tunnelClient;
this.loginHandler = loginHandler;
serverVersion = System.Reflection.Assembly.GetExecutingAssembly().GetName().Version;
}
@ -46,7 +48,7 @@ namespace FastTunnel.Core.MiddleWares
return;
};
await handleClient(context, next, version);
await handleClient(context, version);
}
catch (Exception ex)
{
@ -54,7 +56,7 @@ namespace FastTunnel.Core.MiddleWares
}
}
private async Task handleClient(HttpContext context, Func<Task> next, string clientVersion)
private async Task handleClient(HttpContext context, string clientVersion)
{
using var webSocket = await context.WebSockets.AcceptWebSocketAsync();
@ -70,26 +72,21 @@ namespace FastTunnel.Core.MiddleWares
return;
}
var client = new TunnelClient(webSocket, fastTunnelServer, loginHandler, context.Connection.RemoteIpAddress);
try
{
Interlocked.Increment(ref fastTunnelServer.ConnectedClientCount);
logger.LogInformation($"客户端连接 {context.TraceIdentifier}:{context.Connection.RemoteIpAddress} 当前在线数:{fastTunnelServer.ConnectedClientCount}");
await tunnelClient.ReviceAsync(webSocket, CancellationToken.None);
fastTunnelServer.OnClientLogin(client);
await client.ReviceAsync(CancellationToken.None);
logOut(context);
fastTunnelServer.OnClientLogout(client);
}
catch (Exception)
{
logOut(context);
fastTunnelServer.OnClientLogout(client);
}
}
private void logOut(HttpContext context)
{
Interlocked.Decrement(ref fastTunnelServer.ConnectedClientCount);
logger.LogInformation($"客户端关闭 {context.TraceIdentifier}:{context.Connection.RemoteIpAddress} 当前在线数:{fastTunnelServer.ConnectedClientCount}");
}
private static async Task Close(WebSocket webSocket, string reason)
{
await webSocket.SendCmdAsync(MessageType.Log, reason, CancellationToken.None);
@ -99,17 +96,24 @@ namespace FastTunnel.Core.MiddleWares
private bool checkToken(HttpContext context)
{
if (string.IsNullOrEmpty(fastTunnelServer.ServerOption.CurrentValue.Token))
if (string.IsNullOrEmpty(fastTunnelServer.ServerOption.CurrentValue.Token)
&& (fastTunnelServer.ServerOption.CurrentValue.Tokens == null) || fastTunnelServer.ServerOption.CurrentValue.Tokens.Count() == 0)
{
return true;
}
if (!context.Request.Headers.TryGetValue(FastTunnelConst.FASTTUNNEL_TOKEN, out var token) || !token.Equals(fastTunnelServer.ServerOption.CurrentValue.Token))
// 客户端未携带token登录失败
if (!context.Request.Headers.TryGetValue(FastTunnelConst.FASTTUNNEL_TOKEN, out var token))
{
return false;
}
if (token.Equals(fastTunnelServer.ServerOption.CurrentValue.Token))
{
return true;
};
return true;
return fastTunnelServer.ServerOption.CurrentValue.Tokens?.Contains<string>(token) ?? false;
}
}
}

View File

@ -47,7 +47,6 @@ namespace FastTunnel.Core.Forwarder.MiddleWare
if (lifetime == null || transport == null)
{
await next();
return;
}

View File

@ -0,0 +1,107 @@
using Microsoft.AspNetCore.Http;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Forwarder
{
public class TranStream : Stream
{
private readonly Stream readStream;
private readonly Stream wirteStream;
public TranStream(HttpContext context)
{
this.readStream = context.Request.BodyReader.AsStream();
this.wirteStream = context.Response.BodyWriter.AsStream();
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Flush()
{
this.wirteStream.Flush();
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return this.wirteStream.FlushAsync(cancellationToken);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return this.readStream.Read(buffer, offset, count);
}
public override void Write(byte[] buffer, int offset, int count)
{
this.wirteStream.Write(buffer, offset, count);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return this.readStream.ReadAsync(buffer, cancellationToken);
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var len = await this.readStream.ReadAsync(buffer, offset, count, cancellationToken);
if (len == 0) { Console.WriteLine("==========ReadAsync END=========="); }
return len;
}
public override void Write(ReadOnlySpan<byte> buffer)
{
this.wirteStream.Write(buffer);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return this.wirteStream.WriteAsync(buffer, offset, count, cancellationToken);
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await this.wirteStream.WriteAsync(buffer, cancellationToken);
}
protected override void Dispose(bool disposing)
{
Console.WriteLine("========Dispose=========");
}
public override ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
public override void Close()
{
Console.WriteLine("========Close=========");
base.Close();
}
}
}

View File

@ -55,13 +55,13 @@ namespace FastTunnel.Core.Handlers.Client
private async Task<Stream> createLocal(string requestId, string localhost, CancellationToken cancellationToken)
{
var socket = await DnsSocketFactory.ConnectAsync(localhost.Split(":")[0], int.Parse(localhost.Split(":")[1]));
return new NetworkStream(socket, true) { ReadTimeout = 1000 * 60 * 30 };
return new NetworkStream(socket, true) { ReadTimeout = 1000 * 60 * 10 };
}
private async Task<Stream> createRemote(string requestId, FastTunnelClient cleint, CancellationToken cancellationToken)
{
var socket = await DnsSocketFactory.ConnectAsync(cleint.Server.ServerAddr, cleint.Server.ServerPort);
Stream serverStream = new NetworkStream(socket, true) { ReadTimeout = 1000 * 60 * 30 };
Stream serverStream = new NetworkStream(socket, true) { ReadTimeout = 1000 * 60 * 10 };
if (cleint.Server.Protocol == "wss")
{

View File

@ -40,10 +40,7 @@ namespace FastTunnel.Core.Dispatchers
logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
var tcs = new TaskCompletionSource<Stream>();
tcs.SetTimeOut(20000, () =>
{
logger.LogError($"[Forward]建立Swap超时 {msgId}");
});
tcs.SetTimeOut(20000, null);
_server.ResponseTasks.TryAdd(msgId, tcs);
@ -69,7 +66,7 @@ namespace FastTunnel.Core.Dispatchers
}
using var stream1 = await tcs.Task;
using var stream2 = new NetworkStream(_socket, true) { ReadTimeout = 1000 * 60 * 30 };
using var stream2 = new NetworkStream(_socket, true) { ReadTimeout = 1000 * 60 * 10 };
await Task.WhenAll(stream1.CopyToAsync(stream2), stream2.CopyToAsync(stream1));

View File

@ -20,5 +20,17 @@ namespace FastTunnel.Core.Models
/// 服务端监听的端口号 1~65535
/// </summary>
public int RemotePort { get; set; }
/// <summary>
/// 协议,内网服务监听的协议
/// </summary>
public ProtocolEnum Protocol { get; set; }
}
public enum ProtocolEnum
{
TCP = 0,
UDP = 1,
}
}

View File

@ -1,36 +1,34 @@
using FastTunnel.Core.Client;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Protocol;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Yarp.ReverseProxy.Configuration;
namespace FastTunnel.Core.Models
{
public class TunnelClientHandler
public class TunnelClient
{
readonly ILoginHandler _loginHandler;
FastTunnelServer fastTunnelServer;
ILogger logger;
readonly WebSocket webSocket;
readonly FastTunnelServer fastTunnelServer;
readonly ILoginHandler loginHandler;
public TunnelClientHandler(ILogger<TunnelClientHandler> logger, FastTunnelServer fastTunnelServer, ILoginHandler loginHandler)
public IPAddress RemoteIpAddress { get; private set; }
public TunnelClient(WebSocket webSocket, FastTunnelServer fastTunnelServer, ILoginHandler loginHandler, IPAddress remoteIpAddress)
{
this.logger = logger;
this.webSocket = webSocket;
this.fastTunnelServer = fastTunnelServer;
this._loginHandler = loginHandler;
this.loginHandler = loginHandler;
this.RemoteIpAddress = remoteIpAddress;
}
public async Task ReviceAsync(WebSocket webSocket, CancellationToken cancellationToken)
public async Task ReviceAsync(CancellationToken cancellationToken)
{
var buffer = new byte[FastTunnelConst.MAX_CMD_LENGTH];
var tunnelProtocol = new TunnelProtocol();
@ -55,14 +53,17 @@ namespace FastTunnel.Core.Models
{
try
{
logger.LogDebug($"client{lineCmd}");
return await _loginHandler.HandlerMsg(fastTunnelServer, webSocket, lineCmd.Substring(1));
return await loginHandler.HandlerMsg(fastTunnelServer, webSocket, lineCmd.Substring(1));
}
catch (Exception ex)
{
logger.LogError(ex, $"处理客户端消息失败cmd={lineCmd}");
Console.WriteLine($"处理客户端消息失败cmd={lineCmd}");
return false;
}
}
internal void Logout()
{
}
}
}

View File

@ -2,6 +2,7 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DefineConstants>DEBUG</DefineConstants>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>

View File

@ -23,8 +23,9 @@
// Forward.false
"EnableForward": true,
// Token
// TokenTokenstoken
"Token": "TOKEN_FOR_CLIENT_AUTHENTICATION",
"Tokens": [ "TOKEN1", "TOKEN2", "TOKEN2" ],
/**
* 访apiJWT