重写完成

This commit is contained in:
Gui.H 2021-06-17 20:10:45 +08:00
parent d898f1d5ea
commit bb337d33c8
19 changed files with 200 additions and 637 deletions

View File

@ -21,7 +21,7 @@
"LocalIp": "127.0.0.1",
//
"LocalPort": 8080,
"LocalPort": 9529,
// , 访url http://{SubDomain}.{Domain}:{ProxyPort_HTTP}/
"SubDomain": "test", // test.test.cc

View File

@ -1,85 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FastTunnel.Core
{
public class AsyncSocketSwap
{
private Socket m_sockt1;
private Socket m_sockt2;
bool m_swaping = false;
public AsyncSocketSwap(Socket sockt1, Socket sockt2)
{
m_sockt1 = sockt1;
m_sockt2 = sockt2;
}
public AsyncSocketSwap BeforeSwap(Action fun)
{
if (m_swaping)
throw new Exception("BeforeSwap must be invoked before StartSwap!");
fun?.Invoke();
return this;
}
private void StartSwap()
{
m_swaping = true;
var rcv1 = new DataReciver(m_sockt1);
rcv1.OnComplete += Rcv1_OnComplete;
rcv1.ReciveOneAsync();
var rcv2 = new DataReciver(m_sockt2);
rcv2.OnComplete += Rcv2_OnComplete;
rcv2.ReciveOneAsync();
}
public void StartSwapAsync()
{
Task.Run(() =>
{
try
{
StartSwap();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
ExceptionDispatchInfo.Capture(ex).Throw();
}
});
}
private void Rcv1_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
try
{
m_sockt2.Send(buffer, index, count, SocketFlags.None);
send.ReciveOneAsync();
}
catch (Exception)
{
}
}
private void Rcv2_OnComplete(DataReciver send, byte[] buffer, int index, int count)
{
try
{
m_sockt1.Send(buffer, index, count, SocketFlags.None);
send.ReciveOneAsync();
}
catch (Exception ex)
{
}
}
}
}

View File

@ -89,16 +89,10 @@ namespace FastTunnel.Core
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
Console.WriteLine("ProcessReceive:" + e.BytesTransferred);
e.SetBuffer(e.Offset, e.BytesTransferred);
try
{
if (!token.Sender.SendAsync(e))
{
ProcessSend(e);
}
}
catch (Exception)
e.SetBuffer(e.Offset, 512);
if (!token.Sender.SendAsync(e))
{
ProcessSend(e);
}
}
else
@ -117,6 +111,7 @@ namespace FastTunnel.Core
if (!token.Reciver.ReceiveAsync(e))
{
e.SetBuffer(e.Offset, 512);
ProcessReceive(e);
}
}

View File

@ -14,6 +14,7 @@ using System.Threading;
using Microsoft.Extensions.Logging;
using FastTunnel.Core.Handlers.Client;
using Microsoft.Extensions.Configuration;
using FastTunnel.Core.Server;
namespace FastTunnel.Core.Client
{
@ -232,8 +233,23 @@ namespace FastTunnel.Core.Client
timer_heart.Start();
timer_timeout.Start();
th = new Thread(ReceiveServer);
th.Start(socket);
//th = new Thread(ReceiveServer);
//th.Start(socket);
ReceiveServerV2(socket);
}
private void ReceiveServerV2(object obj)
{
var client = obj as Socket;
new PipeHepler(client, ProceccLine).ProcessLinesAsync();
}
private bool ProceccLine(Socket socket, byte[] line)
{
var cmd = Encoding.UTF8.GetString(line);
HandleServerRequest(cmd);
return true;
}
private void ReceiveServer(object obj)

View File

