This commit is contained in:
Gui.H 2022-05-08 21:09:08 +08:00
parent 35a61f389f
commit ea8072ac6e
8 changed files with 145 additions and 113 deletions

View File

@ -1,4 +1,4 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
@ -67,5 +67,10 @@ namespace FastTunnel.Core.Client
Clients.Remove(client);
client.Logout();
}
internal bool TryGetWebProxyByHost(string host, out WebInfo web)
{
return WebList.TryGetValue(host, out web);
}
}
}

View File

@ -31,7 +31,6 @@ public static class ListenOptionsSwapExtensions
listenOptions.Use(next => new SwapConnectionMiddleware(next, logger, fastTunnelServer).OnConnectionAsync);
// 登录频率低,放在后面
// listenOptions.Use(next => new ClientConnectionMiddleware(next, loggerClient, fastTunnelServer).OnConnectionAsync);
return listenOptions;
}
}

View File

@ -73,28 +73,28 @@ namespace FastTunnel.Core.Extensions
{
app.UseWebSockets();
var swapHandler = app.ApplicationServices.GetRequiredService<FastTunnelSwapHandler>();
// var swapHandler = app.ApplicationServices.GetRequiredService<FastTunnelSwapHandler>();
var clientHandler = app.ApplicationServices.GetRequiredService<FastTunnelClientHandler>();
app.Use(clientHandler.Handle);
}
public static void MapFastTunnelServer(this IEndpointRouteBuilder endpoints)
{
endpoints.MapReverseProxy();
endpoints.MapFallback(context =>
{
var options = context.RequestServices.GetRequiredService<IOptionsMonitor<DefaultServerConfig>>();
var host = context.Request.Host.Host;
if (!host.EndsWith(options.CurrentValue.WebDomain) || host.Equals(options.CurrentValue.WebDomain))
{
context.Response.StatusCode = 404;
return Task.CompletedTask;
}
//public static void MapFastTunnelServer(this IEndpointRouteBuilder endpoints)
//{
// endpoints.MapReverseProxy();
// endpoints.MapFallback(context =>
// {
// var options = context.RequestServices.GetRequiredService<IOptionsMonitor<DefaultServerConfig>>();
// var host = context.Request.Host.Host;
// if (!host.EndsWith(options.CurrentValue.WebDomain) || host.Equals(options.CurrentValue.WebDomain))
// {
// context.Response.StatusCode = 404;
// return Task.CompletedTask;
// }
context.Response.StatusCode = 200;
context.Response.WriteAsync(TunnelResource.Page_NotFound, CancellationToken.None);
return Task.CompletedTask;
});
}
// context.Response.StatusCode = 200;
// context.Response.WriteAsync(TunnelResource.Page_NotFound, CancellationToken.None);
// return Task.CompletedTask;
// });
//}
}
}

View File

