This commit is contained in:
SpringHgui 2021-08-07 14:51:37 +08:00
parent 0996221687
commit e07bcf171e
13 changed files with 331 additions and 81 deletions

View File

@ -2,7 +2,7 @@
"Logging": {
"LogLevel": {
// Trace Debug Information Warning Error
"Default": "Trace",
"Default": "Debug",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}

View File

@ -87,7 +87,7 @@ namespace FastTunnel.Core.Client
await socket.ConnectAsync(
new Uri($"ws://{ClientConfig.Server.ServerAddr}:{ClientConfig.Server.ServerPort}"), cancellationToken);
_logger.LogInformation("连接成功");
_logger.LogDebug("连接服务端成功");
}
catch (Exception)
{

View File

@ -14,7 +14,7 @@ namespace FastTunnel.Core.Extensions
{
public static async Task SendCmdAsync(this WebSocket socket, MessageType type, string content, CancellationToken cancellationToken)
{
var buffer = Encoding.UTF8.GetBytes((char)type + content + "\n");
var buffer = Encoding.UTF8.GetBytes($"{(char)type}{content}\n");
if (type != MessageType.LogIn && buffer.Length > 128)
throw new ArgumentOutOfRangeException(nameof(content));

View File

@ -35,12 +35,10 @@ namespace FastTunnel.Core.Forwarder
private async ValueTask<Stream> ConnectCallback(SocketsHttpConnectionContext context, CancellationToken cancellationToken)
{
var host = context.InitialRequestMessage.RequestUri.Host;
_logger.LogDebug($"ConnectCallback start:{host} {context.GetHashCode()}");
try
{
var res = await proxyAsync(host, cancellationToken);
_logger.LogDebug($"ConnectCallback successfully:{host} {context.GetHashCode()}");
return res;
}
catch (Exception ex)
@ -61,7 +59,6 @@ namespace FastTunnel.Core.Forwarder
try
{
var RequestId = Guid.NewGuid().ToString().Replace("-", "");
_logger.LogInformation($"[发送swap指令]:{RequestId}");
// 发送指令给客户端,等待建立隧道
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{RequestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", cancellation);
@ -71,7 +68,6 @@ namespace FastTunnel.Core.Forwarder
_fastTunnelServer.ResponseTasks.TryAdd(RequestId, task);
var res = await task.Task;
_logger.LogInformation($"[收到swap指令]:{RequestId}");
return res;
}
catch (Exception ex)

View File

@ -56,7 +56,6 @@ namespace Yarp.Sample
public void AddWeb(string hostName)
{
Console.WriteLine($"新增hostName{hostName}");
var oldConfig = _config;
var newRoutes = oldConfig.Routes.ToList();

View File

@ -23,7 +23,7 @@ namespace FastTunnel.Core.MiddleWares
TunnelClient tunnelClient;
public FastTunnelClientHandler(
ILogger<FastTunnelClientHandler> logger,
ILogger<FastTunnelClientHandler> logger,
FastTunnelServer fastTunnelServer, TunnelClient tunnelClient)
{
this.logger = logger;
@ -49,7 +49,6 @@ namespace FastTunnel.Core.MiddleWares
using var webSocket = await context.WebSockets.AcceptWebSocketAsync();
tunnelClient.SetSocket(webSocket);
this.logger.LogInformation($"客户端连接成功");
try
{
@ -59,8 +58,6 @@ namespace FastTunnel.Core.MiddleWares
{
logger.LogError(ex, "通信异常");
}
this.logger.LogInformation($"客户端断开连接");
}
}
}

View File

@ -46,7 +46,6 @@ namespace FastTunnel.Core.Forwarder.MiddleWare
return;
}
logger.LogInformation($"Swap Set {requestId}");
using var reverseConnection = new WebSocketStream(lifetime, transport);
responseAwaiter.TrySetResult(reverseConnection);
@ -54,7 +53,6 @@ namespace FastTunnel.Core.Forwarder.MiddleWare
lifetime.ConnectionClosed.Register((task) => { (task as TaskCompletionSource<object>).SetResult(null); }, closedAwaiter);
await closedAwaiter.Task;
logger.LogInformation($"Swap close {requestId}");
}
}
}

View File

@ -2,6 +2,7 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models;
using FastTunnel.Core.Sockets;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.Extensions.Logging;
using System;
@ -30,12 +31,14 @@ namespace FastTunnel.Core.Dispatchers
_config = config;
}
public async void DispatchAsync(Socket _socket)
public async Task DispatchAsync(Socket _socket)
{
try
{
await Task.Yield();
var msgid = Guid.NewGuid().ToString();
await _client.SendCmdAsync(MessageType.Forward, $"{msgid}|{_config.LocalIp }:{_config.LocalPort}", CancellationToken.None);
await _client.SendCmdAsync(MessageType.Forward, $"{msgid}|{_config.LocalIp}:{_config.LocalPort}", CancellationToken.None);
var tcs = new TaskCompletionSource<Stream>();
_server.ResponseTasks.TryAdd(msgid, tcs);

View File

@ -34,8 +34,6 @@ namespace FastTunnel.Core.Handlers.Client
var requestId = msgs[0];
var address = msgs[1];
_logger.LogDebug($"Swap start {requestId}");
await Task.Yield();
try
@ -47,7 +45,6 @@ namespace FastTunnel.Core.Handlers.Client
var taskY = localStream.CopyToAsync(serverStream, cancellationToken);
await Task.WhenAny(taskX, taskY);
_logger.LogDebug($"Swap success {requestId}");
}
catch (Exception ex)
{
@ -57,7 +54,6 @@ namespace FastTunnel.Core.Handlers.Client
private async Task<Stream> createLocal(string requestId, string localhost, CancellationToken cancellationToken)
{
_logger.LogDebug($"连接本地成功 {requestId}");
var localConnecter = new DnsSocket(localhost.Split(":")[0], int.Parse(localhost.Split(":")[1]));
await localConnecter.ConnectAsync();
@ -69,9 +65,8 @@ namespace FastTunnel.Core.Handlers.Client
var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort);
await connecter.ConnectAsync();
_logger.LogDebug($"连接server成功 {requestId}");
Stream serverConn = new NetworkStream(connecter.Socket, ownsSocket: true);
var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\n\r\n";
var reverse = $"PROXY /{requestId} HTTP/1.1\r\nHost: {cleint.Server.ServerAddr}:{cleint.Server.ServerPort}\r\nConnection: keep-alive\r\n\r\n";
var requestMsg = Encoding.ASCII.GetBytes(reverse);
await serverConn.WriteAsync(requestMsg, cancellationToken);

View File

@ -0,0 +1,125 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Models;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Listener
{
public class ForwardListener
{
ILogger _logerr;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
int m_numConnectedSockets;
bool shutdown = false;
ForwardDispatcher _requestDispatcher;
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
public ForwardListener(string ip, int port, ILogger logerr)
{
_logerr = logerr;
this.ListenIp = ip;
this.ListenPort = port;
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
}
public void Start(ForwardDispatcher requestDispatcher)
{
shutdown = false;
_requestDispatcher = requestDispatcher;
listenSocket.Listen();
StartAccept(null);
}
public void Stop()
{
if (shutdown)
return;
try
{
if (listenSocket.Connected)
{
listenSocket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception)
{
}
finally
{
shutdown = true;
listenSocket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAcceptAsync(acceptEventArg);
}
}
private async Task ProcessAcceptAsync(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// 将此客户端交由Dispatcher进行管理
await _requestDispatcher.DispatchAsync(accept);
// Accept the next connection request
StartAccept(e);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAcceptAsync(e);
}
}
}

View File

@ -50,6 +50,55 @@ namespace FastTunnel.Core.Listener
StartAccept(null);
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAcceptAsync(acceptEventArg);
}
}
private void ProcessAcceptAsync(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// 将此客户端交由Dispatcher进行管理
_requestDispatcher.DispatchAsync(accept);
// Accept the next connection request
StartAccept(e);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAcceptAsync(e);
}
public void Stop()
{
if (shutdown)
@ -73,60 +122,5 @@ namespace FastTunnel.Core.Listener
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
try
{
// 将此客户端交由Dispatcher进行管理
_requestDispatcher.DispatchAsync(accept);
}
catch (Exception ex)
{
_logerr.LogError(ex, "RequestDispatcher Fail");
}
// Accept the next connection request
StartAccept(e);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
}
}