@ -1,232 +0,0 @@
using FastTunnel.Core.Config;
using FastTunnel.Core.Client;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Text.RegularExpressions;
using System.Net.Http;
using System.IO;
using FastTunnel.Core.Server;
namespace FastTunnel.Core.Dispatchers
{
public class HttpDispatcher : IListenerDispatcher
{
readonly ILogger _logger;
readonly IServerConfig _serverSettings;
readonly FastTunnelServer _fastTunnelServer;
public HttpDispatcher(FastTunnelServer fastTunnelServer, ILogger logger, IServerConfig serverSettings)
{
_logger = logger;
_serverSettings = serverSettings;
_fastTunnelServer = fastTunnelServer;
}
static string pattern = @"[hH]ost:.+[\r\n]";
public void Dispatch(Socket httpClient)
{
Stream tempBuffer = new MemoryStream();
try
{
// 1.检查白名单
try
{
var endpoint = httpClient.RemoteEndPoint as System.Net.IPEndPoint;
_logger.LogInformation($"Receive HTTP Request {endpoint.Address}:{endpoint.Port}");
if (_serverSettings.WebAllowAccessIps != null)
{
if (!_serverSettings.WebAllowAccessIps.Contains(endpoint.Address.ToString()))
{
HandlerHostNotAccess(httpClient);
return;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex);
}
//定义byte数组存放从客户端接收过来的数据
byte[] buffer = new byte[1024]; // 1k
MatchCollection collection;
string words = string.Empty;
try
{
while (true)
{
var count = httpClient.Receive(buffer);
if (count == 0)
{
httpClient.Close();
return;
}
// 读取的字节缓存到内存
tempBuffer.Write(buffer, 0, count);
tempBuffer.Seek(0, SeekOrigin.Begin);
var array = new byte[tempBuffer.Length];
tempBuffer.Read(array, 0, (int)tempBuffer.Length);
// 将字节转换成字符串
words = Encoding.UTF8.GetString(array, 0, (int)tempBuffer.Length);
collection = Regex.Matches(words, pattern);
if (collection.Count > 0 || count < buffer.Length)
{
break;
}
}
}
catch (SocketException ex)
{
_logger.LogError(ex.Message);
if (httpClient.Connected)
httpClient.Close();
return;
}
catch (Exception ex)
{
_logger.LogError(ex);
throw;
}
string Host;
if (collection.Count == 0)
{
_logger.LogError($"Host异常{words}");
// 返回错误页
HandlerHostRequired(httpClient);
return;
}
else
{
Host = collection[0].Value;
}
_logger.LogDebug(Host.Replace("\r", ""));
var domain = Host.Split(":")[1].Trim();
// 判断是否为ip
if (IsIpDomian(domain))
{
// 返回错误页
HandlerHostRequired(httpClient);
return;
}
WebInfo web;
if (!_fastTunnelServer.WebList.TryGetValue(domain, out web))
{
HandlerClientNotOnLine(httpClient, domain);
return;
}
var msgid = Guid.NewGuid().ToString();
tempBuffer.Seek(0, SeekOrigin.Begin);
var byteArray = new byte[tempBuffer.Length];
tempBuffer.Read(byteArray, 0, (int)tempBuffer.Length);
tempBuffer.Close();
_fastTunnelServer.RequestTemp.TryAdd(msgid, new NewRequest
{
CustomerClient = httpClient,
Buffer = byteArray
});
try
{
_logger.LogDebug($"OK");
web.Socket.Send(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = msgid, WebConfig = web.WebConfig } });
}
catch (Exception)
{
HandlerClientNotOnLine(httpClient, domain);
throw;
}
}
catch (Exception ex)
{
_logger.LogError("处理Http失败" + ex);
httpClient.Close();
}
}
private bool IsIpDomian(string domain)
{
return Regex.IsMatch(domain, @"^\d.\d.\d.\d.\d$");
}
private void HandlerHostNotAccess(Socket client)
{
_logger.LogDebug($"### NotAccessIps:'{client.RemoteEndPoint}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_NotAccessIps);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
private void HandlerHostRequired(Socket client)
{
_logger.LogDebug($"### HostRequired:'{client.RemoteEndPoint}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_HostRequired);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
private void HandlerClientNotOnLine(Socket client, string domain)
{
_logger.LogDebug($"### TunnelNotFound:'{domain}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_NoTunnel);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
public void Dispatch(Socket httpClient, Action<Socket> onOffLine)
{
Dispatch(httpClient);
}
public void Dispatch(AsyncUserToken token, string words)
{
throw new NotImplementedException();
}
}
}

View File

@ -32,6 +32,8 @@ namespace FastTunnel.Core.Dispatchers
public void Dispatch(AsyncUserToken token, string words)
{
Console.WriteLine("=======Dispatch HTTP========");
// 1.检查白名单
try
{
@ -52,6 +54,8 @@ namespace FastTunnel.Core.Dispatchers
_logger.LogError(ex);
}
Console.WriteLine("=======Dispatch Matches========");
string Host;
MatchCollection collection = Regex.Matches(words, pattern);
if (collection.Count == 0)
@ -70,6 +74,8 @@ namespace FastTunnel.Core.Dispatchers
_logger.LogDebug(Host.Replace("\r", ""));
var domain = Host.Split(":")[1].Trim();
Console.WriteLine($"=======Dispatch domain:{domain}========");
// 判断是否为ip
if (IsIpDomian(domain))
{
@ -81,10 +87,12 @@ namespace FastTunnel.Core.Dispatchers
WebInfo web;
if (!_fastTunnelServer.WebList.TryGetValue(domain, out web))
{
Console.WriteLine($"=======Dispatch 未登录========");
HandlerClientNotOnLine(token.Socket, domain);
return;
}
Console.WriteLine($"=======Dispatch 已找到========");
var msgid = Guid.NewGuid().ToString();
_fastTunnelServer.RequestTemp.TryAdd(msgid, new NewRequest
{
@ -92,15 +100,23 @@ namespace FastTunnel.Core.Dispatchers
Buffer = token.Recived
});
Console.WriteLine($"=======Dispatch 发送msg========");
try
{
_logger.LogDebug($"OK");
_logger.LogDebug($"=======OK========");
web.Socket.Send(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = msgid, WebConfig = web.WebConfig } });
Console.WriteLine($"=======Dispatch OK========");
}
catch (Exception)
{
HandlerClientNotOnLine(token.Socket, domain);
throw;
Console.WriteLine($"=======Dispatch 移除========");
// 移除
_fastTunnelServer.WebList.TryRemove(domain, out _);
}
}

View File

@ -26,7 +26,10 @@
<Compile Remove="Client\SuiDaoServer.cs.BASE.cs" />
<Compile Remove="Client\SuiDaoServer.cs.LOCAL.cs" />
<Compile Remove="Client\SuiDaoServer.cs.REMOTE.cs" />
<Compile Remove="Dispatchers\HttpDispatcher.cs" />
<Compile Remove="Listener.cs" />
<Compile Remove="Listener\ClientListener.cs" />
<Compile Remove="Listener\HttpListener.cs" />
<Compile Remove="Server.cs" />
</ItemGroup>

View File

@ -59,7 +59,7 @@ namespace FastTunnel.Core.Handlers.Client
throw;
}
new AsyncSocketSwapV2(connecter.Socket, localConnecter.Socket).StartSwapAsync();
new SocketSwap(connecter.Socket, localConnecter.Socket).StartSwapAsync();
}
}
}

