This commit is contained in:
Gui.H 2022-07-02 00:28:21 +08:00
parent 6233584868
commit c3a8378f75
8 changed files with 65 additions and 38 deletions

View File

@ -49,7 +49,6 @@ class Program
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.UseWindowsService() .UseWindowsService()
.UseSerilog((context, services, configuration) => configuration .UseSerilog((context, services, configuration) => configuration
.MinimumLevel.Debug()
.WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day)
.WriteTo.Console()) .WriteTo.Console())
.ConfigureServices((hostContext, services) => .ConfigureServices((hostContext, services) =>

View File

@ -10,11 +10,13 @@ using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Exceptions; using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.Kestrel; using FastTunnel.Core.Forwarder.Kestrel;
using FastTunnel.Core.Forwarder.Kestrel.Features; using FastTunnel.Core.Forwarder.Kestrel.Features;
using FastTunnel.Core.Forwarder.Streams;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
@ -43,19 +45,15 @@ internal class ForwarderMiddleware
internal async Task OnConnectionAsync(ConnectionContext context) internal async Task OnConnectionAsync(ConnectionContext context)
{ {
logger.LogInformation("=========ForwarderMiddleware SART===========");
var feat = context.Features.Get<IFastTunnelFeature>(); var feat = context.Features.Get<IFastTunnelFeature>();
if (feat == null) if (feat == null)
{ {
logger.LogInformation("=========ForwarderMiddleware END===========");
// not fasttunnel request // not fasttunnel request
await next(context); await next(context);
return; return;
} }
else else
{ {
logger.LogInformation("=========Swap STRART===========");
if (feat.Method == ProtocolConst.HTTP_METHOD_SWAP) if (feat.Method == ProtocolConst.HTTP_METHOD_SWAP)
{ {
await doSwap(context); await doSwap(context);
@ -68,25 +66,32 @@ internal class ForwarderMiddleware
{ {
throw new NotSupportedException(); throw new NotSupportedException();
} }
logger.LogInformation("=========Swap END===========");
logger.LogInformation("=========ForwarderMiddleware END===========");
} }
} }
/// <summary>
/// 用户发起的请求
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private async Task waitSwap(ConnectionContext context) private async Task waitSwap(ConnectionContext context)
{ {
var feat = context.Features.Get<IFastTunnelFeature>(); var feat = context.Features.Get<IFastTunnelFeature>();
var requestId = Guid.NewGuid().ToString().Replace("-", ""); var requestId = Guid.NewGuid().ToString().Replace("-", "");
logger.LogInformation($"=========USER START {requestId}===========");
var web = feat.MatchWeb; var web = feat.MatchWeb;
TaskCompletionSource<IDuplexPipe> tcs = new(); TaskCompletionSource<(Stream, CancellationTokenSource)> tcs = new();
logger.LogDebug($"[Http]Swap开始 {requestId}|{feat.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}"); logger.LogDebug($"[Http]Swap开始 {requestId}|{feat.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); });
fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs); if (!fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs))
{
return;
}
IDuplexPipe res = null; //IDuplexPipe res = null;
(Stream Stream, CancellationTokenSource TokenSource) res = (null, null);
try try
{ {
@ -102,14 +107,19 @@ internal class ForwarderMiddleware
// 通讯异常,返回客户端离线 // 通讯异常,返回客户端离线
throw new ClienOffLineException("客户端离线"); throw new ClienOffLineException("客户端离线");
} }
res = await tcs.Task; res = await tcs.Task;
// using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(res.TokenSource.Token, context.ConnectionClosed);
var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed); using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
var t2 = context.Transport.Input.CopyToAsync(res.Output, context.ConnectionClosed);
await Task.WhenAny(t1, t2); //var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed);
//var t2 = context.Transport.Input.CopyToAsync(res.Output, context.ConnectionClosed);
var t1 = res.Stream.CopyToAsync(reverseConnection, tokenSource.Token);
var t2 = reverseConnection.CopyToAsync(res.Stream, tokenSource.Token);
await Task.WhenAny(t1, t2).WaitAsync(tokenSource.Token);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -117,34 +127,49 @@ internal class ForwarderMiddleware
} }
finally finally
{ {
logger.LogInformation("[Http] waitSwap结束"); logger.LogInformation($"=========USER END {requestId}===========");
fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); fastTunnelServer.ResponseTasks.TryRemove(requestId, out _);
await context.Transport.Input.CompleteAsync(); await context.Transport.Input.CompleteAsync();
await context.Transport.Output.CompleteAsync(); await context.Transport.Output.CompleteAsync();
await res.Input.CompleteAsync(); res.TokenSource?.Cancel();
await res.Output.CompleteAsync();
} }
} }
/// <summary>
/// 内网发起的请求
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
private async Task doSwap(ConnectionContext context) private async Task doSwap(ConnectionContext context)
{ {
var feat = context.Features.Get<IFastTunnelFeature>(); var feat = context.Features.Get<IFastTunnelFeature>();
var requestId = feat.MessageId; var requestId = feat.MessageId;
logger.LogInformation($"=========CLINET START {requestId}===========");
if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream)) if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream))
{ {
throw new Exception($"[PROXY]:RequestId不存在 {requestId}"); throw new Exception($"[PROXY]:RequestId不存在 {requestId}");
}; };
//using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.ConnectionClosed);
responseStream.TrySetResult(context.Transport);
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
responseStream.TrySetResult((reverseConnection, cancellationTokenSource));
var closedAwaiter = new TaskCompletionSource<object>(); var closedAwaiter = new TaskCompletionSource<object>();
cancellationTokenSource.Token.Register(() =>
{
closedAwaiter.TrySetCanceled();
});
try try
{ {
closedAwaiter.Task.Wait(context.ConnectionClosed); await closedAwaiter.Task;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -152,10 +177,9 @@ internal class ForwarderMiddleware
} }
finally finally
{ {
logger.LogInformation($"=====================Swap End:{requestId}================== "); logger.LogInformation($"=========CLINET END {requestId}===========");
await context.Transport.Input.CompleteAsync(); await context.Transport.Input.CompleteAsync();
await context.Transport.Output.CompleteAsync(); await context.Transport.Output.CompleteAsync();
context.Abort();
} }
} }
} }

