This commit is contained in:
Gui.H 2022-05-11 23:30:22 +08:00
parent 76cc6933a9
commit 8fbf17d775
13 changed files with 141 additions and 51 deletions

View File

@ -5,7 +5,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="7.0.0-preview.3.22175.4" /> <PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="7.0.0-preview.3.22175.4" />
<PackageReference Include="Microsoft.Extensions.Logging.Log4Net.AspNetCore" Version="6.1.0" />
<PackageReference Include="Serilog.AspNetCore" Version="5.0.0" /> <PackageReference Include="Serilog.AspNetCore" Version="5.0.0" />
</ItemGroup> </ItemGroup>

View File

@ -49,6 +49,7 @@ class Program
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.UseWindowsService() .UseWindowsService()
.UseSerilog((context, services, configuration) => configuration .UseSerilog((context, services, configuration) => configuration
.MinimumLevel.Debug()
.WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day)
.WriteTo.Console()) .WriteTo.Console())
.ConfigureServices((hostContext, services) => .ConfigureServices((hostContext, services) =>

View File

@ -6,7 +6,6 @@
using FastTunnel.Core.Client; using FastTunnel.Core.Client;
using FastTunnel.Core.Config; using FastTunnel.Core.Config;
using FastTunnel.Core.Forwarder.MiddleWare;
using FastTunnel.Core.Handlers.Client; using FastTunnel.Core.Handlers.Client;
using FastTunnel.Core.Handlers.Server; using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;

View File

@ -35,6 +35,13 @@ internal class FastTunnelConnectionMiddleware
await ftContext.TryAnalysisPipeAsync(); await ftContext.TryAnalysisPipeAsync();
logger.LogInformation("=========TryAnalysisPipeAsync END==========="); logger.LogInformation("=========TryAnalysisPipeAsync END===========");
await next(ftContext.IsFastTunnel ? ftContext : context); if (ftContext.IsFastTunnel)
{
await next(ftContext.IsFastTunnel ? ftContext : context);
}
else
{
await next(context);
}
} }
} }

View File

