性能优化

This commit is contained in:
SpringHgui 2021-07-09 00:32:51 +08:00
parent 460682dde5
commit 3a7c5d4512
19 changed files with 345 additions and 109 deletions

View File

@ -2,7 +2,7 @@
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
// Trace Debug Information Warning Error // Trace Debug Information Warning Error
"Default": "Information", "Default": "Trace",
"Microsoft": "Warning", "Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information" "Microsoft.Hosting.Lifetime": "Information"
} }
@ -10,8 +10,8 @@
"ClientSettings": { "ClientSettings": {
"Server": { "Server": {
// ip/ // ip/
"ServerAddr": "test.cc", //"ServerAddr": "my.com",
//"ServerAddr": "127.0.0.1", "ServerAddr": "127.0.0.1",
// //
"ServerPort": 1271 "ServerPort": 1271
}, },
@ -21,13 +21,18 @@
"LocalIp": "127.0.0.1", "LocalIp": "127.0.0.1",
// //
"LocalPort": 9529, "LocalPort": 8090,
// , 访url http://{SubDomain}.{WebDomain}:{WebProxyPort}/ // , 访url http://{SubDomain}.{WebDomain}:{WebProxyPort}/
"SubDomain": "test" // test.test.cc "SubDomain": "test" // test.test.cc
// Aip // Aip
// "WWW": [ "www.abc.com", "test111.test.cc" ] // "WWW": [ "www.abc.com", "test111.test.cc" ]
},
{
"LocalIp": "127.0.0.1",
"LocalPort": 8091,
"SubDomain": "test1"
} }
], ],
@ -38,9 +43,14 @@
*/ */
"SSH": [ "SSH": [
{ {
"LocalIp": "test.cc", "LocalIp": "127.0.0.1",
"LocalPort": 8090, "LocalPort": 8090,
"RemotePort": 9999 "RemotePort": 7090
},
{
"LocalIp": "127.0.0.1",
"LocalPort": 8091,
"RemotePort": 7091
}, },
{ {
"LocalIp": "192.168.0.91", "LocalIp": "192.168.0.91",

View File

@ -191,10 +191,92 @@ namespace FastTunnel.Core.Client
// 心跳开始 // 心跳开始
timer_heart.Start(); timer_heart.Start();
await new PipeHepler(_client, ProceccLine).ProcessLinesAsync(); var th = new Thread(ReceiveServer);
th.Start(_client);
//await new PipeHepler(_client, ProceccLine).ProcessLinesAsync();
}
private void ReceiveServer(object obj)
{
var client = obj as Socket;
byte[] buffer = new byte[1024];
string lastBuffer = string.Empty;
int n = 0;
while (true)
{
try
{
n = client.Receive(buffer);
if (n == 0)
{
client.Shutdown(SocketShutdown.Both);
break;
}
}
/// <see cref="https://docs.microsoft.com/zh-cn/windows/win32/winsock/windows-sockets-error-codes-2"/>
catch (SocketException socketEx)
{
// Connection timed out.
if (socketEx.ErrorCode == 10060)
{
_logger.LogInformation("Connection timed out");
}
else
{
_logger.LogError(socketEx);
}
break;
}
catch (Exception ex)
{
_logger.LogError(ex);
break;
}
string words = Encoding.UTF8.GetString(buffer, 0, n);
if (!string.IsNullOrEmpty(lastBuffer))
{
words = lastBuffer + words;
lastBuffer = null;
}
var msgs = words.Split("\n");
_logger.LogDebug("recive from server:" + words);
try
{
foreach (var item in msgs)
{
if (string.IsNullOrEmpty(item))
continue;
if (item.EndsWith("}"))
{
HandleServerRequest(item);
}
else
{
lastBuffer = item;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex.Message);
continue;
}
}
_logger.LogInformation("stop receive from server");
} }
private bool ProceccLine(Socket socket, byte[] line) private bool ProceccLine(Socket socket, byte[] line)
{
Task.Run(() =>
{ {
try try
{ {
@ -205,6 +287,7 @@ namespace FastTunnel.Core.Client
{ {
_logger.LogError(ex); _logger.LogError(ex);
} }
});
return true; return true;
} }
@ -212,6 +295,11 @@ namespace FastTunnel.Core.Client
private void HandleServerRequest(string words) private void HandleServerRequest(string words)
{ {
var Msg = JsonConvert.DeserializeObject<Message<JObject>>(words); var Msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
if (Msg.MessageType!= MessageType.Heart)
{
_logger.LogDebug($"HandleServerRequest {words}");
}
IClientHandler handler; IClientHandler handler;
switch (Msg.MessageType) switch (Msg.MessageType)
{ {

View File

@ -12,6 +12,7 @@ using System.Text.RegularExpressions;
using System.Net.Http; using System.Net.Http;
using System.IO; using System.IO;
using FastTunnel.Core.Server; using FastTunnel.Core.Server;
using System.Diagnostics;
namespace FastTunnel.Core.Dispatchers namespace FastTunnel.Core.Dispatchers
{ {
@ -32,7 +33,10 @@ namespace FastTunnel.Core.Dispatchers
public void Dispatch(AsyncUserToken token, string words) public void Dispatch(AsyncUserToken token, string words)
{ {
_logger.LogDebug("=======Dispatch HTTP========"); _logger.LogDebug($"=======Dispatch HTTP {token.RequestId}========");
Stopwatch sw = new Stopwatch();
sw.Start();
// 1.检查白名单 // 1.检查白名单
try try
@ -54,8 +58,6 @@ namespace FastTunnel.Core.Dispatchers
_logger.LogError(ex); _logger.LogError(ex);
} }
_logger.LogDebug("=======Dispatch Matches========");
string Host; string Host;
MatchCollection collection = Regex.Matches(words, pattern); MatchCollection collection = Regex.Matches(words, pattern);
if (collection.Count == 0) if (collection.Count == 0)
@ -74,7 +76,7 @@ namespace FastTunnel.Core.Dispatchers
_logger.LogDebug(Host.Replace("\r", "")); _logger.LogDebug(Host.Replace("\r", ""));
var domain = Host.Split(":")[1].Trim(); var domain = Host.Split(":")[1].Trim();
_logger.LogDebug($"=======Dispatch domain:{domain}========"); _logger.LogDebug($"=======Dispatch domain:{domain} {token.RequestId} ========");
// 判断是否为ip // 判断是否为ip
if (IsIpDomian(domain)) if (IsIpDomian(domain))
@ -87,34 +89,36 @@ namespace FastTunnel.Core.Dispatchers
WebInfo web; WebInfo web;
if (!_fastTunnelServer.WebList.TryGetValue(domain, out web)) if (!_fastTunnelServer.WebList.TryGetValue(domain, out web))
{ {
_logger.LogDebug($"=======Dispatch 未登录========"); _logger.LogDebug($"=======站点未登录 {token.RequestId}========");
HandlerClientNotOnLine(token.Socket, domain); HandlerClientNotOnLine(token.Socket, domain);
return; return;
} }
_logger.LogDebug($"=======Dispatch 已找到========"); _logger.LogDebug($"=======找到映射的站点 {token.RequestId}========");
var msgid = Guid.NewGuid().ToString(); _fastTunnelServer.RequestTemp.TryAdd(token.RequestId, new NewRequest
_fastTunnelServer.RequestTemp.TryAdd(msgid, new NewRequest
{ {
CustomerClient = token.Socket, CustomerClient = token.Socket,
Buffer = token.Recived Buffer = token.Recived
}); });
_logger.LogDebug($"=======Dispatch 发送msg========");
try try
{ {
_logger.LogDebug($"=======OK========"); sw.Stop();
web.Socket.SendCmd(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = msgid, WebConfig = web.WebConfig } }); _logger.LogDebug($"[寻找路由耗时]{sw.ElapsedMilliseconds}ms");
_logger.LogDebug($"=======Dispatch OK========"); sw.Restart();
web.Socket.SendCmd(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = token.RequestId, WebConfig = web.WebConfig } });
sw.Stop();
_logger.LogDebug($"[发送NewCustomer指令耗时]{sw.ElapsedMilliseconds}");
_logger.LogDebug($"=======发送请求成功 {token.RequestId}========");
} }
catch (Exception) catch (Exception)
{ {
_logger.LogDebug($"=======客户端不在线 {token.RequestId}========");
HandlerClientNotOnLine(token.Socket, domain); HandlerClientNotOnLine(token.Socket, domain);
_logger.LogDebug($"=======Dispatch 移除========");
// 移除 // 移除
_fastTunnelServer.WebList.TryRemove(domain, out _); _fastTunnelServer.WebList.TryRemove(domain, out _);
} }

View File

@ -18,7 +18,7 @@
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<PackageTags>FastTunnel.Core</PackageTags> <PackageTags>FastTunnel.Core</PackageTags>
<PackageReleaseNotes>FastTunnel.Core</PackageReleaseNotes> <PackageReleaseNotes>FastTunnel.Core</PackageReleaseNotes>
<Version>1.1.2</Version> <Version>1.1.3</Version>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -10,24 +10,41 @@ using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FastTunnel.Core.Sockets; using FastTunnel.Core.Sockets;
using Microsoft.Extensions.Logging;
using FastTunnel.Core.Utility.Extensions;
namespace FastTunnel.Core.Handlers.Client namespace FastTunnel.Core.Handlers.Client
{ {
public class HttpRequestHandler : IClientHandler public class HttpRequestHandler : IClientHandler
{ {
ILogger<HttpRequestHandler> _logger;
public HttpRequestHandler(ILogger<HttpRequestHandler> logger)
{
_logger = logger;
}
public void HandlerMsg(FastTunnelClient cleint, Message<JObject> Msg) public void HandlerMsg(FastTunnelClient cleint, Message<JObject> Msg)
{ {
var request = Msg.Content.ToObject<NewCustomerMassage>(); var request = Msg.Content.ToObject<NewCustomerMassage>();
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(request.MsgId.Split('_')[0]);
_logger.LogDebug($"Start SwapMassage {request.MsgId} 延迟时间:{interval}ms");
var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort); var connecter = new DnsSocket(cleint.Server.ServerAddr, cleint.Server.ServerPort);
connecter.Connect(); connecter.Connect();
connecter.Send(new Message<SwapMassage> { MessageType = MessageType.C_SwapMsg, Content = new SwapMassage(request.MsgId) }); connecter.Send(new Message<SwapMassage> { MessageType = MessageType.C_SwapMsg, Content = new SwapMassage(request.MsgId) });
_logger.LogDebug($"连接server成功 {request.MsgId}");
var localConnecter = new DnsSocket(request.WebConfig.LocalIp, request.WebConfig.LocalPort); var localConnecter = new DnsSocket(request.WebConfig.LocalIp, request.WebConfig.LocalPort);
try try
{ {
localConnecter.Connect(); localConnecter.Connect();
_logger.LogDebug($"连接本地成功 {request.MsgId}");
new SocketSwap(connecter.Socket, localConnecter.Socket, _logger, request.MsgId).StartSwap();
} }
catch (SocketException sex) catch (SocketException sex)
{ {
@ -59,8 +76,6 @@ namespace FastTunnel.Core.Handlers.Client
localConnecter.Close(); localConnecter.Close();
throw; throw;
} }
new SocketSwap(connecter.Socket, localConnecter.Socket).StartSwap();
} }
} }
} }

View File

@ -6,11 +6,18 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using FastTunnel.Core.Sockets; using FastTunnel.Core.Sockets;
using Microsoft.Extensions.Logging;
namespace FastTunnel.Core.Handlers.Client namespace FastTunnel.Core.Handlers.Client
{ {
public class NewSSHHandler : IClientHandler public class NewSSHHandler : IClientHandler
{ {
ILogger<NewSSHHandler> _logger;
public NewSSHHandler(ILogger<NewSSHHandler> logger)
{
_logger = logger;
}
public void HandlerMsg(FastTunnelClient cleint, Message<JObject> Msg) public void HandlerMsg(FastTunnelClient cleint, Message<JObject> Msg)
{ {
var request_ssh = Msg.Content.ToObject<NewSSHRequest>(); var request_ssh = Msg.Content.ToObject<NewSSHRequest>();
@ -21,7 +28,7 @@ namespace FastTunnel.Core.Handlers.Client
var localConnecter_ssh = new DnsSocket(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort); var localConnecter_ssh = new DnsSocket(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort);
localConnecter_ssh.Connect(); localConnecter_ssh.Connect();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwap(); new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket, _logger, request_ssh.MsgId).StartSwap();
} }
} }
} }

View File

@ -28,10 +28,14 @@ namespace FastTunnel.Core.Handlers.Server
var SwapMsg = msg.Content.ToObject<SwapMassage>(); var SwapMsg = msg.Content.ToObject<SwapMassage>();
NewRequest request; NewRequest request;
_logger.LogDebug($"响应NewCustomer{SwapMsg.msgId}");
if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request)) if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request))
{ {
server.RequestTemp.TryRemove(SwapMsg.msgId, out _); server.RequestTemp.TryRemove(SwapMsg.msgId, out _);
new SocketSwap(request.CustomerClient, client)
_logger.LogDebug($"SwapMassage{SwapMsg.msgId}");
new SocketSwap(request.CustomerClient, client, _logger, SwapMsg.msgId)
.BeforeSwap(() => .BeforeSwap(() =>
{ {
if (request.Buffer != null) client.Send(request.Buffer); if (request.Buffer != null) client.Send(request.Buffer);

View File

@ -42,10 +42,10 @@ namespace FastTunnel.Core.Listener
_heartHandler = new HeartMessageHandler(); _heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger); _swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(1000, 100); server = new Server.Server(2000, 100, _logger);
} }
public void Start(int backlog = 100) public void Start()
{ {
IPAddress ipa = IPAddress.Parse(ListenIp); IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort); IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);

View File

@ -30,7 +30,7 @@ namespace FastTunnel.Core.Listener
this.ListenIp = ip; this.ListenIp = ip;
this.ListenPort = port; this.ListenPort = port;
server = new Server.Server(1000, 512); server = new Server.Server(500, 512, _logger);
} }
public void Start(IListenerDispatcher requestDispatcher, int backlog = 100) public void Start(IListenerDispatcher requestDispatcher, int backlog = 100)
@ -49,7 +49,6 @@ namespace FastTunnel.Core.Listener
{ {
try try
{ {
Console.WriteLine(words);
_requestDispatcher.Dispatch(token, words); _requestDispatcher.Dispatch(token, words);
return false; return false;
} }

View File

@ -14,5 +14,7 @@ namespace FastTunnel.Core.Server
public string MassgeTemp { get; set; } public string MassgeTemp { get; set; }
public byte[] Recived { get; set; } public byte[] Recived { get; set; }
public string RequestId { get; set; }
} }
} }

