This commit is contained in:
Gui.H 2022-05-05 22:23:08 +08:00
parent 401276cb6a
commit 35a61f389f
8 changed files with 280 additions and 24 deletions

View File

@ -24,12 +24,14 @@ public static class ListenOptionsSwapExtensions
var loggerFactory = listenOptions.KestrelServerOptions.ApplicationServices.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<SwapConnectionMiddleware>();
var loggerClient = loggerFactory.CreateLogger<ClientConnectionMiddleware>();
var loggerHttp = loggerFactory.CreateLogger<HandleHttpConnectionMiddleware>();
var fastTunnelServer = listenOptions.KestrelServerOptions.ApplicationServices.GetRequiredService<FastTunnelServer>();
listenOptions.Use(next => new HandleHttpConnectionMiddleware(next, loggerHttp, fastTunnelServer).OnConnectionAsync);
listenOptions.Use(next => new SwapConnectionMiddleware(next, logger, fastTunnelServer).OnConnectionAsync);
// 登录频率低,放在后面
//listenOptions.Use(next => new ClientConnectionMiddleware(next, loggerClient, fastTunnelServer).OnConnectionAsync);
// listenOptions.Use(next => new ClientConnectionMiddleware(next, loggerClient, fastTunnelServer).OnConnectionAsync);
return listenOptions;
}
}

View File

