重构一些旧的实现

This commit is contained in:
SpringHgui 2020-09-27 23:59:21 +08:00
parent 926b16fa36
commit 1d89fb7a32
8 changed files with 149 additions and 177 deletions

View File

@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace FastTunnel.Core
{
public class AsyncSocketSwap
{
private Socket m_sockt1;
private Socket m_sockt2;
bool m_swaping = false;
private class Channel
{
public Socket Send { get; set; }
public Socket Receive { get; set; }
}
public AsyncSocketSwap(Socket sockt1, Socket sockt2)
{
m_sockt1 = sockt1;
m_sockt2 = sockt2;
}
public void StartSwap()
{
m_swaping = true;
var rcv1 = new DataReciver(m_sockt1);
rcv1.OnComplete += Rcv1_OnComplete;
rcv1.ReciveOne();
var rcv2 = new DataReciver(m_sockt2);
rcv2.OnComplete += Rcv2_OnComplete;
rcv2.ReciveOne();
}
private void Rcv1_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
m_sockt2.Send(buffer, index, count, SocketFlags.None);
send.ReciveOne();
}
private void Rcv2_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
m_sockt1.Send(buffer, index, count, SocketFlags.None);
send.ReciveOne();
}
internal AsyncSocketSwap BeforeSwap(Action fun)
{
if (m_swaping)
{
throw new Exception("BeforeSwap must be invoked before StartSwap!");
}
fun?.Invoke();
return this;
}
}
}

View File

@ -1,12 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
namespace FastTunnel.Core
{
public class AsyncUserToken
{
public Socket Socket { get; set; }
}
}

View File

@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
namespace FastTunnel.Core
{
public delegate void OnCompleteHandler(DataReciver send, byte[] buffer, int index, int count);
public delegate void OnError(DataReciver send, SocketAsyncEventArgs e);
public class DataReciver
{
private Socket m_client;
public event OnCompleteHandler OnComplete;
public event OnError OnError;
byte[] buffer = new byte[1024 * 1024];
SocketAsyncEventArgs rcv_event;
public Socket Socket => m_client;
public DataReciver(Socket client)
{
this.m_client = client;
rcv_event = new SocketAsyncEventArgs();
rcv_event.Completed += Rcv_event_Completed;
rcv_event.SetBuffer(buffer);
}
public void ReciveOne()
{
var willRaise = m_client.ReceiveAsync(rcv_event);
if (!willRaise)
{
Process(rcv_event);
}
}
private void Rcv_event_Completed(object sender, SocketAsyncEventArgs e)
{
Process(e);
}
private void Process(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
if (e.BytesTransferred == 0)
{
}
else
{
OnComplete?.Invoke(this, buffer, e.Offset, e.BytesTransferred);
}
}
else
{
OnError?.Invoke(this, e);
}
}
}
}

View File

@ -58,7 +58,7 @@ namespace FastTunnel.Core.Handlers.Client
throw;
}
new SocketSwap(connecter.Socket, localConnecter.Socket).StartSwap();
new AsyncSocketSwap(connecter.Socket, localConnecter.Socket).StartSwap();
}
}
}

View File

@ -20,7 +20,7 @@ namespace FastTunnel.Core.Handlers.Client
var localConnecter_ssh = new Connecter(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort, 5000);
localConnecter_ssh.Connect();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwap();
new AsyncSocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwap();
}
}
}

View File

