基本功能可实现

This commit is contained in:
SpringHgui 2021-07-30 23:57:39 +08:00
parent 2228750243
commit d98b7fea76
8 changed files with 276 additions and 50 deletions

View File

@ -193,7 +193,14 @@ namespace FastTunnel.Core.Client
var th = new Thread(ReceiveServer);
th.Start(_client);
//await new PipeHepler(_client, ProceccLine).ProcessLinesAsync();
// await new PipeHepler(_client, ProceccLine).ProcessLinesAsync();
}
private bool ProceccLine(Socket socket, byte[] line)
{
var cmd = Encoding.UTF8.GetString(line);
HandleServerRequest(cmd);
return true;
}
private void ReceiveServer(object obj)
@ -249,8 +256,9 @@ namespace FastTunnel.Core.Client
try
{
foreach (var item in msgs)
for (int i = 0; i < msgs.Length - 1; i++)
{
var item = msgs[i];
if (string.IsNullOrEmpty(item))
continue;
@ -263,10 +271,18 @@ namespace FastTunnel.Core.Client
lastBuffer = item;
}
}
if (string.IsNullOrEmpty(msgs[msgs.Length - 1]))
{
continue;
}
lastBuffer = msgs[msgs.Length - 1];
_logger.LogDebug($"lastBuffer={lastBuffer}");
}
catch (Exception ex)
{
_logger.LogError(ex, "HandleMsg Error");
_logger.LogError(ex, $"HandleMsg Error {msgs.ToJson()}");
continue;
}
}
@ -276,32 +292,35 @@ namespace FastTunnel.Core.Client
private void HandleServerRequest(string words)
{
var Msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
if (Msg.MessageType != MessageType.Heart)
Task.Run(() =>
{
_logger.LogDebug($"HandleServerRequest {words}");
}
var Msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
if (Msg.MessageType != MessageType.Heart)
{
_logger.LogDebug($"HandleServerRequest {words}");
}
IClientHandler handler;
switch (Msg.MessageType)
{
case MessageType.Heart:
handler = _clientHeartHandler;
break;
case MessageType.S_NewCustomer:
handler = _newCustomerHandler;
break;
case MessageType.S_NewSSH:
handler = _newSSHHandler;
break;
case MessageType.Log:
handler = _logHandler;
break;
default:
throw new Exception($"未处理的消息:{Msg.MessageType} {Msg.Content}");
}
IClientHandler handler;
switch (Msg.MessageType)
{
case MessageType.Heart:
handler = _clientHeartHandler;
break;
case MessageType.S_NewCustomer:
handler = _newCustomerHandler;
break;
case MessageType.S_NewSSH:
handler = _newSSHHandler;
break;
case MessageType.Log:
handler = _logHandler;
break;
default:
throw new Exception($"未处理的消息:{Msg.MessageType} {Msg.Content}");
}
handler.HandlerMsg(this, Msg);
handler.HandlerMsg(this, Msg);
});
}
}
}

View File

@ -47,7 +47,7 @@ namespace FastTunnel.Core.Handlers.Client
localConnecter.Connect();
_logger.LogDebug($"连接本地成功 {request.MsgId}");
new SocketSwap(connecter.Socket, localConnecter.Socket, _logger, request.MsgId).StartSwapAsync();
new SocketSwap(connecter.Socket, localConnecter.Socket, _logger, request.MsgId).StartSwap();
}
catch (SocketException sex)
{

View File

@ -28,7 +28,7 @@ namespace FastTunnel.Core.Handlers.Client
var localConnecter_ssh = new DnsSocket(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort);
localConnecter_ssh.Connect();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket, _logger, request_ssh.MsgId).StartSwapAsync();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket, _logger, request_ssh.MsgId).StartSwap();
}
}
}

View File

