diff --git a/FastTunnel.Core/Client/FastTunnelClient.cs b/FastTunnel.Core/Client/FastTunnelClient.cs index 77dbf72..a717f76 100644 --- a/FastTunnel.Core/Client/FastTunnelClient.cs +++ b/FastTunnel.Core/Client/FastTunnelClient.cs @@ -193,7 +193,14 @@ namespace FastTunnel.Core.Client var th = new Thread(ReceiveServer); th.Start(_client); - //await new PipeHepler(_client, ProceccLine).ProcessLinesAsync(); + // await new PipeHepler(_client, ProceccLine).ProcessLinesAsync(); + } + + private bool ProceccLine(Socket socket, byte[] line) + { + var cmd = Encoding.UTF8.GetString(line); + HandleServerRequest(cmd); + return true; } private void ReceiveServer(object obj) @@ -249,8 +256,9 @@ namespace FastTunnel.Core.Client try { - foreach (var item in msgs) + for (int i = 0; i < msgs.Length - 1; i++) { + var item = msgs[i]; if (string.IsNullOrEmpty(item)) continue; @@ -263,10 +271,18 @@ namespace FastTunnel.Core.Client lastBuffer = item; } } + + if (string.IsNullOrEmpty(msgs[msgs.Length - 1])) + { + continue; + } + + lastBuffer = msgs[msgs.Length - 1]; + _logger.LogDebug($"lastBuffer={lastBuffer}"); } catch (Exception ex) { - _logger.LogError(ex, "HandleMsg Error"); + _logger.LogError(ex, $"HandleMsg Error {msgs.ToJson()}"); continue; } } @@ -276,32 +292,35 @@ namespace FastTunnel.Core.Client private void HandleServerRequest(string words) { - var Msg = JsonConvert.DeserializeObject>(words); - if (Msg.MessageType != MessageType.Heart) + Task.Run(() => { - _logger.LogDebug($"HandleServerRequest {words}"); - } + var Msg = JsonConvert.DeserializeObject>(words); + if (Msg.MessageType != MessageType.Heart) + { + _logger.LogDebug($"HandleServerRequest {words}"); + } - IClientHandler handler; - switch (Msg.MessageType) - { - case MessageType.Heart: - handler = _clientHeartHandler; - break; - case MessageType.S_NewCustomer: - handler = _newCustomerHandler; - break; - case MessageType.S_NewSSH: - handler = _newSSHHandler; - break; - case MessageType.Log: - handler = _logHandler; - break; - default: - throw new Exception($"未处理的消息:{Msg.MessageType} {Msg.Content}"); - } + IClientHandler handler; + switch (Msg.MessageType) + { + case MessageType.Heart: + handler = _clientHeartHandler; + break; + case MessageType.S_NewCustomer: + handler = _newCustomerHandler; + break; + case MessageType.S_NewSSH: + handler = _newSSHHandler; + break; + case MessageType.Log: + handler = _logHandler; + break; + default: + throw new Exception($"未处理的消息:{Msg.MessageType} {Msg.Content}"); + } - handler.HandlerMsg(this, Msg); + handler.HandlerMsg(this, Msg); + }); } } } diff --git a/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs b/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs index e35dd1a..5662479 100644 --- a/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs +++ b/FastTunnel.Core/Handlers/Client/HttpRequestHandler.cs @@ -47,7 +47,7 @@ namespace FastTunnel.Core.Handlers.Client localConnecter.Connect(); _logger.LogDebug($"连接本地成功 {request.MsgId}"); - new SocketSwap(connecter.Socket, localConnecter.Socket, _logger, request.MsgId).StartSwapAsync(); + new SocketSwap(connecter.Socket, localConnecter.Socket, _logger, request.MsgId).StartSwap(); } catch (SocketException sex) { diff --git a/FastTunnel.Core/Handlers/Client/NewSSHHandler.cs b/FastTunnel.Core/Handlers/Client/NewSSHHandler.cs index 8196dc9..bf0b62a 100644 --- a/FastTunnel.Core/Handlers/Client/NewSSHHandler.cs +++ b/FastTunnel.Core/Handlers/Client/NewSSHHandler.cs @@ -28,7 +28,7 @@ namespace FastTunnel.Core.Handlers.Client var localConnecter_ssh = new DnsSocket(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort); localConnecter_ssh.Connect(); - new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket, _logger, request_ssh.MsgId).StartSwapAsync(); + new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket, _logger, request_ssh.MsgId).StartSwap(); } } } diff --git a/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs b/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs index 2b86b80..7755fcf 100644 --- a/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs +++ b/FastTunnel.Core/Handlers/Server/SwapMessageHandler.cs @@ -39,7 +39,7 @@ namespace FastTunnel.Core.Handlers.Server _logger.LogDebug($"SwapMassage:{SwapMsg.msgId}"); - response.SetResult(new NetworkStream(client)); + response.SetResult(new NetworkStream(client, true)); } else { diff --git a/FastTunnel.Core/Server/PipeHepler.cs b/FastTunnel.Core/Server/PipeHepler.cs new file mode 100644 index 0000000..7114bcd --- /dev/null +++ b/FastTunnel.Core/Server/PipeHepler.cs @@ -0,0 +1,115 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace FastTunnel.Core.Server +{ + public class PipeHepler + { + Socket m_socket; + Func processLine; + const int minimumBufferSize = 512; + + public PipeHepler(Socket socket, Func processLine) + { + this.processLine = processLine; + m_socket = socket; + } + + public async Task ProcessLinesAsync() + { + var pipe = new Pipe(); + Task writing = FillPipeAsync(pipe.Writer); + Task reading = ReadPipeAsync(pipe.Reader); + await Task.WhenAll(reading, writing); + } + + private async Task FillPipeAsync(PipeWriter writer) + { + while (true) + { + // Allocate at least 512 bytes from the PipeWriter. + Memory memory = writer.GetMemory(minimumBufferSize); + try + { + int bytesRead = await m_socket.ReceiveAsync(memory, SocketFlags.None); + if (bytesRead == 0) + { + break; + } + // Tell the PipeWriter how much was read from the Socket. + writer.Advance(bytesRead); + } + catch (Exception) + { + break; + } + + // Make the data available to the PipeReader. + FlushResult result = await writer.FlushAsync(); + + if (result.IsCompleted) + { + break; + } + } + + // By completing PipeWriter, tell the PipeReader that there's no more data coming. + await writer.CompleteAsync(); + } + + private async Task ReadPipeAsync(PipeReader reader) + { + while (true) + { + ReadResult result = await reader.ReadAsync(); + ReadOnlySequence buffer = result.Buffer; + + while (TryReadLine(ref buffer, out ReadOnlySequence line)) + { + // Process the line. + if (!processLine(m_socket, line.ToArray())) + { + // 停止继续监听 + break; + } + } + + // Tell the PipeReader how much of the buffer has been consumed. + reader.AdvanceTo(buffer.Start, buffer.End); + + // Stop reading if there's no more data coming. + if (result.IsCompleted) + { + break; + } + } + + // Mark the PipeReader as complete. + await reader.CompleteAsync(); + } + + static bool TryReadLine(ref ReadOnlySequence buffer, out ReadOnlySequence line) + { + // Look for a EOL in the buffer. + SequencePosition? position = buffer.PositionOf((byte)'\n'); + + if (position == null) + { + line = default; + return false; + } + + // Skip the line + the \n. + line = buffer.Slice(0, position.Value); + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); + return true; + } + + } +} diff --git a/FastTunnel.Core/Sockets/ISocketSwap.cs b/FastTunnel.Core/Sockets/ISocketSwap.cs index 49729f9..6247f33 100644 --- a/FastTunnel.Core/Sockets/ISocketSwap.cs +++ b/FastTunnel.Core/Sockets/ISocketSwap.cs @@ -15,6 +15,6 @@ namespace FastTunnel.Core.Sockets /// ISocketSwap BeforeSwap(Action fun); - Task StartSwapAsync(); + void StartSwap(); } } diff --git a/FastTunnel.Core/Sockets/SocketSwap.cs b/FastTunnel.Core/Sockets/SocketSwap.cs index 9af50c0..5bb0f59 100644 --- a/FastTunnel.Core/Sockets/SocketSwap.cs +++ b/FastTunnel.Core/Sockets/SocketSwap.cs @@ -29,42 +29,113 @@ namespace FastTunnel.Core.Sockets public SocketSwap(Socket sockt1, Socket sockt2, ILogger logger, string msgId) { + //sockt1.NoDelay = true; + //sockt2.NoDelay = true; m_sockt1 = sockt1; m_sockt2 = sockt2; m_msgId = msgId; m_logger = logger; } - public async Task StartSwapAsync() + public void StartSwap() { - m_logger.LogDebug($"StartSwap start {m_msgId}"); - var st1 = new NetworkStream(m_sockt1, ownsSocket: true); - var st2 = new NetworkStream(m_sockt2, ownsSocket: true); + m_logger?.LogDebug($"[StartSwapStart] {m_msgId}"); + swapeStarted = true; - var taskX = st1.CopyToAsync(st2); - var taskY = st2.CopyToAsync(st1); + ThreadPool.QueueUserWorkItem(swapCallback, new Channel + { + Send = m_sockt1, + Receive = m_sockt2 + }); - await Task.WhenAny(taskX, taskY); - m_logger.LogDebug($"StartSwap end {m_msgId}"); + ThreadPool.QueueUserWorkItem(swapCallback, new Channel + { + Send = m_sockt2, + Receive = m_sockt1 + }); + + m_logger?.LogDebug($"[StartSwapEnd] {m_msgId}"); + } + + private void swapCallback(object state) + { + m_logger?.LogDebug($"swapCallback {m_msgId}"); + var chanel = state as Channel; + byte[] result = new byte[512]; + + while (true) + { + int num; + + try + { + try + { + num = chanel.Receive.Receive(result, 0, result.Length, SocketFlags.None); + } + catch (Exception) + { + closeSocket("Revice Fail"); + break; + } + + if (num == 0) + { + closeSocket("Normal Close"); + break; + } + + try + { + chanel.Send.Send(result, 0, num, SocketFlags.None); + } + catch (Exception) + { + closeSocket("Send Fail"); + break; + } + } + catch (Exception ex) + { + m_logger.LogCritical(ex, "致命异常"); + break; + } + } + + if (m_msgId.Contains("_")) + { + var interval = long.Parse(DateTime.Now.GetChinaTicks()) - long.Parse(m_msgId.Split('_')[0]); + m_logger?.LogDebug($"endSwap {m_msgId} 交互时常:{interval}ms"); + } + } + + private void closeSocket(string msg) + { + m_logger.LogDebug($"【closeSocket】:{msg}"); try { - st1.Close(); - // m_sockt1.Close(); - } - catch (Exception ex) - { - - } - - try - { - st2.Close(); - m_sockt2.Close(); + m_sockt1.Shutdown(SocketShutdown.Both); } catch (Exception) { + } + finally + { + m_sockt1.Close(); + } + try + { + m_sockt2.Shutdown(SocketShutdown.Both); + + } + catch (Exception) + { + } + finally + { + m_sockt2.Close(); } } @@ -81,5 +152,6 @@ namespace FastTunnel.Core.Sockets fun?.Invoke(); return this; } + } } \ No newline at end of file diff --git a/publish-win.sh b/publish-win.sh new file mode 100644 index 0000000..a6450de --- /dev/null +++ b/publish-win.sh @@ -0,0 +1,20 @@ +#!/bin/bash +rm -rf publish/* +projects=("FastTunnel.Client" "FastTunnel.Server") +plates=("win-x64") +for project in ${projects[*]}; do + echo + echo "=========开始发布:${project} =========" + echo + for plate in ${plates[*]}; do + echo "plate=${plate}" + echo src/$project/$project.csproj + dotnet publish $project/$project.csproj -o=publish/$project.$plate -c=release #-p:PublishTrimmed=true --nologo + echo + echo "=========开始打包 =========" + echo + cd publish && tar -zcvf $project.$plate.tar.gz $project.$plate + cd ../ + # rm -rf publish/$project.$plate + done +done