@ -11,6 +11,8 @@ using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using FastTunnel.Core.Client;
using FastTunnel.Core.Models;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
@ -19,10 +21,12 @@ namespace FastTunnel.Core.Forwarder.Kestrel;
internal class FastTunnelConnectionContext : ConnectionContext
{
private ConnectionContext _inner;
FastTunnelServer fastTunnelServer;
ILogger _logger;
public FastTunnelConnectionContext(ConnectionContext context, ILogger logger)
public FastTunnelConnectionContext(ConnectionContext context, FastTunnelServer fastTunnelServer, ILogger logger)
{
this.fastTunnelServer = fastTunnelServer;
this._inner = context;
this._logger = logger;
}
@ -35,6 +39,10 @@ internal class FastTunnelConnectionContext : ConnectionContext
public override IDictionary<object, object> Items { get => _inner.Items; set => _inner.Items = value; }
public bool IsFastTunnel => Method == "PROXY" || MatchWeb != null;
public WebInfo MatchWeb { get; private set; }
public override ValueTask DisposeAsync()
{
return _inner.DisposeAsync();
@ -49,45 +57,45 @@ internal class FastTunnelConnectionContext : ConnectionContext
internal async Task<bool> TryAnalysisPipeAsync()
{
var reader = Transport.Input;
ReadResult result;
ReadOnlySequence<byte> readableBuffer;
while (true)
{
var result = await reader.ReadAsync();
readableBuffer = result.Buffer;
SequencePosition? position = null;
result = await reader.ReadAsync();
var tempBuffer = readableBuffer = result.Buffer;
var start = readableBuffer.Start;
SequencePosition? position = null;
do
{
position = readableBuffer.PositionOf((byte)'\n');
position = tempBuffer.PositionOf((byte)'\n');
if (position != null)
{
ProcessLine(readableBuffer.Slice(0, position.Value));
if (HeaderEnd)
if (ProcessLine(tempBuffer.Slice(0, position.Value)))
{
if (Method == "SWAP")
if (Method == "PROXY")
{
reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
reader.AdvanceTo(position.Value, position.Value);
}
else
{
reader.AdvanceTo(start, start);
reader.AdvanceTo(readableBuffer.Start, readableBuffer.Start);
}
return false;
}
// 剔除已处理的行 +\n
readableBuffer = readableBuffer.Slice(readableBuffer.GetPosition(1, position.Value));
tempBuffer = tempBuffer.Slice(readableBuffer.GetPosition(1, position.Value));
}
}
while (position != null);
reader.AdvanceTo(start, start);
while (position != null && !readableBuffer.IsEmpty);
if (result.IsCompleted)
{
reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
break;
}
}
@ -99,7 +107,7 @@ internal class FastTunnelConnectionContext : ConnectionContext
public string Host = null;
public string MessageId;
bool HeaderEnd = false;
bool complete = false;
bool isFirstLine = true;
/// <summary>
@ -115,20 +123,20 @@ internal class FastTunnelConnectionContext : ConnectionContext
///
/// </summary>
/// <param name="readOnlySequence"></param>
private void ProcessLine(ReadOnlySequence<byte> readOnlySequence)
private bool ProcessLine(ReadOnlySequence<byte> readOnlySequence)
{
var lineStr = Encoding.UTF8.GetString(readOnlySequence);
Console.WriteLine($"[Handle] {lineStr}");
Console.WriteLine($"[HandleLien] {lineStr}");
if (isFirstLine)
{
Method = lineStr.Substring(0, lineStr.IndexOf(" ")).ToUpper();
switch (Method)
{
case "SWAP":
case "PROXY":
// 客户端发起消息互转
var endIndex = lineStr.IndexOf(" ", 6);
MessageId = lineStr.Substring(6, endIndex - 6);
var endIndex = lineStr.IndexOf(" ", 7);
MessageId = lineStr.Substring(7, endIndex - 7);
break;
default:
// 常规Http请求需要检查Host决定是否进行代理
@ -141,13 +149,23 @@ internal class FastTunnelConnectionContext : ConnectionContext
{
if (lineStr.Equals("\r"))
{
HeaderEnd = true;
return;
complete = true;
if (Method != "PROXY")
{
// 匹配Host
if (fastTunnelServer.TryGetWebProxyByHost(Host, out WebInfo web))
{
MatchWeb = web;
}
}
return true;
}
switch (Method)
{
case "SWAP":
case "PROXY":
// 找msgid
break;
@ -166,5 +184,7 @@ internal class FastTunnelConnectionContext : ConnectionContext
break;
}
}
return false;
}
}

View File

@ -31,9 +31,11 @@ internal class HandleHttpConnectionMiddleware
internal async Task OnConnectionAsync(ConnectionContext context)
{
var ftContext = new FastTunnelConnectionContext(context, logger);
var fasttunnelHandle = await ftContext.TryAnalysisPipeAsync();
logger.LogInformation("=========OnConnectionAsync===========");
var ftContext = new FastTunnelConnectionContext(context, fastTunnelServer, logger);
await ftContext.TryAnalysisPipeAsync();
await next(ftContext);
logger.LogInformation("=========TryAnalysisPipeAsync END===========");
await next(ftContext.IsFastTunnel ? ftContext : context);
}
}

View File

@ -7,12 +7,16 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading.Tasks;
using FastTunnel.Core.Client;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.MiddleWare;
using FastTunnel.Core.Models;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.Extensions.Logging;
@ -35,16 +39,22 @@ internal class SwapConnectionMiddleware
internal async Task OnConnectionAsync(ConnectionContext context)
{
var ctx = context as FastTunnelConnectionContext;
if (ctx != null)
if (ctx != null && ctx.IsFastTunnel)
{
if (ctx.Method == "SWAP")
if (ctx.Method == "PROXY")
{
await doSwap(ctx);
}
else if (ctx.MatchWeb != null)
{
await waitSwap(ctx);
}
else
{
await next(context);
throw new NotSupportedException();
}
}
else
{
@ -52,17 +62,64 @@ internal class SwapConnectionMiddleware
}
}
private async Task waitSwap(FastTunnelConnectionContext context)
{
var requestId = Guid.NewGuid().ToString().Replace("-", "");
var web = context.MatchWeb;
TaskCompletionSource<Stream> tcs = new();
logger.LogDebug($"[Http]Swap开始 {requestId}|{context.Host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
tcs.SetTimeOut(10000, () => { logger.LogDebug($"[Proxy TimeOut]:{requestId}"); });
fastTunnelServer.ResponseTasks.TryAdd(requestId, tcs);
try
{
try
{
// 发送指令给客户端,等待建立隧道
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{requestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", default);
}
catch (WebSocketException)
{
web.LogOut();
// 通讯异常,返回客户端离线
throw new Exception("客户端离线");
}
using var res = await tcs.Task;
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
var t1 = res.CopyToAsync(reverseConnection);
var t2 = reverseConnection.CopyToAsync(res);
await Task.WhenAll(t1, t2);
logger.LogDebug("[Http]Swap结束");
}
catch (Exception)
{
throw;
}
finally
{
fastTunnelServer.ResponseTasks.TryRemove(requestId, out _);
context.Transport.Input.Complete();
context.Transport.Output.Complete();
}
}
private async Task doSwap(FastTunnelConnectionContext context)
{
var requestId = context.MessageId;
if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseForYarp))
if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseStream))
{
logger.LogError($"[PROXY]:RequestId不存在 {requestId}");
return;
throw new Exception($"[PROXY]:RequestId不存在 {requestId}");
};
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
responseForYarp.TrySetResult(reverseConnection);
responseStream.TrySetResult(reverseConnection);
var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
@ -89,52 +146,6 @@ internal class SwapConnectionMiddleware
}
}
/// <summary>
///
/// </summary>
/// <param name="context"></param>
/// <returns>is for FastTunnel</returns>
async Task<bool> ReadPipeAsync(ConnectionContext context)
{
var reader = context.Transport.Input;
var isProxy = false;
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
SequencePosition? position = null;
do
{
position = buffer.PositionOf((byte)'\n');
if (position != null)
{
isProxy = ProcessProxyLine(buffer.Slice(0, position.Value));
if (isProxy)
{
await Swap(buffer, position.Value, context);
return true;
}
else
{
context.Transport.Input.AdvanceTo(buffer.Start, buffer.Start);
return false;
}
}
}
while (position != null);
if (result.IsCompleted)
{
break;
}
}
return false;
}
private async Task Swap(ReadOnlySequence<byte> buffer, SequencePosition position, ConnectionContext context)
{
var firstLineBuffer = buffer.Slice(0, position);
@ -180,14 +191,4 @@ internal class SwapConnectionMiddleware
logger.LogInformation($"=====================Swap End:{requestId}================== ");
}
}
/// <summary>
///
/// </summary>
/// <param name="readOnlySequence"></param>
private bool ProcessProxyLine(ReadOnlySequence<byte> readOnlySequence)
{
var str = Encoding.UTF8.GetString(readOnlySequence);
return str.StartsWith("SWAP");
}
}

View File

@ -76,8 +76,7 @@ namespace FastTunnel.Core.Handlers.Client
serverStream = sslStream;
}
var reverse = $"SWAP /{requestId} HTTP/1.1\r\n";
//var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n";
var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n";
var requestMsg = Encoding.UTF8.GetBytes(reverse);
await serverStream.WriteAsync(requestMsg, cancellationToken);

View File

@ -17,5 +17,11 @@ namespace FastTunnel.Core.Models
public WebSocket Socket { get; set; }
public WebConfig WebConfig { get; set; }
internal void LogOut()
{
// TODO:退出登录
throw new NotImplementedException();
}
}
}