优化Swap超时

This commit is contained in:
SpringHgui 2021-09-05 14:37:09 +08:00
parent bc23c3a4f0
commit 5cc9c62bf7
4 changed files with 25 additions and 136 deletions

View File

@ -34,7 +34,7 @@
// , 访url http://{SubDomain}.{WebDomain}:{WebProxyPort}/
"SubDomain": "test" // test.test.cc
// Aip
// CNAMEA
// "WWW": [ "www.abc.com", "test111.test.cc" ]
}
],
@ -50,6 +50,11 @@
"LocalIp": "127.0.0.1",
"LocalPort": 3389, // windows3389
"RemotePort": 1274 // 访 ip:1274 window
},
{
"LocalIp": "127.0.0.1",
"LocalPort": 8090, // windows3389
"RemotePort": 1275 // 访 ip:1274 window
}
]
}

View File

@ -60,25 +60,27 @@ namespace FastTunnel.Core.Forwarder
return await OfflinePage(host, context);
}
var msgId = Guid.NewGuid().ToString().Replace("-", "");
try
{
var RequestId = Guid.NewGuid().ToString().Replace("-", "");
// 发送指令给客户端,等待建立隧道
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{RequestId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", cancellation);
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{msgId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", cancellation);
TaskCompletionSource<Stream> tcs = new(cancellation);
tcs.SetTimeOut(10000, () =>
tcs.SetTimeOut(5000, () =>
{
_logger.LogError($"客户端在指定时间内为建立Swap连接 {RequestId}|{host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
_logger.LogError($"[Http]建立Swap超时 {msgId}|{host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
});
_fastTunnelServer.ResponseTasks.TryAdd(RequestId, tcs);
_fastTunnelServer.ResponseTasks.TryAdd(msgId, tcs);
var res = await tcs.Task;
return res;
}
catch (WebSocketException)
{
_fastTunnelServer.ResponseTasks.TryRemove(msgId, out _);
// 通讯异常,返回客户端离线
return await OfflinePage(host, context);
}

View File

@ -31,10 +31,11 @@ namespace FastTunnel.Core.Dispatchers
public async Task DispatchAsync(Socket _socket, WebSocket client)
{
var msgId = Guid.NewGuid().ToString().Replace("-", "");
try
{
await Task.Yield();
var msgid = Guid.NewGuid().ToString().Replace("-", "");
try
{
@ -44,7 +45,7 @@ namespace FastTunnel.Core.Dispatchers
return;
}
await client.SendCmdAsync(MessageType.Forward, $"{msgid}|{_config.LocalIp}:{_config.LocalPort}", CancellationToken.None);
await client.SendCmdAsync(MessageType.Forward, $"{msgId}|{_config.LocalIp}:{_config.LocalPort}", CancellationToken.None);
}
catch (Exception ex)
{
@ -54,7 +55,12 @@ namespace FastTunnel.Core.Dispatchers
}
var tcs = new TaskCompletionSource<Stream>();
_server.ResponseTasks.TryAdd(msgid, tcs);
tcs.SetTimeOut(5000, () =>
{
logger.LogError($"[Forward]建立Swap超时 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
});
_server.ResponseTasks.TryAdd(msgId, tcs);
using var stream1 = await tcs.Task;
using var stream2 = new NetworkStream(_socket, true);
@ -62,7 +68,8 @@ namespace FastTunnel.Core.Dispatchers
}
catch (Exception ex)
{
logger.LogError("Forward Swap Error" + ex.Message);
_server.ResponseTasks.TryRemove(msgId, out _);
logger.LogDebug("Forward Swap Error" + ex.Message);
}
}
}

View File

@ -1,125 +0,0 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Models;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core.Listener
{
public class ForwardListener
{
ILogger _logerr;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
int m_numConnectedSockets;
bool shutdown = false;
ForwardDispatcher _requestDispatcher;
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
public ForwardListener(string ip, int port, ILogger logerr)
{
_logerr = logerr;
this.ListenIp = ip;
this.ListenPort = port;
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
}
public void Start(ForwardDispatcher requestDispatcher)
{
shutdown = false;
_requestDispatcher = requestDispatcher;
listenSocket.Listen();
StartAccept(null);
}
public void Stop()
{
if (shutdown)
return;
try
{
if (listenSocket.Connected)
{
listenSocket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception)
{
}
finally
{
shutdown = true;
listenSocket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logerr.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAcceptAsync(acceptEventArg);
}
}
private async Task ProcessAcceptAsync(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
Interlocked.Increment(ref m_numConnectedSockets);
_logerr.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// 将此客户端交由Dispatcher进行管理
await _requestDispatcher.DispatchAsync(accept);
// Accept the next connection request
StartAccept(e);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAcceptAsync(e);
}
}
}