重写数据交换

This commit is contained in:
SpringHgui 2021-06-17 00:47:15 +08:00
parent 6007a0bc29
commit 43d15ff888
12 changed files with 212 additions and 14 deletions

View File

@ -0,0 +1,169 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core
{
public class AsyncSocketSwapV2
{
private Socket m_sockt1;
private Socket m_sockt2;
bool m_swaping = false;
byte[] m_buffer = new byte[1024];
SocketAsyncEventArgs e1;
SocketAsyncEventArgs e2;
public AsyncSocketSwapV2(Socket sockt1, Socket sockt2)
{
m_sockt1 = sockt1;
m_sockt2 = sockt2;
e1 = new SocketAsyncEventArgs();
e2 = new SocketAsyncEventArgs();
e1.Completed += IO_Completed;
e2.Completed += IO_Completed;
e1.UserToken = new SwapUserToken { Reciver = m_sockt1, Sender = m_sockt2 };
e2.UserToken = new SwapUserToken { Reciver = m_sockt2, Sender = m_sockt1 };
e1.SetBuffer(m_buffer, 0, 512);
e2.SetBuffer(m_buffer, 512, 512);
}
public AsyncSocketSwapV2 BeforeSwap(Action fun)
{
if (m_swaping)
throw new Exception("BeforeSwap must be invoked before StartSwap!");
fun?.Invoke();
return this;
}
public void StartSwapAsync()
{
try
{
Console.WriteLine("StartSwapAsync");
m_swaping = true;
if (!m_sockt1.ReceiveAsync(e1))
{
ProcessReceive(e1);
}
if (!m_sockt2.ReceiveAsync(e2))
{
ProcessReceive(e2);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
ExceptionDispatchInfo.Capture(ex).Throw();
}
}
private void IO_Completed(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
private void ProcessReceive(SocketAsyncEventArgs e)
{
var token = e.UserToken as SwapUserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
Console.WriteLine("ProcessReceive:" + e.BytesTransferred);
e.SetBuffer(e.Offset, e.BytesTransferred);
try
{
if (!token.Sender.SendAsync(e))
{
ProcessSend(e);
}
}
catch (Exception)
{
}
}
else
{
// close
CloseSocket(token.Reciver);
}
}
private void ProcessSend(SocketAsyncEventArgs e)
{
var token = e.UserToken as SwapUserToken;
if (e.SocketError == SocketError.Success)
{
Console.WriteLine("ProcessSend:" + e.BytesTransferred);
if (!token.Reciver.ReceiveAsync(e))
{
ProcessReceive(e);
}
}
else
{
CloseSocket(token.Sender);
}
}
private void CloseSocket(Socket socket)
{
try
{
Console.WriteLine("CloseSocket");
try
{
socket.Shutdown(SocketShutdown.Send);
}
catch (Exception) { }
socket.Close();
//try
//{
// m_sockt1.Shutdown(SocketShutdown.Both);
//}
//catch (Exception) { }
//m_sockt1.Close();
//try
//{
// m_sockt2.Shutdown(SocketShutdown.Both);
//}
//catch (Exception) { }
//m_sockt2.Close();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
public class SwapUserToken
{
public Socket Reciver { get; set; }
public Socket Sender { get; set; }
}
}
}

View File

@ -14,7 +14,12 @@ namespace FastTunnel.Core.Client
{
public class FastTunnelServer
{
public ConcurrentDictionary<string, NewRequest> newRequest = new ConcurrentDictionary<string, NewRequest>();
/// <summary>
/// 外部请求,需要定期清理
/// TODO:是否可以实现LRU
/// </summary>
public ConcurrentDictionary<string, NewRequest> RequestTemp = new ConcurrentDictionary<string, NewRequest>();
public ConcurrentDictionary<string, WebInfo> WebList = new ConcurrentDictionary<string, WebInfo>();
public ConcurrentDictionary<int, SSHInfo<SSHHandlerArg>> SSHList = new ConcurrentDictionary<int, SSHInfo<SSHHandlerArg>>();

View File

@ -144,7 +144,7 @@ namespace FastTunnel.Core.Dispatchers
tempBuffer.Read(byteArray, 0, (int)tempBuffer.Length);
tempBuffer.Close();
_fastTunnelServer.newRequest.TryAdd(msgid, new NewRequest
_fastTunnelServer.RequestTemp.TryAdd(msgid, new NewRequest
{
CustomerClient = httpClient,
Buffer = byteArray

View File

@ -86,7 +86,7 @@ namespace FastTunnel.Core.Dispatchers
}
var msgid = Guid.NewGuid().ToString();
_fastTunnelServer.newRequest.TryAdd(msgid, new NewRequest
_fastTunnelServer.RequestTemp.TryAdd(msgid, new NewRequest
{
CustomerClient = token.Socket,
Buffer = token.Recived

View File

@ -22,6 +22,7 @@
</PropertyGroup>
<ItemGroup>
<Compile Remove="AsyncSocketSwap.cs" />
<Compile Remove="Client\SuiDaoServer.cs.BASE.cs" />
<Compile Remove="Client\SuiDaoServer.cs.LOCAL.cs" />
<Compile Remove="Client\SuiDaoServer.cs.REMOTE.cs" />

View File

@ -59,7 +59,7 @@ namespace FastTunnel.Core.Handlers.Client
throw;
}
new AsyncSocketSwap(connecter.Socket, localConnecter.Socket).StartSwapAsync();
new AsyncSocketSwapV2(connecter.Socket, localConnecter.Socket).StartSwapAsync();
}
}
}

View File

@ -20,7 +20,7 @@ namespace FastTunnel.Core.Handlers.Client
var localConnecter_ssh = new Connecter(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort, 5000);
localConnecter_ssh.Connect();
new AsyncSocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwapAsync();
new AsyncSocketSwapV2(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwapAsync();
}
}
}

View File

@ -27,7 +27,7 @@ namespace FastTunnel.Core.Handlers.Server
{
var msgid = Guid.NewGuid().ToString();
_client.Send(new Message<NewSSHRequest> { MessageType = MessageType.S_NewSSH, Content = new NewSSHRequest { MsgId = msgid, SSHConfig = _config } });
_server.newRequest.TryAdd(msgid, new NewRequest
_server.RequestTemp.TryAdd(msgid, new NewRequest
{
CustomerClient = _socket,
});

View File

@ -27,20 +27,36 @@ namespace FastTunnel.Core.Handlers.Server
var SwapMsg = msg.Content.ToObject<SwapMassage>();
NewRequest request;
if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.newRequest.TryGetValue(SwapMsg.msgId, out request))
if (!string.IsNullOrEmpty(SwapMsg.msgId) && server.RequestTemp.TryGetValue(SwapMsg.msgId, out request))
{
server.newRequest.TryRemove(SwapMsg.msgId, out _);
server.RequestTemp.TryRemove(SwapMsg.msgId, out _);
// Join
new AsyncSocketSwap(request.CustomerClient, client)
.BeforeSwap(() => { if (request.Buffer != null) client.Send(request.Buffer); })
new AsyncSocketSwapV2(request.CustomerClient, client)
.BeforeSwap(() =>
{
if (request.Buffer != null) client.Send(request.Buffer);
})
.StartSwapAsync();
}
else
{
// 未找到,关闭连接
_logger.LogError($"未找到请求:{SwapMsg.msgId}");
client.Send(new Message<LogMassage> { MessageType = MessageType.Log, Content = new LogMassage(LogMsgType.Debug, $"未找到请求:{SwapMsg.msgId}") });
try
{
client.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
client.Close();
}
}
}
}

View File

@ -45,7 +45,7 @@ namespace FastTunnel.Core.Listener
_heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(1000, 10);
server = new Server.Server(1000, 1024);
}
public void Start(int backlog = 100)

View File

@ -36,7 +36,7 @@ namespace FastTunnel.Core.Listener
this.ListenIp = ip;
this.ListenPort = port;
server = new Server.Server(1000, 10);
server = new Server.Server(1000, 1024);
}
private void OnOffLine(Socket socket)

View File

@ -148,6 +148,7 @@ namespace FastTunnel.Core.Server
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).MassgeTemp = null;
Console.WriteLine("ReceiveAsync");
// As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
@ -164,6 +165,7 @@ namespace FastTunnel.Core.Server
// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
Console.WriteLine("IO_Completed");
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
{
@ -184,6 +186,7 @@ namespace FastTunnel.Core.Server
//
private void ProcessReceive(SocketAsyncEventArgs e)
{
Console.WriteLine("ProcessReceive");
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
@ -227,7 +230,7 @@ namespace FastTunnel.Core.Server
else
{
// ÊÍ·Å×ÊÔ´
m_readWritePool.Push(e);
release(e);
return;
}
}
@ -286,6 +289,11 @@ namespace FastTunnel.Core.Server
catch (Exception) { }
token.Socket.Close();
release(e);
}
private void release(SocketAsyncEventArgs e)
{
// decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref m_numConnectedSockets);
@ -293,7 +301,6 @@ namespace FastTunnel.Core.Server
m_readWritePool.Push(e);
m_maxNumberAcceptedClients.Release();
// Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
}
}
}