View File

@ -20,7 +20,7 @@ namespace FastTunnel.Core.Handlers.Client
var localConnecter_ssh = new Connecter(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort, 5000);
localConnecter_ssh.Connect();
new AsyncSocketSwapV2(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwapAsync();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwapAsync();
}
}
}

View File

@ -67,7 +67,8 @@ namespace FastTunnel.Core.Handlers
}
}
var sb = new StringBuilder($"{Environment.NewLine}=====隧道已建立成功,可通过以下方式访问内网服务====={Environment.NewLine}");
var sb = new StringBuilder($"{Environment.NewLine}=====隧道已建立成功,可通过以下方式访问内网服务====={Environment.NewLine}{Environment.NewLine}");
sb.Append($"穿透协议 | 映射关系(公网=>内网){Environment.NewLine}");
if (requet.Webs != null && requet.Webs.Count() > 0)
{
hasTunnel = true;
@ -78,8 +79,8 @@ namespace FastTunnel.Core.Handlers
_logger.LogDebug($"new domain '{hostName}'");
server.WebList.AddOrUpdate(hostName, info, (key, oldInfo) => { return info; });
sb.Append($"{Environment.NewLine} http://{hostName}{(server.ServerSettings.WebHasNginxProxy ? string.Empty : ":" + server.ServerSettings.WebProxyPort)} => {item.LocalIp}:{item.LocalPort}");
sb.Append($" HTTP | http://{hostName}{(server.ServerSettings.WebHasNginxProxy ? string.Empty : ":" + server.ServerSettings.WebProxyPort)} => {item.LocalIp}:{item.LocalPort}");
sb.Append(Environment.NewLine);
if (item.WWW != null)
{
foreach (var www in item.WWW)
@ -88,7 +89,8 @@ namespace FastTunnel.Core.Handlers
_logger.LogInformation($"WWW {www}");
server.WebList.AddOrUpdate(www, info, (key, oldInfo) => { return info; });
sb.Append($"{Environment.NewLine} http://{www}{(server.ServerSettings.WebHasNginxProxy ? string.Empty : ":" + server.ServerSettings.WebProxyPort)} => {item.LocalIp}:{item.LocalPort}");
sb.Append($" HTTP | http://{www}{(server.ServerSettings.WebHasNginxProxy ? string.Empty : ":" + server.ServerSettings.WebProxyPort)} => {item.LocalIp}:{item.LocalPort}");
sb.Append(Environment.NewLine);
}
}
}
@ -131,7 +133,8 @@ namespace FastTunnel.Core.Handlers
server.SSHList.TryAdd(item.RemotePort, new SSHInfo<SSHHandlerArg> { Listener = ls, Socket = client, SSHConfig = item });
_logger.LogDebug($"SSH proxy success: {item.RemotePort} => {item.LocalIp}:{item.LocalPort}");
sb.Append($"{Environment.NewLine} {server.ServerSettings.WebDomain}:{item.RemotePort} => {item.LocalIp}:{item.LocalPort}");
sb.Append($" TCP | {server.ServerSettings.WebDomain}:{item.RemotePort} => {item.LocalIp}:{item.LocalPort}");
sb.Append(Environment.NewLine);
}
catch (Exception ex)
{
@ -149,7 +152,7 @@ namespace FastTunnel.Core.Handlers
}
else
{
sb.Append($"{Environment.NewLine}{Environment.NewLine}====================================================");
sb.Append($"{Environment.NewLine}====================================================");
client.Send(new Message<LogMassage> { MessageType = MessageType.Log, Content = new LogMassage(LogMsgType.Info, sb.ToString()) });
}
}