@ -7,6 +7,7 @@
using System; using System;
using System.Buffers; using System.Buffers;
using System.IO; using System.IO;
using System.IO.Pipelines;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -17,6 +18,7 @@ using FastTunnel.Core.Forwarder.MiddleWare;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
using FastTunnel.Core.Utilitys;
using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Connections.Features;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -46,11 +48,11 @@ internal class SwapConnectionMiddleware
{ {
if (ctx.Method == ProtocolConst.HTTP_METHOD_SWAP) if (ctx.Method == ProtocolConst.HTTP_METHOD_SWAP)
{ {
await doSwap(ctx); await setResponse(ctx);
} }
else if (ctx.MatchWeb != null) else if (ctx.MatchWeb != null)
{ {
await waitSwap(ctx); await waitResponse(ctx);
} }
else else
{ {
@ -63,17 +65,19 @@ internal class SwapConnectionMiddleware
} }
} }
private async Task waitSwap(FastTunnelConnectionContext context) private async Task waitResponse(FastTunnelConnectionContext context)
{ {
var requestId = Guid.NewGuid().ToString().Replace("-", ""); var requestId = Guid.NewGuid().ToString().Replace("-", "");
var web = context.MatchWeb; var web = context.MatchWeb;
TaskCompletionSource<Stream> tcs = new(); TaskCompletionSource<IDuplexPipe> tcs = new();
logger.LogDebug($"[Http]Swap开始 {requestId}|{context.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}"); logger.LogDebug($"[Http]Swap开始 {requestId}|{context.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); }); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); });
fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs); fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs);
IDuplexPipe res = null;
try try
{ {
try try
@ -89,29 +93,34 @@ internal class SwapConnectionMiddleware
throw new ClienOffLineException("客户端离线"); throw new ClienOffLineException("客户端离线");
} }
using var res = await tcs.Task; var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
var t1 = res.CopyToAsync(reverseConnection); res = await tcs.Task;
var t2 = reverseConnection.CopyToAsync(res);
await Task.WhenAll(t1, t2); // using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
logger.LogDebug("[Http]Swap结束"); var t1 = res.Input.CopyToAsync(context.Transport.Output, lifetime.ConnectionClosed);
var t2 = context.Transport.Input.CopyToAsync(res.Output, lifetime.ConnectionClosed);
await Task.WhenAny(t1, t2);
} }
catch (Exception) catch (Exception ex)
{ {
throw; logger.LogError(ex, "[waitResponse]");
} }
finally finally
{ {
logger.LogInformation("[Http] waitSwap结束");
fastTunnelServer.ResponseTasks.TryRemove(requestId, out _); fastTunnelServer.ResponseTasks.TryRemove(requestId, out _);
context.Transport.Input.Complete();
context.Transport.Output.Complete(); await context.Transport.Input.CompleteAsync();
await context.Transport.Output.CompleteAsync();
await res.Input.CompleteAsync();
await res.Output.CompleteAsync();
} }
} }
private async Task doSwap(FastTunnelConnectionContext context) private async Task setResponse(FastTunnelConnectionContext context)
{ {
var requestId = context.MessageId; var requestId = context.MessageId;
if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream)) if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream))
@ -119,31 +128,26 @@ internal class SwapConnectionMiddleware
throw new Exception($"[PROXY]:RequestId不存在 {requestId}"); throw new Exception($"[PROXY]:RequestId不存在 {requestId}");
}; };
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); //using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
responseStream.TrySetResult(reverseConnection); responseStream.TrySetResult(context.Transport);
var lifetime = context.Features.Get<IConnectionLifetimeFeature>(); var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
var closedAwaiter = new TaskCompletionSource<object>(); var closedAwaiter = new TaskCompletionSource<object>();
lifetime.ConnectionClosed.Register((task) =>
{
(task as TaskCompletionSource<object>).SetResult(null);
}, closedAwaiter);
try try
{ {
await closedAwaiter.Task; closedAwaiter.Task.Wait(lifetime.ConnectionClosed);
} }
catch (Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, ""); logger.LogError(ex, "[setResponse]");
} }
finally finally
{ {
context.Transport.Input.Complete();
context.Transport.Output.Complete();
logger.LogInformation($"=====================Swap End:{requestId}================== "); logger.LogInformation($"=====================Swap End:{requestId}================== ");
await context.Transport.Input.CompleteAsync();
await context.Transport.Output.CompleteAsync();
context.Abort();
} }
} }
} }

View File

@ -0,0 +1,37 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Forwarder.MiddleWare;
internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable
{
NetworkStream stream;
public SocketDuplexPipe(Socket socket)
{
stream = new NetworkStream(socket, true);
Input = PipeReader.Create(stream);
Output = PipeWriter.Create(stream);
}
public PipeReader Input { get; private set; }
public PipeWriter Output { get; private set; }
public async ValueTask DisposeAsync()
{
await stream.DisposeAsync();
}
}

View File

@ -31,6 +31,7 @@ public class SwapHandler : IClientHandler
var msgs = msg.Split('|'); var msgs = msg.Split('|');
var requestId = msgs[0]; var requestId = msgs[0];
var address = msgs[1]; var address = msgs[1];
_logger.LogDebug($"[HandlerMsgAsync] start {requestId}");
try try
{ {
@ -40,24 +41,19 @@ public class SwapHandler : IClientHandler
var taskX = serverStream.CopyToAsync(localStream, cancellationToken); var taskX = serverStream.CopyToAsync(localStream, cancellationToken);
var taskY = localStream.CopyToAsync(serverStream, cancellationToken); var taskY = localStream.CopyToAsync(serverStream, cancellationToken);
try await Task.WhenAny(taskX, taskY);
{
await Task.WhenAll(taskX, taskY);
}
catch (Exception)
{
} _logger.LogDebug($"[HandlerMsgAsync] success {requestId}");
finally
{
_logger.LogError($"=====================Swap End:{requestId}================== ");
}
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, $"Swap error {requestId}"); _logger.LogError(ex, $"Swap error {requestId}");
} }
finally
{
_logger.LogDebug($"=====================Swap End:{requestId}================== ");
}
} }
private async Task<Stream> createLocal(string requestId, string localhost, CancellationToken cancellationToken) private async Task<Stream> createLocal(string requestId, string localhost, CancellationToken cancellationToken)