View File

@ -15,7 +15,7 @@ using System.Threading.Tasks;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Refs; using FastTunnel.Core.Refs;
namespace FastTunnel.Core.Forwarder.Stream; namespace FastTunnel.Core.Forwarder.Streams;
internal class DuplexPipeStream : System.IO.Stream internal class DuplexPipeStream : System.IO.Stream
{ {
@ -143,7 +143,9 @@ internal class DuplexPipeStream : System.IO.Stream
// buffer.Count is int // buffer.Count is int
var count = (int)Math.Min(readableBuffer.Length, destination.Length); var count = (int)Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count); readableBuffer = readableBuffer.Slice(0, count);
Console.WriteLine($"[{GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}");
//Console.WriteLine($"[{GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}");
readableBuffer.CopyTo(destination.Span); readableBuffer.CopyTo(destination.Span);
return count; return count;
} }

View File

@ -12,7 +12,7 @@ using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace FastTunnel.Core.Forwarder.Stream; namespace FastTunnel.Core.Forwarder.Streams;
internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable
{ {

View File

@ -31,7 +31,7 @@ public class SwapHandler : IClientHandler
var msgs = msg.Split('|'); var msgs = msg.Split('|');
var requestId = msgs[0]; var requestId = msgs[0];
var address = msgs[1]; var address = msgs[1];
_logger.LogDebug($"[HandlerMsgAsync] start {requestId}"); _logger.LogInformation($"========Swap Start:{requestId}==========");
try try
{ {
@ -52,7 +52,7 @@ public class SwapHandler : IClientHandler
} }
finally finally
{ {
_logger.LogDebug($"=====================Swap End:{requestId}================== "); _logger.LogInformation($"========Swap End:{requestId}==========");
} }
} }

View File

@ -13,7 +13,7 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Exceptions; using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.Stream; using FastTunnel.Core.Forwarder.Streams;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
@ -44,12 +44,13 @@ public class ForwardDispatcher
{ {
var msgId = Guid.NewGuid().ToString().Replace("-", ""); var msgId = Guid.NewGuid().ToString().Replace("-", "");
(Stream Stream, CancellationTokenSource TokenSource) res = default;
try try
{ {
await Task.Yield();
logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}"); logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
var tcs = new TaskCompletionSource<IDuplexPipe>(); var tcs = new TaskCompletionSource<(Stream Stream, CancellationTokenSource TokenSource)>();
tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); }); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); });
_server.ResponseTasks.TryAdd(msgId, tcs); _server.ResponseTasks.TryAdd(msgId, tcs);
@ -75,10 +76,11 @@ public class ForwardDispatcher
return; return;
} }
var stream1 = await tcs.Task; res = await tcs.Task;
await using var stream2 = new SocketDuplexPipe(_socket);
await Task.WhenAny(stream1.Input.CopyToAsync(stream2.Output), stream2.Input.CopyToAsync(stream1.Output)); //await using var stream2 = new SocketDuplexPipe(_socket);
using var stream2 = new NetworkStream(_socket);
await Task.WhenAny(res.Stream.CopyToAsync(stream2), stream2.CopyToAsync(res.Stream));
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -86,6 +88,7 @@ public class ForwardDispatcher
} }
finally finally
{ {
res.TokenSource?.Cancel();
logger.LogDebug($"[Forward]Swap OK {msgId}"); logger.LogDebug($"[Forward]Swap OK {msgId}");
_server.ResponseTasks.TryRemove(msgId, out _); _server.ResponseTasks.TryRemove(msgId, out _);
} }

View File

@ -24,7 +24,7 @@ public class FastTunnelServer
public readonly IOptionsMonitor<DefaultServerConfig> ServerOption; public readonly IOptionsMonitor<DefaultServerConfig> ServerOption;
private readonly ILogger<FastTunnelServer> logger; private readonly ILogger<FastTunnelServer> logger;
public ConcurrentDictionary<string, TaskCompletionSource<IDuplexPipe>> ResponseTasks { get; } = new(); public ConcurrentDictionary<string, TaskCompletionSource<(Stream Stream, CancellationTokenSource Token)>> ResponseTasks { get; } = new();
public ConcurrentDictionary<string, WebInfo> WebList { get; private set; } = new(); public ConcurrentDictionary<string, WebInfo> WebList { get; private set; } = new();

View File

@ -49,7 +49,6 @@ public class Program
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.UseWindowsService() .UseWindowsService()
.UseSerilog((context, services, configuration) => configuration .UseSerilog((context, services, configuration) => configuration
.MinimumLevel.Debug()
.WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day)
.WriteTo.Console()) .WriteTo.Console())
.ConfigureWebHost(webHostBuilder => .ConfigureWebHost(webHostBuilder =>