View File

@ -32,7 +32,7 @@ namespace FastTunnel.Core.Handlers.Server
server.RequestTemp.TryRemove(SwapMsg.msgId, out _);
// Join
new AsyncSocketSwapV2(request.CustomerClient, client)
new SocketSwap(request.CustomerClient, client)
.BeforeSwap(() =>
{
if (request.Buffer != null) client.Send(request.Buffer);

View File

@ -1,133 +0,0 @@
using FastTunnel.Core.Client;
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers.Server;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace FastTunnel.Core.Listener
{
[Obsolete("由ClientListenerV2替代", true)]
public class ClientListener : IListener
{
ILogger _logger;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
Socket listenSocket;
public IList<ClientConnection> ConnectedSockets = new List<ClientConnection>();
FastTunnelServer _fastTunnelServer;
public ClientListener(FastTunnelServer fastTunnelServer, string ip, int port, ILogger logerr)
{
_fastTunnelServer = fastTunnelServer;
_logger = logerr;
this.ListenIp = ip;
this.ListenPort = port;
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
}
private void HandleNewClient(Socket socket)
{
// 此时的客户端可能有两种 1.登录的客户端 2.交换请求的客户端
var client = new ClientConnection(_fastTunnelServer, socket, _logger);
ConnectedSockets.Add(client);
// 接收客户端消息
client.StartRecive();
}
public void Start(int backlog = 100)
{
shutdown = false;
listenSocket.Listen(backlog);
StartAccept(null);
_logger.LogInformation($"监听客户端 -> {ListenIp}:{ListenPort}");
}
public void Stop()
{
if (shutdown)
return;
try
{
if (listenSocket.Connected)
{
listenSocket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception)
{
}
finally
{
shutdown = true;
listenSocket.Close();
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logger.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
StartAccept(e);
HandleNewClient(accept);
}
else
{
_logger.LogError($"监听客户端异常 this={this.ToJson()} e={e.ToJson()}");
Stop();
}
}
public void Close()
{
}
}
}

View File

@ -45,7 +45,7 @@ namespace FastTunnel.Core.Listener
_heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(1000, 1024);
server = new Server.Server(1000, 100);
}
public void Start(int backlog = 100)
@ -60,6 +60,8 @@ namespace FastTunnel.Core.Listener
private bool handle(AsyncUserToken token, string words)
{
//Console.WriteLine($"[Client请求] {words}");
Message<JObject> msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
IClientMessageHandler handler = null;

View File

@ -1,147 +0,0 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Handlers.Server;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace FastTunnel.Core.Listener
{
public class HttpListener : IListener
{
ILogger _logger;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
int m_numConnectedSockets;
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
IListenerDispatcher _requestDispatcher;
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
public HttpListener(string ip, int port, ILogger logger)
{
_logger = logger;
this.ListenIp = ip;
this.ListenPort = port;
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
}
private void OnOffLine(Socket socket)
{
if (ConnectedSockets.Remove(socket))
OnClientsChange?.Invoke(socket, ConnectedSockets.Count, true);
}
private void OnAccept(Socket socket)
{
ConnectedSockets.Add(socket);
OnClientsChange?.Invoke(socket, ConnectedSockets.Count, false);
}
public void Start(IListenerDispatcher requestDispatcher, int backlog = 100)
{
shutdown = false;
_requestDispatcher = requestDispatcher;
listenSocket.Listen(backlog);
StartAccept(null);
_logger.LogInformation($"监听HTTP请求 -> {ListenIp}:{ListenPort}");
}
public void Stop()
{
if (shutdown)
return;
try
{
if (listenSocket.Connected)
{
listenSocket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception)
{
}
finally
{
shutdown = true;
listenSocket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logger.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
OnAccept(accept);
Interlocked.Increment(ref m_numConnectedSockets);
_logger.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// Accept the next connection request
StartAccept(e);
// 将此客户端交由Dispatcher进行管理
_requestDispatcher.Dispatch(accept, this.OnOffLine);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
public void Close()
{
}
public void Start(int backlog = 100)
{
throw new NotImplementedException();
}
}
}

View File

@ -36,7 +36,7 @@ namespace FastTunnel.Core.Listener
this.ListenIp = ip;
this.ListenPort = port;
server = new Server.Server(1000, 1024);
server = new Server.Server(1000, 512);
}
private void OnOffLine(Socket socket)
@ -142,6 +142,7 @@ namespace FastTunnel.Core.Listener
private bool handle(AsyncUserToken token, string words)
{
Console.WriteLine(words);
_requestDispatcher.Dispatch(token, words);
return false;
}

View File

@ -28,7 +28,7 @@ namespace FastTunnel.Core.Server
await Task.WhenAll(reading, writing);
}
public async Task FillPipeAsync(PipeWriter writer)
private async Task FillPipeAsync(PipeWriter writer)
{
const int minimumBufferSize = 512;
@ -64,7 +64,7 @@ namespace FastTunnel.Core.Server
await writer.CompleteAsync();
}
public async Task ReadPipeAsync(PipeReader reader)
private async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{

View File

@ -28,6 +28,7 @@ namespace FastTunnel.Core.Server
Func<AsyncUserToken, string, bool> m_handller;
string m_sectionFlag;
bool showLog;
// Create an uninitialized server instance.
// To start the server listening for connection requests
@ -97,8 +98,6 @@ namespace FastTunnel.Core.Server
// post accepts on the listening socket
StartAccept(null);
//Console.WriteLine("{0} connected sockets with one outstanding receive posted to each....press any key", m_outstandingReadCount);
}
// Begins an operation to accept a connection request from the client
@ -137,18 +136,16 @@ namespace FastTunnel.Core.Server
private void ProcessAccept(SocketAsyncEventArgs e)
{
Interlocked.Increment(ref m_numConnectedSockets);
Console.WriteLine("Client connection accepted. There are {0} clients connected to the server",
m_numConnectedSockets);
// new PipeHepler(e.AcceptSocket, processLine).ProcessLinesAsync();
// Get the socket for the accepted client connection and put it into the
//ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).MassgeTemp = null;
((AsyncUserToken)readEventArgs.UserToken).Recived = null;
readEventArgs.SetBuffer(e.Offset, m_receiveBufferSize);
Console.WriteLine("ReceiveAsync");
// As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
@ -165,7 +162,6 @@ namespace FastTunnel.Core.Server
// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
Console.WriteLine("IO_Completed");
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
{
@ -186,8 +182,6 @@ namespace FastTunnel.Core.Server
//
private void ProcessReceive(SocketAsyncEventArgs e)
{
Console.WriteLine("ProcessReceive");
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
@ -209,9 +203,10 @@ namespace FastTunnel.Core.Server
bool needRecive = false;
var words = e.Buffer.GetString(e.Offset, e.BytesTransferred);
if (words.Contains(m_sectionFlag))
var sum = token.MassgeTemp + words;
if (sum.Contains(m_sectionFlag))
{
var array = (token.MassgeTemp + words).Split(m_sectionFlag);
var array = (sum).Split(m_sectionFlag);
token.MassgeTemp = null;
var fullMsg = words.EndsWith(m_sectionFlag);
@ -237,9 +232,10 @@ namespace FastTunnel.Core.Server
}
else
{
token.MassgeTemp += words;
token.MassgeTemp = sum;
}
e.SetBuffer(e.Offset, m_receiveBufferSize);
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
@ -264,6 +260,7 @@ namespace FastTunnel.Core.Server
// done echoing data back to the client
AsyncUserToken token = (AsyncUserToken)e.UserToken;
// read the next block of data send from the client
e.SetBuffer(e.Offset, m_receiveBufferSize);
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{

View File

@ -0,0 +1,125 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace FastTunnel.Core
{
public class SocketSwap
{
private Socket _sockt1;
private Socket _sockt2;
bool Swaped = false;
private class Channel
{
public Socket Send { get; set; }
public Socket Receive { get; set; }
}
public SocketSwap(Socket sockt1, Socket sockt2)
{
_sockt1 = sockt1;
_sockt2 = sockt2;
}
public void StartSwapAsync()
{
Swaped = true;
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = _sockt1,
Receive = _sockt2
});
ThreadPool.QueueUserWorkItem(swapCallback, new Channel
{
Send = _sockt2,
Receive = _sockt1
});
}
private void swapCallback(object state)
{
var chanel = state as Channel;
byte[] result = new byte[1024];
while (true)
{
try
{
if (!chanel.Receive.Connected)
break;
int num = chanel.Receive.Receive(result, result.Length, SocketFlags.None);
if (num == 0)
{
chanel.Receive.Close();
try
{
// Release the socket.//
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
break;
}
if (!chanel.Send.Connected)
break;
// var str = Encoding.UTF8.GetString(result, 0, num);
chanel.Send.Send(result, num, SocketFlags.None);
}
catch (SocketException)
{
// Interrupted function call. 10004
// An existing connection was forcibly closed by the remote host. 10054
try
{
chanel.Send.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Send.Close();
}
try
{
chanel.Receive.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
chanel.Receive.Close();
}
break;
}
catch (Exception ex)
{
Console.Write(ex.ToString());
throw;
}
}
}
internal SocketSwap BeforeSwap(Action fun)
{
if (Swaped)
{
throw new Exception("BeforeSwap must be invoked before StartSwap!");
}
fun?.Invoke();
return this;
}
}
}

View File

@ -27,5 +27,7 @@
<ProjectReference Include="..\FastTunnel.Core\FastTunnel.Core.csproj" />
</ItemGroup>
<ProjectExtensions><VisualStudio><UserProperties config_4appsettings_1json__JsonSchema="" /></VisualStudio></ProjectExtensions>
</Project>