😶‍🌫️

This commit is contained in:
Gui.H 2022-07-03 13:07:02 +08:00
parent f6d4d767ce
commit a2f0ba1e12
6 changed files with 60 additions and 50 deletions

View File

@ -122,8 +122,6 @@ public class FastTunnelClient : IFastTunnelClient
private async void HandleServerRequestAsync(byte cmd, string ctx, CancellationToken cancellationToken)
{
await Task.Yield();
try
{
IClientHandler handler;

View File

@ -37,27 +37,36 @@ public class FastTunelProtocol
IDuplexPipe Transport => context.Transport;
const char CharR = '\r';
private const byte ByteCR = (byte)'\r';
private const byte ByteLF = (byte)'\n';
private const byte ByteColon = (byte)':';
private const byte ByteSpace = (byte)' ';
internal async Task TryAnalysisPipeAsync()
{
var _input = Transport.Input;
ReadResult result;
ReadOnlySequence<byte> readableBuffer;
while (true)
{
result = await _input.ReadAsync(context.ConnectionClosed);
var tempBuffer = readableBuffer = result.Buffer;
readableBuffer = result.Buffer;
SequencePosition start = readableBuffer.Start;
SequencePosition? position = null;
do
{
position = tempBuffer.PositionOf((byte)'\n');
position = readableBuffer.PositionOf(ByteLF);
if (position != null)
{
var readedPosition = readableBuffer.GetPosition(1, position.Value);
if (ProcessLine(tempBuffer.Slice(0, position.Value), out var line))
if (ProcessHeaderLine(readableBuffer.Slice(0, position.Value), out var _))
{
if (Method == ProtocolConst.HTTP_METHOD_SWAP)
{
@ -65,7 +74,7 @@ public class FastTunelProtocol
}
else
{
_input.AdvanceTo(readableBuffer.Start, readableBuffer.Start);
_input.AdvanceTo(start, start);
}
if (IsFastTunnel)
@ -73,7 +82,6 @@ public class FastTunelProtocol
context.Features.Set<IFastTunnelFeature>(new FastTunnelFeature()
{
MatchWeb = MatchWeb,
HasReadLInes = HasReadLInes,
Method = Method,
Host = Host,
MessageId = MessageId,
@ -82,7 +90,7 @@ public class FastTunelProtocol
return;
}
tempBuffer = tempBuffer.Slice(readedPosition);
readableBuffer = readableBuffer.Slice(readedPosition);
}
}
while (position != null && !readableBuffer.IsEmpty);
@ -100,7 +108,6 @@ public class FastTunelProtocol
public string MessageId;
private bool isFirstLine = true;
public IList<string> HasReadLInes { get; private set; } = new List<string>();
public FastTunnelServer fastTunnelServer { get; }
/// <summary>
@ -115,23 +122,22 @@ public class FastTunelProtocol
/// Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
///
/// </summary>
/// <param name="readOnlySequence"></param>
/// <param name="headerLine"></param>
/// <returns>Header读取完毕</returns>
private bool ProcessLine(ReadOnlySequence<byte> readOnlySequence, out string lineStr)
private bool ProcessHeaderLine(ReadOnlySequence<byte> headerLine, out string headerLineStr)
{
lineStr = Encoding.UTF8.GetString(readOnlySequence);
HasReadLInes.Add(lineStr);
headerLineStr = Encoding.UTF8.GetString(headerLine);
if (isFirstLine)
{
Method = lineStr.Substring(0, lineStr.IndexOf(" ")).ToUpper();
Method = headerLineStr.Substring(0, headerLineStr.IndexOf(" ")).ToUpper();
switch (Method)
{
case ProtocolConst.HTTP_METHOD_SWAP:
// 客户端发起消息互转
var endIndex = lineStr.IndexOf(" ", 7);
MessageId = lineStr.Substring(7, endIndex - 7);
var endIndex = headerLineStr.IndexOf(" ", 7);
MessageId = headerLineStr.Substring(7, endIndex - 7);
break;
default:
// 常规Http请求需要检查Host决定是否进行代理
@ -142,21 +148,9 @@ public class FastTunelProtocol
}
else
{
if (lineStr.Equals("\r"))
// TrailerHeader
if (headerLineStr.Equals("\r"))
{
if (Method == ProtocolConst.HTTP_METHOD_SWAP)
{
// do nothing
}
else
{
// 匹配Host
if (fastTunnelServer.TryGetWebProxyByHost(Host, out var web))
{
MatchWeb = web;
}
}
return true;
}
@ -168,15 +162,22 @@ public class FastTunelProtocol
default:
// 检查Host决定是否进行代理
// Host: test.test.cc:1270
var lower = lineStr.Trim('\r').ToLower();
if (lower.StartsWith("host:"))
var lower = headerLineStr.Trim(CharR).ToLower();
if (lower.StartsWith("host:", StringComparison.OrdinalIgnoreCase))
{
Host = lower.Split(" ")[1];
if (Host.Contains(":"))
{
Host = Host.Split(":")[0];
}
// 匹配Host
if (fastTunnelServer.TryGetWebProxyByHost(Host, out var web))
{
MatchWeb = web;
}
}
break;
}
}

View File

@ -13,7 +13,7 @@ using FastTunnel.Core.Models;
namespace FastTunnel.Core.Forwarder.Kestrel.Features;
public class FastTunnelFeature : IFastTunnelFeature
public struct FastTunnelFeature : IFastTunnelFeature
{
public WebInfo MatchWeb { get; set; }

View File

@ -69,6 +69,9 @@ internal class ForwarderMiddleware
}
}
public int UserCount = 0;
public int ClientCount = 0;
/// <summary>
/// 用户向服务端发起的请求
/// </summary>
@ -79,6 +82,8 @@ internal class ForwarderMiddleware
var feat = context.Features.Get<IFastTunnelFeature>();
var requestId = Guid.NewGuid().ToString().Replace("-", "");
Interlocked.Increment(ref UserCount);
logger.LogDebug($"=========USER START {requestId}===========");
var web = feat.MatchWeb;
@ -90,12 +95,12 @@ internal class ForwarderMiddleware
return;
}
await Task.Yield();
(Stream Stream, CancellationTokenSource TokenSource) res = (null, null);
try
{
var ss = context.LocalEndPoint;
try
{
// 发送指令给客户端,等待建立隧道
@ -112,7 +117,6 @@ internal class ForwarderMiddleware
res = await tcs.Task;
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(res.TokenSource.Token, context.ConnectionClosed);
using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
//var t1 = res.Input.CopyToAsync(context.Transport.Output, context.ConnectionClosed);
@ -127,6 +131,7 @@ internal class ForwarderMiddleware
}
finally
{
Interlocked.Decrement(ref UserCount);
logger.LogDebug($"=========USER END {requestId}===========");
fastTunnelServer.ResponseTasks.TryRemove(requestId, out _);
@ -145,6 +150,7 @@ internal class ForwarderMiddleware
/// <exception cref="Exception"></exception>
private async Task doSwap(ConnectionContext context)
{
Interlocked.Increment(ref ClientCount);
var feat = context.Features.Get<IFastTunnelFeature>();
var requestId = feat.MessageId;
@ -176,6 +182,7 @@ internal class ForwarderMiddleware
}
finally
{
Interlocked.Decrement(ref ClientCount);
logger.LogDebug($"=========CLINET END {requestId}===========");
await context.Transport.Input.CompleteAsync();
await context.Transport.Output.CompleteAsync();

View File

@ -26,25 +26,28 @@ public class SwapHandler : IClientHandler
_logger = logger;
}
public int SwapCount = 0;
public async Task HandlerMsgAsync(FastTunnelClient cleint, string msg, CancellationToken cancellationToken)
{
var msgs = msg.Split('|');
var requestId = msgs[0];
var address = msgs[1];
_logger.LogDebug($"========Swap Start:{requestId}==========");
string requestId = null;
try
{
using (var serverStream = await createRemote(requestId, cleint, cancellationToken))
using (var localStream = await createLocal(requestId, address, cancellationToken))
{
var taskX = serverStream.CopyToAsync(localStream, cancellationToken);
var taskY = localStream.CopyToAsync(serverStream, cancellationToken);
Interlocked.Increment(ref SwapCount);
var msgs = msg.Split('|');
requestId = msgs[0];
var address = msgs[1];
_logger.LogDebug($"========Swap Start:{requestId}==========");
await Task.WhenAny(taskX, taskY);
using var serverStream = await createRemote(requestId, cleint, cancellationToken);
using var localStream = await createLocal(requestId, address, cancellationToken);
_logger.LogDebug($"[HandlerMsgAsync] success {requestId}");
}
var taskX = serverStream.CopyToAsync(localStream, cancellationToken);
var taskY = localStream.CopyToAsync(serverStream, cancellationToken);
await Task.WhenAny(taskX, taskY).WaitAsync(cancellationToken);
_logger.LogDebug($"[HandlerMsgAsync] success {requestId}");
}
catch (Exception ex)
{
@ -52,6 +55,7 @@ public class SwapHandler : IClientHandler
}
finally
{
Interlocked.Decrement(ref SwapCount);
_logger.LogDebug($"========Swap End:{requestId}==========");
}
}

View File

@ -81,7 +81,7 @@ public class PortProxyListener
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// 将此客户端交由Dispatcher进行管理