View File

@ -9,7 +9,6 @@ using System.Net.WebSockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
@ -17,7 +16,7 @@ using FastTunnel.Core.Server;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Forwarder.MiddleWare; namespace FastTunnel.Core.Handlers.Server;
public class FastTunnelClientHandler public class FastTunnelClientHandler
{ {

View File

@ -6,12 +6,14 @@
using System; using System;
using System.IO; using System.IO;
using System.IO.Pipelines;
using System.Net.Sockets; using System.Net.Sockets;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Exceptions; using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.MiddleWare;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
@ -47,7 +49,7 @@ public class ForwardDispatcher
await Task.Yield(); await Task.Yield();
logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}"); logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
var tcs = new TaskCompletionSource<Stream>(); var tcs = new TaskCompletionSource<IDuplexPipe>();
tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); }); tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Dispatch TimeOut]:{msgId}"); });
_server.ResponseTasks.TryAdd(msgId, tcs); _server.ResponseTasks.TryAdd(msgId, tcs);
@ -73,13 +75,10 @@ public class ForwardDispatcher
return; return;
} }
using (var stream1 = await tcs.Task) var stream1 = await tcs.Task;
using (var stream2 = new NetworkStream(_socket, true) { ReadTimeout = 1000 * 60 * 10 }) await using var stream2 = new SocketDuplexPipe(_socket);
{
await Task.WhenAll(stream1.CopyToAsync(stream2), stream2.CopyToAsync(stream1));
}
logger.LogDebug($"[Forward]Swap OK {msgId}"); await Task.WhenAny(stream1.Input.CopyToAsync(stream2.Output), stream2.Input.CopyToAsync(stream1.Output));
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -87,6 +86,7 @@ public class ForwardDispatcher
} }
finally finally
{ {
logger.LogDebug($"[Forward]Swap OK {msgId}");
_server.ResponseTasks.TryRemove(msgId, out _); _server.ResponseTasks.TryRemove(msgId, out _);
} }
} }

View File

@ -8,6 +8,7 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.IO.Pipelines;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Config; using FastTunnel.Core.Config;
@ -23,7 +24,7 @@ public class FastTunnelServer
public readonly IOptionsMonitor<DefaultServerConfig> ServerOption; public readonly IOptionsMonitor<DefaultServerConfig> ServerOption;
private readonly ILogger<FastTunnelServer> logger; private readonly ILogger<FastTunnelServer> logger;
public ConcurrentDictionary<string, TaskCompletionSource<Stream>> ResponseTasks { get; } = new(); public ConcurrentDictionary<string, TaskCompletionSource<IDuplexPipe>> ResponseTasks { get; } = new();
public ConcurrentDictionary<string, WebInfo> WebList { get; private set; } = new(); public ConcurrentDictionary<string, WebInfo> WebList { get; private set; } = new();

View File

@ -0,0 +1,46 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Utilitys;
internal class SwapUtility
{
IDuplexPipe pipe1;
IDuplexPipe pipe2;
public SwapUtility(IDuplexPipe pipe1, IDuplexPipe pipe2)
{
this.pipe1 = pipe1;
this.pipe2 = pipe2;
}
internal async Task SwapAsync(CancellationToken cancellationToken = default)
{
}
private async Task T1(IDuplexPipe pipe, IDuplexPipe pipe1, CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result;
ReadOnlySequence<byte> readableBuffer;
result = await pipe.Input.ReadAsync(cancellationToken);
readableBuffer = result.Buffer;
}
}
}

View File

@ -49,6 +49,7 @@ public class Program
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.UseWindowsService() .UseWindowsService()
.UseSerilog((context, services, configuration) => configuration .UseSerilog((context, services, configuration) => configuration
.MinimumLevel.Debug()
.WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day) .WriteTo.File("logs/log-.txt", rollingInterval: RollingInterval.Day)
.WriteTo.Console()) .WriteTo.Console())
.ConfigureWebHost(webHostBuilder => .ConfigureWebHost(webHostBuilder =>