@ -32,23 +32,17 @@ internal class ClientConnectionMiddleware
internal async Task OnConnectionAsync(ConnectionContext context)
{
var oldTransport = context.Transport;
try
if (!await ReadPipeAsync(context))
{
if (!await ReadPipeAsync(context))
{
await next(context);
}
await next(context);
}
finally
{
context.Transport = oldTransport;
}
}
/// <summary>
///
/// </summary>
/// <param name="context"></param>
/// <returns>is for FastTunnel</returns>
async Task<bool> ReadPipeAsync(ConnectionContext context)
{
var reader = context.Transport.Input;

View File

@ -0,0 +1,170 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Forwarder.Kestrel;
internal class FastTunnelConnectionContext : ConnectionContext
{
private ConnectionContext _inner;
ILogger _logger;
public FastTunnelConnectionContext(ConnectionContext context, ILogger logger)
{
this._inner = context;
this._logger = logger;
}
public override IDuplexPipe Transport { get => _inner.Transport; set => _inner.Transport = value; }
public override string ConnectionId { get => _inner.ConnectionId; set => _inner.ConnectionId = value; }
public override IFeatureCollection Features => _inner.Features;
public override IDictionary<object, object> Items { get => _inner.Items; set => _inner.Items = value; }
public override ValueTask DisposeAsync()
{
return _inner.DisposeAsync();
}
ReadOnlySequence<byte> readableBuffer;
/// <summary>
/// 解析FastTunnel协议
/// </summary>
/// <returns>is for FastTunnel</returns>
internal async Task<bool> TryAnalysisPipeAsync()
{
var reader = Transport.Input;
while (true)
{
var result = await reader.ReadAsync();
readableBuffer = result.Buffer;
SequencePosition? position = null;
var start = readableBuffer.Start;
do
{
position = readableBuffer.PositionOf((byte)'\n');
if (position != null)
{
ProcessLine(readableBuffer.Slice(0, position.Value));
if (HeaderEnd)
{
if (Method == "SWAP")
{
reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
else
{
reader.AdvanceTo(start, start);
}
return false;
}
// 剔除已处理的行 +\n
readableBuffer = readableBuffer.Slice(readableBuffer.GetPosition(1, position.Value));
}
}
while (position != null);
reader.AdvanceTo(start, start);
if (result.IsCompleted)
{
break;
}
}
return false;
}
public string Method;
public string Host = null;
public string MessageId;
bool HeaderEnd = false;
bool isFirstLine = true;
/// <summary>
///
/// GET / HTTP/1.1
/// Host: test.test.cc:1270
/// Connection: keep-alive
/// Upgrade-Insecure-Requests: 1
/// User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36
/// Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
/// Accept-Encoding: gzip, deflate
/// Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
///
/// </summary>
/// <param name="readOnlySequence"></param>
private void ProcessLine(ReadOnlySequence<byte> readOnlySequence)
{
var lineStr = Encoding.UTF8.GetString(readOnlySequence);
Console.WriteLine($"[Handle] {lineStr}");
if (isFirstLine)
{
Method = lineStr.Substring(0, lineStr.IndexOf(" ")).ToUpper();
switch (Method)
{
case "SWAP":
// 客户端发起消息互转
var endIndex = lineStr.IndexOf(" ", 6);
MessageId = lineStr.Substring(6, endIndex - 6);
break;
default:
// 常规Http请求需要检查Host决定是否进行代理
break;
}
isFirstLine = false;
}
else
{
if (lineStr.Equals("\r"))
{
HeaderEnd = true;
return;
}
switch (Method)
{
case "SWAP":
// 找msgid
break;
default:
// 检查Host决定是否进行代理
// Host: test.test.cc:1270
var lower = lineStr.Trim('\r').ToLower();
if (lower.StartsWith("host:"))
{
Host = lower.Split(" ")[1];
if (Host.Contains(":"))
{
Host = Host.Split(":")[0];
}
}
break;
}
}
}
}

View File

@ -0,0 +1,39 @@
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://github.com/FastTunnel/FastTunnel/edit/v2/LICENSE
// Copyright (c) 2019 Gui.H
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using FastTunnel.Core.Client;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Forwarder.Kestrel;
internal class HandleHttpConnectionMiddleware
{
readonly ConnectionDelegate next;
readonly ILogger<HandleHttpConnectionMiddleware> logger;
FastTunnelServer fastTunnelServer;
public HandleHttpConnectionMiddleware(ConnectionDelegate next, ILogger<HandleHttpConnectionMiddleware> logger, FastTunnelServer fastTunnelServer)
{
this.next = next;
this.logger = logger;
this.fastTunnelServer = fastTunnelServer;
}
internal async Task OnConnectionAsync(ConnectionContext context)
{
var ftContext = new FastTunnelConnectionContext(context, logger);
var fasttunnelHandle = await ftContext.TryAnalysisPipeAsync();
await next(ftContext);
}
}

View File

@ -34,14 +34,66 @@ internal class SwapConnectionMiddleware
internal async Task OnConnectionAsync(ConnectionContext context)
{
var oldTransport = context.Transport;
if (!await ReadPipeAsync(context))
var ctx = context as FastTunnelConnectionContext;
if (ctx != null)
{
if (ctx.Method == "SWAP")
{
await doSwap(ctx);
}
else
{
await next(context);
}
}
else
{
await next(context);
}
}
private async Task doSwap(FastTunnelConnectionContext context)
{
var requestId = context.MessageId;
if (!fastTunnelServer.ResponseTasks.TryRemove(requestId, out var responseForYarp))
{
logger.LogError($"[PROXY]:RequestId不存在 {requestId}");
return;
};
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
responseForYarp.TrySetResult(reverseConnection);
var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
var closedAwaiter = new TaskCompletionSource<object>();
lifetime.ConnectionClosed.Register((task) =>
{
(task as TaskCompletionSource<object>).SetResult(null);
}, closedAwaiter);
try
{
await closedAwaiter.Task;
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
finally
{
context.Transport.Input.Complete();
context.Transport.Output.Complete();
logger.LogInformation($"=====================Swap End:{requestId}================== ");
}
}
/// <summary>
///
/// </summary>
/// <param name="context"></param>
/// <returns>is for FastTunnel</returns>
async Task<bool> ReadPipeAsync(ConnectionContext context)
{
var reader = context.Transport.Input;
@ -88,9 +140,9 @@ internal class SwapConnectionMiddleware
var firstLineBuffer = buffer.Slice(0, position);
var firstLine = Encoding.UTF8.GetString(firstLineBuffer);
// PROXY /c74eb488a0f54d888e63d85c67428b52 HTTP/1.1
var endIndex = firstLine.IndexOf(" ", 7);
var requestId = firstLine.Substring(7, endIndex - 7);
// SWAP /c74eb488a0f54d888e63d85c67428b52 HTTP/1.1
var endIndex = firstLine.IndexOf(" ", 6);
var requestId = firstLine.Substring(6, endIndex - 6);
Console.WriteLine($"[开始进行Swap操作] {requestId}");
context.Transport.Input.AdvanceTo(buffer.GetPosition(1, position), buffer.GetPosition(1, position));
@ -136,7 +188,6 @@ internal class SwapConnectionMiddleware
private bool ProcessProxyLine(ReadOnlySequence<byte> readOnlySequence)
{
var str = Encoding.UTF8.GetString(readOnlySequence);
return str.StartsWith("PROXY");
return str.StartsWith("SWAP");
}
}

View File

@ -76,7 +76,7 @@ namespace FastTunnel.Core.Handlers.Client
serverStream = sslStream;
}
var reverse = $"PROXY /{requestId} HTTP/1.1\r\n";
var reverse = $"SWAP /{requestId} HTTP/1.1\r\n";
//var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n";
var requestMsg = Encoding.UTF8.GetBytes(reverse);

View File

@ -57,7 +57,7 @@ namespace FastTunnel.Server
do
{
// 在缓冲数据中查找找一个行末尾
position = buffer.PositionOf((byte)'\r\n');
position = buffer.PositionOf((byte)'\n');
if (position != null)
{

View File

@ -114,7 +114,7 @@ public class Startup
{
endpoints.MapControllers();
// -------------------FastTunnel STEP3 OF 3------------------
endpoints.MapFastTunnelServer();
//endpoints.MapFastTunnelServer();
// -------------------FastTunnel STEP3 END-------------------
});
}