View File

@ -40,7 +40,6 @@ namespace FastTunnel.Core.Server
// <returns>true if the buffer was successfully set, else false</returns> // <returns>true if the buffer was successfully set, else false</returns>
public bool SetBuffer(SocketAsyncEventArgs args) public bool SetBuffer(SocketAsyncEventArgs args)
{ {
if (m_freeIndexPool.Count > 0) if (m_freeIndexPool.Count > 0)
{ {
args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize); args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
@ -51,6 +50,7 @@ namespace FastTunnel.Core.Server
{ {
return false; return false;
} }
args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize); args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
m_currentIndex += m_bufferSize; m_currentIndex += m_bufferSize;
} }

View File

@ -13,6 +13,7 @@ namespace FastTunnel.Core.Server
{ {
Socket m_socket; Socket m_socket;
Func<Socket, byte[], bool> processLine; Func<Socket, byte[], bool> processLine;
const int minimumBufferSize = 512;
public PipeHepler(Socket socket, Func<Socket, byte[], bool> processLine) public PipeHepler(Socket socket, Func<Socket, byte[], bool> processLine)
{ {
@ -30,8 +31,6 @@ namespace FastTunnel.Core.Server
private async Task FillPipeAsync(PipeWriter writer) private async Task FillPipeAsync(PipeWriter writer)
{ {
const int minimumBufferSize = 512;
while (true) while (true)
{ {
// Allocate at least 512 bytes from the PipeWriter. // Allocate at least 512 bytes from the PipeWriter.
@ -77,7 +76,7 @@ namespace FastTunnel.Core.Server
if (!processLine(m_socket, line.ToArray())) if (!processLine(m_socket, line.ToArray()))
{ {
// 停止继续监听 // 停止继续监听
break;
} }
} }

View File

@ -3,6 +3,8 @@
// is sent back to the client. The read and echo back to the client pattern // is sent back to the client. The read and echo back to the client pattern
// is continued until the client disconnects. // is continued until the client disconnects.
using FastTunnel.Core.Extensions; using FastTunnel.Core.Extensions;
using FastTunnel.Core.Utility.Extensions;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
@ -28,6 +30,9 @@ namespace FastTunnel.Core.Server
Func<AsyncUserToken, string, bool> m_handller; Func<AsyncUserToken, string, bool> m_handller;
string m_sectionFlag; string m_sectionFlag;
IPEndPoint _localEndPoint;
ILogger _logger;
// Create an uninitialized server instance. // Create an uninitialized server instance.
// To start the server listening for connection requests // To start the server listening for connection requests
@ -35,8 +40,9 @@ namespace FastTunnel.Core.Server
// //
// <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param> // <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
// <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param> // <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
public Server(int numConnections, int receiveBufferSize) public Server(int numConnections, int receiveBufferSize, ILogger logger)
{ {
_logger = logger;
//m_totalBytesRead = 0; //m_totalBytesRead = 0;
m_numConnectedSockets = 0; m_numConnectedSockets = 0;
m_numConnections = numConnections; m_numConnections = numConnections;
@ -47,6 +53,7 @@ namespace FastTunnel.Core.Server
receiveBufferSize); receiveBufferSize);
m_readWritePool = new SocketAsyncEventArgsPool(numConnections); m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections); m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
} }
@ -88,12 +95,13 @@ namespace FastTunnel.Core.Server
{ {
m_handller = handller; m_handller = handller;
m_sectionFlag = sectionFlag; m_sectionFlag = sectionFlag;
_localEndPoint = localEndPoint;
// create the socket which listens for incoming connections // create the socket which listens for incoming connections
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint); listenSocket.Bind(localEndPoint);
// start the server with a listen backlog of 100 connections // start the server with a listen backlog of 100 connections
listenSocket.Listen(100); listenSocket.Listen();
// post accepts on the listening socket // post accepts on the listening socket
StartAccept(null); StartAccept(null);
@ -117,6 +125,7 @@ namespace FastTunnel.Core.Server
} }
m_maxNumberAcceptedClients.WaitOne(); m_maxNumberAcceptedClients.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg); bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent) if (!willRaiseEvent)
{ {
@ -126,7 +135,6 @@ namespace FastTunnel.Core.Server
// This method is the callback method associated with Socket.AcceptAsync // This method is the callback method associated with Socket.AcceptAsync
// operations and is invoked when an accept operation is complete // operations and is invoked when an accept operation is complete
//
void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{ {
ProcessAccept(e); ProcessAccept(e);
@ -135,15 +143,19 @@ namespace FastTunnel.Core.Server
private void ProcessAccept(SocketAsyncEventArgs e) private void ProcessAccept(SocketAsyncEventArgs e)
{ {
Interlocked.Increment(ref m_numConnectedSockets); Interlocked.Increment(ref m_numConnectedSockets);
_logger.LogInformation($"[当前连接数]:{_localEndPoint.Port} | {m_numConnectedSockets}");
// Get the socket for the accepted client connection and put it into the // Get the socket for the accepted client connection and put it into the
//ReadEventArg object user token //ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop(); SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket; var token = readEventArgs.UserToken as AsyncUserToken;
((AsyncUserToken)readEventArgs.UserToken).MassgeTemp = null; token.Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).Recived = null; token.MassgeTemp = null;
token.Recived = null;
token.RequestId = $"{DateTime.Now.GetChinaTicks()}_{Guid.NewGuid().ToString().Replace("-", string.Empty)}";
_logger.LogDebug($"Accept {token.RequestId}");
Console.WriteLine("ReceiveAsync");
// As soon as the client is connected, post a receive to the connection // As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs); bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent) if (!willRaiseEvent)
@ -181,6 +193,7 @@ namespace FastTunnel.Core.Server
private void ProcessReceive(SocketAsyncEventArgs e) private void ProcessReceive(SocketAsyncEventArgs e)
{ {
AsyncUserToken token = (AsyncUserToken)e.UserToken; AsyncUserToken token = (AsyncUserToken)e.UserToken;
_logger.LogDebug($"[ProcessReceive]: {_localEndPoint.Port} | {token.RequestId}");
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{ {
// increment the count of the total bytes receive by the server // increment the count of the total bytes receive by the server
@ -292,6 +305,7 @@ namespace FastTunnel.Core.Server
// decrement the counter keeping track of the total number of clients connected to the server // decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref m_numConnectedSockets); Interlocked.Decrement(ref m_numConnectedSockets);
_logger.LogInformation($"[SocketCount]:{_localEndPoint.Port} | {m_numConnectedSockets}");
// Free the SocketAsyncEventArg so they can be reused by another client // Free the SocketAsyncEventArg so they can be reused by another client
m_readWritePool.Push(e); m_readWritePool.Push(e);

View File

@ -41,7 +41,6 @@ namespace FastTunnel.Core.Services
return Task.CompletedTask; return Task.CompletedTask;
} }
private void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) private void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
{ {
try try

View File

@ -93,7 +93,6 @@ namespace FastTunnel.Core.Sockets
var token = e.UserToken as SwapUserToken; var token = e.UserToken as SwapUserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{ {
Console.WriteLine("ProcessReceive:" + e.BytesTransferred);
e.SetBuffer(e.Offset, 512); e.SetBuffer(e.Offset, 512);
if (!token.Sender.SendAsync(e)) if (!token.Sender.SendAsync(e))
{ {

View File

@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Sockets
{
public class ConnectSocket
{
string m_server;
int m_port;
public ConnectSocket(string server, int port)
{
this.m_port = port;
this.m_server = server;
}
public Socket Connect()
{
Socket s = null;
IPHostEntry hostEntry = null;
// Get host related information.
hostEntry = Dns.GetHostEntry(m_server);
// Loop through the AddressList to obtain the supported AddressFamily. This is to avoid
// an exception that occurs when the host IP Address is not compatible with the address family
// (typical in the IPv6 case).
foreach (IPAddress address in hostEntry.AddressList)
{
IPEndPoint ipe = new IPEndPoint(address, m_port);
Socket tempSocket =
new Socket(ipe.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
tempSocket.Connect(ipe);
if (tempSocket.Connected)
{
s = tempSocket;
break;
}
else
{
continue;
}
}
return s;
}
}
}

View File

@ -1,6 +1,9 @@
using FastTunnel.Core.Dispatchers; using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Utility.Extensions;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
@ -9,9 +12,12 @@ namespace FastTunnel.Core.Sockets
{ {
public class SocketSwap : ISocketSwap public class SocketSwap : ISocketSwap
{ {
private Socket _sockt1; private readonly Socket m_sockt1;
private Socket _sockt2; private readonly Socket m_sockt2;
bool Swaped = false; private readonly string m_msgId = null;
private readonly ILogger m_logger;
private bool Swaped = false;
private class Channel private class Channel
{ {
@ -20,100 +26,116 @@ namespace FastTunnel.Core.Sockets
public Socket Receive { get; set; } public Socket Receive { get; set; }
} }
public SocketSwap(Socket sockt1, Socket sockt2) public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId)
{ {
_sockt1 = sockt1; m_sockt1 = sockt1;
_sockt2 = sockt2; m_sockt2 = sockt2;
m_msgId = msgId;
m_logger = logger;
} }
public void StartSwap() public void StartSwap()
{ {
m_logger?.LogDebug($"StartSwap {m_msgId}");
Swaped = true; Swaped = true;
ThreadPool.QueueUserWorkItem(swapCallback, new Channel ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{ {
Send = _sockt1, Send = m_sockt1,
Receive = _sockt2 Receive = m_sockt2
}); });
ThreadPool.QueueUserWorkItem(swapCallback, new Channel ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{ {
Send = _sockt2, Send = m_sockt2,
Receive = _sockt1 Receive = m_sockt1
}); });
} }
private void swapCallback(object state) private void swapCallback(object state)
{ {
m_logger?.LogDebug($"swapCallback {m_msgId}");
var chanel = state as Channel; var chanel = state as Channel;
byte[] result = new byte[1024]; byte[] result = new byte[512];
while (true) while (true)
{ {
int num;
try try
{ {
if (!chanel.Receive.Connected) try
{
num = chanel.Receive.Receive(result, result.Length, SocketFlags.None);
}
catch (Exception)
{
closeSocket("Revice Fail");
break; break;
int num = chanel.Receive.Receive(result, result.Length, SocketFlags.None); }
if (num == 0) if (num == 0)
{ {
chanel.Receive.Close(); closeSocket("Normal Close");
break;
}
try try
{ {
// Release the socket.//
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
break;
}
if (!chanel.Send.Connected)
break;
// var str = Encoding.UTF8.GetString(result, 0, num);
chanel.Send.Send(result, num, SocketFlags.None); chanel.Send.Send(result, num, SocketFlags.None);
} }
catch (SocketException) catch (Exception)
{ {
// Interrupted function call. 10004 closeSocket("Send Fail");
// An existing connection was forcibly closed by the remote host. 10054
try
{
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
try
{
chanel.Receive.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Receive.Close();
}
break; break;
} }
}
catch (Exception ex) catch (Exception ex)
{ {
Console.Write(ex.ToString()); m_logger.LogCritical(ex, "致命异常");
throw; break;
} }
} }
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(m_msgId.Split('_')[0]);
m_logger?.LogDebug($"endSwap {m_msgId} 交互时常:{interval}ms");
}
private void closeSocket(string msg)
{
m_logger.LogDebug($"【closeSocket】{msg}");
try
{
m_sockt1.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt1.Close();
}
try
{
m_sockt2.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt2.Close();
}
} }
public ISocketSwap BeforeSwap(Action fun) public ISocketSwap BeforeSwap(Action fun)
{ {
m_logger?.LogDebug($"BeforeSwap {m_msgId}");
if (Swaped) if (Swaped)
{ {
throw new Exception("BeforeSwap must be invoked before StartSwap!"); throw new Exception("BeforeSwap must be invoked before StartSwap!");

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Utility.Extensions
{
public static class DateTimeExtensions
{
public static string GetChinaTicks(this DateTime dateTime)
{
// 北京时间相差8小时
DateTime startTime = TimeZoneInfo.ConvertTime(new DateTime(1970, 1, 1, 8, 0, 0, 0), TimeZoneInfo.Local);
long t = (dateTime.Ticks - startTime.Ticks) / 10000;
return t.ToString();
}
}
}

View File

@ -2,8 +2,8 @@
echo 127.0.0.1 test.cc >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 test.cc >>C:\Windows\System32\drivers\etc\hosts
echo 127.0.0.1 test.test.cc >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 test.test.cc >>C:\Windows\System32\drivers\etc\hosts
echo 127.0.0.1 test1.test.cc >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 test1.test.cc >>C:\Windows\System32\drivers\etc\hosts
echo 127.0.0.1 test111.test.cc >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 test2.test.cc >>C:\Windows\System32\drivers\etc\hosts
echo 127.0.0.1 test111.test.cc >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 test3.test.cc >>C:\Windows\System32\drivers\etc\hosts
echo 127.0.0.1 www.abc.com >>C:\Windows\System32\drivers\etc\hosts echo 127.0.0.1 www.abc.com >>C:\Windows\System32\drivers\etc\hosts
pause pause