@ -39,7 +39,7 @@ namespace FastTunnel.Core.Handlers.Server
_logger.LogDebug($"SwapMassage{SwapMsg.msgId}");
response.SetResult(new NetworkStream(client));
response.SetResult(new NetworkStream(client, true));
}
else
{

View File

@ -0,0 +1,115 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace FastTunnel.Core.Server
{
public class PipeHepler
{
Socket m_socket;
Func<Socket, byte[], bool> processLine;
const int minimumBufferSize = 512;
public PipeHepler(Socket socket, Func<Socket, byte[], bool> processLine)
{
this.processLine = processLine;
m_socket = socket;
}
public async Task ProcessLinesAsync()
{
var pipe = new Pipe();
Task writing = FillPipeAsync(pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
private async Task FillPipeAsync(PipeWriter writer)
{
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await m_socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception)
{
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
private async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
if (!processLine(m_socket, line.ToArray()))
{
// 停止继续监听
break;
}
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
}
}

View File

@ -15,6 +15,6 @@ namespace FastTunnel.Core.Sockets
/// <returns></returns>
ISocketSwap BeforeSwap(Action fun);
Task StartSwapAsync();
void StartSwap();
}
}

View File

@ -29,42 +29,113 @@ namespace FastTunnel.Core.Sockets
public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId)
{
//sockt1.NoDelay = true;
//sockt2.NoDelay = true;
m_sockt1 = sockt1;
m_sockt2 = sockt2;
m_msgId = msgId;
m_logger = logger;
}
public async Task StartSwapAsync()
public void StartSwap()
{
m_logger.LogDebug($"StartSwap start {m_msgId}");
var st1 = new NetworkStream(m_sockt1, ownsSocket: true);
var st2 = new NetworkStream(m_sockt2, ownsSocket: true);
m_logger?.LogDebug($"[StartSwapStart] {m_msgId}");
swapeStarted = true;
var taskX = st1.CopyToAsync(st2);
var taskY = st2.CopyToAsync(st1);
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = m_sockt1,
Receive = m_sockt2
});
await Task.WhenAny(taskX, taskY);
m_logger.LogDebug($"StartSwap end {m_msgId}");
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = m_sockt2,
Receive = m_sockt1
});
m_logger?.LogDebug($"[StartSwapEnd] {m_msgId}");
}
private void swapCallback(object state)
{
m_logger?.LogDebug($"swapCallback {m_msgId}");
var chanel = state as Channel;
byte[] result = new byte[512];
while (true)
{
int num;
try
{
try
{
num = chanel.Receive.Receive(result, 0, result.Length, SocketFlags.None);
}
catch (Exception)
{
closeSocket("Revice Fail");
break;
}
if (num == 0)
{
closeSocket("Normal Close");
break;
}
try
{
chanel.Send.Send(result, 0, num, SocketFlags.None);
}
catch (Exception)
{
closeSocket("Send Fail");
break;
}
}
catch (Exception ex)
{
m_logger.LogCritical(ex, "致命异常");
break;
}
}
if (m_msgId.Contains("_"))
{
var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(m_msgId.Split('_')[0]);
m_logger?.LogDebug($"endSwap {m_msgId} 交互时常:{interval}ms");
}
}
private void closeSocket(string msg)
{
m_logger.LogDebug($"【closeSocket】{msg}");
try
{
st1.Close();
// m_sockt1.Close();
}
catch (Exception ex)
{
}
try
{
st2.Close();
m_sockt2.Close();
m_sockt1.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt1.Close();
}
try
{
m_sockt2.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
finally
{
m_sockt2.Close();
}
}
@ -81,5 +152,6 @@ namespace FastTunnel.Core.Sockets
fun?.Invoke();
return this;
}
}
}

20
publish-win.sh Normal file
View File

@ -0,0 +1,20 @@
#!/bin/bash
rm -rf publish/*
projects=("FastTunnel.Client" "FastTunnel.Server")
plates=("win-x64")
for project in ${projects[*]}; do
echo
echo "=========开始发布:${project} ========="
echo
for plate in ${plates[*]}; do
echo "plate=${plate}"
echo src/$project/$project.csproj
dotnet publish $project/$project.csproj -o=publish/$project.$plate -c=release #-p:PublishTrimmed=true --nologo
echo
echo "=========开始打包 ========="
echo
cd publish && tar -zcvf $project.$plate.tar.gz $project.$plate
cd ../
# rm -rf publish/$project.$plate
done
done