修正在线数不准确的bug等

This commit is contained in:
ioxygen 2021-09-29 22:02:26 +08:00
parent b39af1435b
commit 3f43810ef3
6 changed files with 80 additions and 32 deletions

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Exceptions
{
public class SocketClosedException : Exception
{
public SocketClosedException(string msg) : base(msg)
{
}
}
}

View File

@ -1,4 +1,5 @@
using FastTunnel.Core.Models;
using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Models;
using System;
using System.Collections.Generic;
using System.Linq;
@ -14,6 +15,11 @@ namespace FastTunnel.Core.Extensions
{
public static async Task SendCmdAsync(this WebSocket socket, MessageType type, string content, CancellationToken cancellationToken)
{
if (socket.State == WebSocketState.Closed || socket.State == WebSocketState.Aborted)
{
throw new SocketClosedException(socket.State.ToString());
}
var buffer = Encoding.UTF8.GetBytes($"{(char)type}{content}\n");
if (type != MessageType.LogIn && buffer.Length > FastTunnelConst.MAX_CMD_LENGTH)
throw new ArgumentOutOfRangeException(nameof(content));

View File

@ -46,7 +46,6 @@ namespace FastTunnel.Core.Forwarder
}
catch (Exception ex)
{
this._logger.LogError(ex, "代理出现异常");
throw;
}
}
@ -62,30 +61,41 @@ namespace FastTunnel.Core.Forwarder
var msgId = Guid.NewGuid().ToString().Replace("-", "");
TaskCompletionSource<Stream> tcs = new(cancellation);
_logger.LogDebug($"[Http]Swap开始 {msgId}|{host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
tcs.SetTimeOut(20000, () =>
{
_logger.LogError($"[Http]建立Swap超时 {msgId}");
});
_fastTunnelServer.ResponseTasks.TryAdd(msgId, tcs);
try
{
// 发送指令给客户端,等待建立隧道
await web.Socket.SendCmdAsync(MessageType.SwapMsg, $"{msgId}|{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}", cancellation);
TaskCompletionSource<Stream> tcs = new(cancellation);
tcs.SetTimeOut(5000, () =>
{
_logger.LogError($"[Http]建立Swap超时 {msgId}|{host}=>{web.WebConfig.LocalIp}:{web.WebConfig.LocalPort}");
});
_fastTunnelServer.ResponseTasks.TryAdd(msgId, tcs);
var res = await tcs.Task;
_logger.LogDebug($"[Http]Swap OK {msgId}");
return res;
}
catch (WebSocketException)
{
tcs.TrySetCanceled();
_fastTunnelServer.ResponseTasks.TryRemove(msgId, out _);
// 通讯异常,返回客户端离线
return await OfflinePage(host, context);
}
catch (Exception)
{
_fastTunnelServer.ResponseTasks.TryRemove(msgId, out _);
throw;
}
}
private async ValueTask<Stream> OfflinePage(string host, SocketsHttpConnectionContext context)
{
var bytes = Encoding.UTF8.GetBytes(

View File

@ -67,14 +67,21 @@ namespace FastTunnel.Core.MiddleWares
Interlocked.Increment(ref fastTunnelServer.ConnectedClientCount);
logger.LogInformation($"客户端连接 {context.TraceIdentifier}:{context.Connection.RemoteIpAddress} 当前在线数:{fastTunnelServer.ConnectedClientCount}");
await tunnelClient.ReviceAsync(CancellationToken.None);
logOut(context);
}
catch (Exception)
{
Interlocked.Decrement(ref fastTunnelServer.ConnectedClientCount);
logger.LogInformation($"客户端关闭 {context.TraceIdentifier}:{context.Connection.RemoteIpAddress} 当前在线数:{fastTunnelServer.ConnectedClientCount}");
logOut(context);
}
}
private void logOut(HttpContext context)
{
Interlocked.Decrement(ref fastTunnelServer.ConnectedClientCount);
logger.LogInformation($"客户端关闭 {context.TraceIdentifier}:{context.Connection.RemoteIpAddress} 当前在线数:{fastTunnelServer.ConnectedClientCount}");
}
private static async Task Close(WebSocket webSocket, string reason)
{
await webSocket.SendCmdAsync(MessageType.Log, reason, CancellationToken.None);

View File

@ -1,5 +1,6 @@
using FastTunnel.Core.Client;
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Exceptions;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models;
using FastTunnel.Core.Sockets;
@ -36,40 +37,47 @@ namespace FastTunnel.Core.Dispatchers
try
{
await Task.Yield();
try
{
if (client.State == WebSocketState.Aborted)
{
logger.LogError("客户端已离线");
return;
}
await client.SendCmdAsync(MessageType.Forward, $"{msgId}|{_config.LocalIp}:{_config.LocalPort}", CancellationToken.None);
}
catch (Exception ex)
{
// 客户端已掉线或网络不稳定
logger.LogError(ex);
return;
}
logger.LogDebug($"[Forward]Swap开始 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
var tcs = new TaskCompletionSource<Stream>();
tcs.SetTimeOut(5000, () =>
tcs.SetTimeOut(20000, () =>
{
logger.LogError($"[Forward]建立Swap超时 {msgId}|{_config.RemotePort}=>{_config.LocalIp}:{_config.LocalPort}");
logger.LogError($"[Forward]建立Swap超时 {msgId}");
});
_server.ResponseTasks.TryAdd(msgId, tcs);
try
{
await client.SendCmdAsync(MessageType.Forward, $"{msgId}|{_config.LocalIp}:{_config.LocalPort}", CancellationToken.None);
}
catch (SocketClosedException sex)
{
// TODO:客户端已掉线,但是没有移除对端口的监听
logger.LogError($"[Forward]Swap 客户端已离线 {sex.Message}");
tcs.TrySetCanceled();
_server.ResponseTasks.TryRemove(msgId, out _);
return;
}
catch (Exception ex)
{
// 网络不稳定
logger.LogError(ex, $"[Forward]Swap Exception");
tcs.TrySetCanceled();
_server.ResponseTasks.TryRemove(msgId, out _);
return;
}
using var stream1 = await tcs.Task;
using var stream2 = new NetworkStream(_socket, true);
await Task.WhenAll(stream1.CopyToAsync(stream2), stream2.CopyToAsync(stream1));
logger.LogDebug($"[Forward]Swap OK {msgId}");
}
catch (Exception ex)
{
_server.ResponseTasks.TryRemove(msgId, out _);
logger.LogDebug("Forward Swap Error" + ex.Message);
logger.LogDebug($"[Forward]Swap Error {msgId}" + ex.Message);
}
}
}

View File

@ -35,6 +35,8 @@ namespace FastTunnel.Core.Handlers.Server
bool hasTunnel = false;
await client.SendCmdAsync(MessageType.Log, $"穿透协议 | 映射关系(公网=>内网)", CancellationToken.None);
Thread.Sleep(300);
if (requet.Webs != null && requet.Webs.Count() > 0)
{
hasTunnel = true;