@ -32,45 +32,25 @@ namespace FastTunnel.Core.Handlers.Server
_swapMsgHandler = new SwapMessageHandler(logger);
}
byte[] buffer = new byte[1024 * 1024];
string temp = string.Empty;
public void Dispatch(Socket client)
{
//定义byte数组存放从客户端接收过来的数据
int length;
var reader = new DataReciver(client);
reader.OnComplete += Reader_OnComplete;
reader.OnError += Reader_OnError;
try
{
length = client.Receive(buffer);
if (length == 0)
{
try
{
client.Shutdown(SocketShutdown.Both);
}
finally
{
client.Close();
}
reader.ReciveOne();
}
// 递归结束
return;
}
}
catch (Exception ex)
{
_logger.LogError($"接收客户端异常 -> 退出登录 {ex.Message}");
private void Reader_OnError(DataReciver send, SocketAsyncEventArgs e)
{
//
}
if (client.Connected)
{
client.Close();
}
return;
}
// 将字节转换成字符串
string words = Encoding.UTF8.GetString(buffer, 0, length);
private void Reader_OnComplete(DataReciver reader, byte[] buffer, int offset, int count)
{
var words = Encoding.UTF8.GetString(buffer, offset, count);
words += temp;
temp = string.Empty;
@ -87,12 +67,12 @@ namespace FastTunnel.Core.Handlers.Server
if (firstIndex < 0)
{
temp += words;
Dispatch(client);
reader.ReciveOne();
break;
}
var sub_words = words.Substring(index, firstIndex + 1);
var res = handle(sub_words, client);
var res = handle(sub_words, reader.Socket);
if (res.NeedRecive)
needRecive = true;
@ -104,7 +84,7 @@ namespace FastTunnel.Core.Handlers.Server
if (needRecive)
{
Dispatch(client);
reader.ReciveOne();
}
}
catch (Exception ex)
@ -113,8 +93,8 @@ namespace FastTunnel.Core.Handlers.Server
_logger.LogError($"handle fail msg{words}");
// throw;
client.Send(new Message<LogMassage>() { MessageType = MessageType.Log, Content = new LogMassage(LogMsgType.Error, ex.Message) });
Dispatch(client);
reader.Socket.Send(new Message<LogMassage>() { MessageType = MessageType.Log, Content = new LogMassage(LogMsgType.Error, ex.Message) });
reader.ReciveOne();
}
}
@ -122,6 +102,7 @@ namespace FastTunnel.Core.Handlers.Server
{
Message<JObject> msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
IClientMessageHandler handler = null;
switch (msg.MessageType)
{

View File

@ -32,7 +32,7 @@ namespace FastTunnel.Core.Handlers.Server
// Join
Task.Run(() =>
{
(new SocketSwap(request.CustomerClient, client))
(new AsyncSocketSwap(request.CustomerClient, client))
.BeforeSwap(() => { if (request.Buffer != null) client.Send(request.Buffer); })
.StartSwap();
});

View File

@ -1,125 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace FastTunnel.Core
{
public class SocketSwap
{
private Socket _sockt1;
private Socket _sockt2;
bool Swaped = false;
private class Channel
{
public Socket Send { get; set; }
public Socket Receive { get; set; }
}
public SocketSwap(Socket sockt1, Socket sockt2)
{
_sockt1 = sockt1;
_sockt2 = sockt2;
}
public void StartSwap()
{
Swaped = true;
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = _sockt1,
Receive = _sockt2
});
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = _sockt2,
Receive = _sockt1
});
}
private void swapCallback(object state)
{
var chanel = state as Channel;
byte[] result = new byte[1024];
while (true)
{
try
{
if (!chanel.Receive.Connected)
break;
int num = chanel.Receive.Receive(result, result.Length, SocketFlags.None);
if (num == 0)
{
chanel.Receive.Close();
try
{
// Release the socket.//
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
break;
}
if (!chanel.Send.Connected)
break;
// var str = Encoding.UTF8.GetString(result, 0, num);
chanel.Send.Send(result, num, SocketFlags.None);
}
catch (SocketException)
{
// Interrupted function call. 10004
// An existing connection was forcibly closed by the remote host. 10054
try
{
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
try
{
chanel.Receive.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Receive.Close();
}
break;
}
catch (Exception ex)
{
Console.Write(ex.ToString());
throw;
}
}
}
internal SocketSwap BeforeSwap(Action fun)
{
if (Swaped)
{
throw new Exception("BeforeSwap must be invoked before StartSwap!");
}
fun?.Invoke();
return this;
}
}
}