This commit is contained in:
Gui.H 2022-07-01 18:12:30 +08:00
parent 224fee149b
commit 6233584868
7 changed files with 17 additions and 19 deletions

View File

@ -12,13 +12,14 @@ using System.Linq;
using System.Reflection.PortableExecutable; using System.Reflection.PortableExecutable;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Forwarder.Kestrel.Features;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.FileSystemGlobbing; using Microsoft.Extensions.FileSystemGlobbing;
namespace FastTunnel.Core.Forwarder.Kestrel.MiddleWare; namespace FastTunnel.Core.Forwarder;
public class FastTunelProtocol public class FastTunelProtocol
{ {
@ -56,7 +57,7 @@ public class FastTunelProtocol
if (position != null) if (position != null)
{ {
var readedPosition = readableBuffer.GetPosition(1, position.Value); var readedPosition = readableBuffer.GetPosition(1, position.Value);
if (ProcessLine(tempBuffer.Slice(0, position.Value), out string line)) if (ProcessLine(tempBuffer.Slice(0, position.Value), out var line))
{ {
if (Method == ProtocolConst.HTTP_METHOD_SWAP) if (Method == ProtocolConst.HTTP_METHOD_SWAP)
{ {

View File

@ -11,7 +11,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
namespace FastTunnel.Core.Forwarder.Kestrel.MiddleWare; namespace FastTunnel.Core.Forwarder.Kestrel.Features;
public class FastTunnelFeature : IFastTunnelFeature public class FastTunnelFeature : IFastTunnelFeature
{ {

View File

@ -12,7 +12,7 @@ using System.Threading.Tasks;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
namespace FastTunnel.Core.Forwarder.Kestrel.MiddleWare; namespace FastTunnel.Core.Forwarder.Kestrel.Features;
internal interface IFastTunnelFeature internal interface IFastTunnelFeature
{ {

View File

@ -14,7 +14,7 @@ using System.Threading.Tasks;
using FastTunnel.Core.Exceptions; using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.Kestrel; using FastTunnel.Core.Forwarder.Kestrel;
using FastTunnel.Core.Forwarder.MiddleWare; using FastTunnel.Core.Forwarder.Kestrel.Features;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Protocol; using FastTunnel.Core.Protocol;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
@ -93,7 +93,7 @@ internal class ForwarderMiddleware
try try
{ {
// 发送指令给客户端,等待建立隧道 // 发送指令给客户端,等待建立隧道
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{requestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", default); await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{requestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", context.ConnectionClosed);
} }
catch (WebSocketException) catch (WebSocketException)
{ {
@ -102,9 +102,7 @@ internal class ForwarderMiddleware
// 通讯异常,返回客户端离线 // 通讯异常,返回客户端离线
throw new ClienOffLineException("客户端离线"); throw new ClienOffLineException("客户端离线");
} }
var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
res = await tcs.Task; res = await tcs.Task;
// using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); // using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
@ -142,12 +140,11 @@ internal class ForwarderMiddleware
//using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true); //using var reverseConnection = new DuplexPipeStream(context.Transport.Input, context.Transport.Output, true);
responseStream.TrySetResult(context.Transport); responseStream.TrySetResult(context.Transport);
var lifetime = context.Features.Get<IConnectionLifetimeFeature>();
var closedAwaiter = new TaskCompletionSource<object>(); var closedAwaiter = new TaskCompletionSource<object>();
try try
{ {
closedAwaiter.Task.Wait(lifetime.ConnectionClosed); closedAwaiter.Task.Wait(context.ConnectionClosed);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -15,9 +15,9 @@ using System.Threading.Tasks;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Refs; using FastTunnel.Core.Refs;
namespace FastTunnel.Core.Forwarder.MiddleWare; namespace FastTunnel.Core.Forwarder.Stream;
internal class DuplexPipeStream : Stream internal class DuplexPipeStream : System.IO.Stream
{ {
private readonly PipeReader _input; private readonly PipeReader _input;
private readonly PipeWriter _output; private readonly PipeWriter _output;
@ -96,7 +96,7 @@ internal class DuplexPipeStream : Stream
WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
} }
public override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{ {
return _output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).GetAsTask(); return _output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).GetAsTask();
} }
@ -143,7 +143,7 @@ internal class DuplexPipeStream : Stream
// buffer.Count is int // buffer.Count is int
var count = (int)Math.Min(readableBuffer.Length, destination.Length); var count = (int)Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count); readableBuffer = readableBuffer.Slice(0, count);
Console.WriteLine($"[{this.GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}"); Console.WriteLine($"[{GetHashCode()}读取]{Encoding.UTF8.GetString(readableBuffer)}");
readableBuffer.CopyTo(destination.Span); readableBuffer.CopyTo(destination.Span);
return count; return count;
} }
@ -160,7 +160,7 @@ internal class DuplexPipeStream : Stream
} }
} }
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{ {
return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
} }
@ -170,7 +170,7 @@ internal class DuplexPipeStream : Stream
return TaskToApm.End<int>(asyncResult); return TaskToApm.End<int>(asyncResult);
} }
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{ {
return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
} }

View File

@ -12,7 +12,7 @@ using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace FastTunnel.Core.Forwarder.MiddleWare; namespace FastTunnel.Core.Forwarder.Stream;
internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable internal class SocketDuplexPipe : IDuplexPipe, IAsyncDisposable
{ {

View File

@ -13,7 +13,7 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Exceptions; using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Forwarder.MiddleWare; using FastTunnel.Core.Forwarder.Stream;
using FastTunnel.Core.Models; using FastTunnel.Core.Models;
using FastTunnel.Core.Models.Massage; using FastTunnel.Core.Models.Massage;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;