View File

@ -14,12 +14,12 @@ namespace FastTunnel.Core.Sockets
private string _host;
private int _port;
public Socket Socket { get; set; }
public Socket Socket { get; }
public DnsSocket(string v1, int v2)
public DnsSocket(string host, int port)
{
this._host = v1;
this._port = v2;
this._host = host;
this._port = port;
Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
Socket.NoDelay = true;

View File

@ -0,0 +1,143 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Utility.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Sockets
{
public class SocketSwap
{
private readonly Socket m_sockt1;
private readonly Socket m_sockt2;
private readonly string m_msgId = null;
private readonly ILogger m_logger;
private bool swapeStarted = false;
private class Channel
{
public Socket Send { get; set; }
public Socket Receive { get; set; }
}
public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId)
{
//sockt1.NoDelay = true;
//sockt2.NoDelay = true;
m_sockt1 = sockt1;
m_sockt2 = sockt2;
m_msgId = msgId;
m_logger = logger;
}
public void StartSwap()
{
m_logger?.LogDebug($"[StartSwapStart] {m_msgId}");
swapeStarted = true;
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = m_sockt1,
Receive = m_sockt2
});
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = m_sockt2,
Receive = m_sockt1
});
m_logger?.LogDebug($"[StartSwapEnd] {m_msgId}");
}
private void swapCallback(object state)
{
m_logger?.LogDebug($"swapCallback {m_msgId}");
var chanel = state as Channel;
byte[] result = new byte[512];
while (true)
{
int num;
try
{
try
{
num = chanel.Receive.Receive(result, 0, result.Length, SocketFlags.None);
}
catch (Exception)
{
closeSocket("Revice Fail");
break;
}
if (num == 0)
{
closeSocket("Normal Close");
break;
}
try
{
chanel.Send.Send(result, 0, num, SocketFlags.None);
}
catch (Exception)
{
closeSocket("Send Fail");
break;
}
}
catch (Exception ex)
{
m_logger.LogCritical(ex, "致命异常");
break;
}
}
if (m_msgId.Contains("_"))
{
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(m_msgId.Split('_')[0]);
m_logger?.LogDebug($"endSwap {m_msgId} 交互时常:{interval}ms");
}
}
private void closeSocket(string msg)
{
m_logger.LogDebug($"【closeSocket】{msg}");
try
{
m_sockt1.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt1.Close();
}
try
{
m_sockt2.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt2.Close();
}
}
}
}