重写部分http监听

This commit is contained in:
SpringHgui 2021-06-16 00:48:51 +08:00
parent 1a3662b407
commit 885f65e77a
11 changed files with 391 additions and 40 deletions

View File

@ -21,7 +21,7 @@ namespace FastTunnel.Core.Client
public readonly IServerConfig ServerSettings;
readonly ILogger _logger;
ClientListenerV2 clientListener;
HttpListener http_listener;
HttpListenerV2 http_listener;
public FastTunnelServer(ILogger<FastTunnelServer> logger, IConfiguration configuration)
{
@ -29,7 +29,7 @@ namespace FastTunnel.Core.Client
ServerSettings = configuration.Get<AppSettings>().ServerSettings;
clientListener = new ClientListenerV2(this, ServerSettings.BindAddr, ServerSettings.BindPort, _logger);
http_listener = new HttpListener(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
http_listener = new HttpListenerV2(ServerSettings.BindAddr, ServerSettings.WebProxyPort, _logger);
clientListener.OnClientsChange += Client_listener_OnClientsChange;
}
@ -59,7 +59,7 @@ namespace FastTunnel.Core.Client
private void listenHttp()
{
http_listener.Start(new HttpDispatcher(this, _logger, ServerSettings));
http_listener.Start(new HttpDispatcherV2(this, _logger, ServerSettings));
}
private void Client_listener_OnClientsChange(System.Net.Sockets.Socket socket, int count, bool is_oofline)

View File

@ -4,6 +4,7 @@ using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Models;
using FastTunnel.Core.Server;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
@ -136,5 +137,10 @@ namespace FastTunnel.Core.Dispatchers
offLineAction = onOffLine;
Dispatch(httpClient);
}
public void Dispatch(AsyncUserToken token, string words)
{
throw new NotImplementedException();
}
}
}

View File

@ -11,6 +11,7 @@ using System.Text;
using System.Text.RegularExpressions;
using System.Net.Http;
using System.IO;
using FastTunnel.Core.Server;
namespace FastTunnel.Core.Dispatchers
{
@ -29,7 +30,6 @@ namespace FastTunnel.Core.Dispatchers
static string pattern = @"[hH]ost:.+[\r\n]";
public void Dispatch(Socket httpClient)
{
Stream tempBuffer = new MemoryStream();
@ -223,5 +223,10 @@ namespace FastTunnel.Core.Dispatchers
{
Dispatch(httpClient);
}
public void Dispatch(AsyncUserToken token, string words)
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,167 @@
using FastTunnel.Core.Config;
using FastTunnel.Core.Client;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Text.RegularExpressions;
using System.Net.Http;
using System.IO;
using FastTunnel.Core.Server;
namespace FastTunnel.Core.Dispatchers
{
public class HttpDispatcherV2 : IListenerDispatcher
{
readonly ILogger _logger;
readonly IServerConfig _serverSettings;
readonly FastTunnelServer _fastTunnelServer;
public HttpDispatcherV2(FastTunnelServer fastTunnelServer, ILogger logger, IServerConfig serverSettings)
{
_logger = logger;
_serverSettings = serverSettings;
_fastTunnelServer = fastTunnelServer;
}
static string pattern = @"[hH]ost:.+[\r\n]";
public void Dispatch(AsyncUserToken token, string words)
{
// 1.检查白名单
try
{
var endpoint = token.Socket.RemoteEndPoint as System.Net.IPEndPoint;
_logger.LogInformation($"Receive HTTP Request {endpoint.Address}:{endpoint.Port}");
if (_serverSettings.WebAllowAccessIps != null)
{
if (!_serverSettings.WebAllowAccessIps.Contains(endpoint.Address.ToString()))
{
HandlerHostNotAccess(token.Socket);
return;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex);
}
string Host;
MatchCollection collection = Regex.Matches(words, pattern);
if (collection.Count == 0)
{
_logger.LogError($"Host异常{words}");
// 返回错误页
HandlerHostRequired(token.Socket);
return;
}
else
{
Host = collection[0].Value;
}
_logger.LogDebug(Host.Replace("\r", ""));
var domain = Host.Split(":")[1].Trim();
// 判断是否为ip
if (IsIpDomian(domain))
{
// 返回错误页
HandlerHostRequired(token.Socket);
return;
}
WebInfo web;
if (!_fastTunnelServer.WebList.TryGetValue(domain, out web))
{
HandlerClientNotOnLine(token.Socket, domain);
return;
}
var msgid = Guid.NewGuid().ToString();
_fastTunnelServer.newRequest.TryAdd(msgid, new NewRequest
{
CustomerClient = token.Socket,
Buffer = token.Recived
});
try
{
_logger.LogDebug($"OK");
web.Socket.Send(new Message<NewCustomerMassage> { MessageType = MessageType.S_NewCustomer, Content = new NewCustomerMassage { MsgId = msgid, WebConfig = web.WebConfig } });
}
catch (Exception)
{
HandlerClientNotOnLine(token.Socket, domain);
throw;
}
}
public void Dispatch(Socket httpClient)
{
throw new NotImplementedException();
}
private bool IsIpDomian(string domain)
{
return Regex.IsMatch(domain, @"^\d.\d.\d.\d.\d$");
}
private void HandlerHostNotAccess(Socket client)
{
_logger.LogDebug($"### NotAccessIps:'{client.RemoteEndPoint}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_NotAccessIps);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
private void HandlerHostRequired(Socket client)
{
_logger.LogDebug($"### HostRequired:'{client.RemoteEndPoint}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_HostRequired);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
private void HandlerClientNotOnLine(Socket client, string domain)
{
_logger.LogDebug($"### TunnelNotFound:'{domain}'");
string statusLine = "HTTP/1.1 200 OK\r\n";
string responseHeader = "Content-Type: text/html\r\n";
byte[] responseBody = Encoding.UTF8.GetBytes(TunnelResource.Page_NoTunnel);
client.Send(Encoding.UTF8.GetBytes(statusLine));
client.Send(Encoding.UTF8.GetBytes(responseHeader));
client.Send(Encoding.UTF8.GetBytes("\r\n"));
client.Send(responseBody);
client.Close();
}
public void Dispatch(Socket httpClient, Action<Socket> onOffLine)
{
Dispatch(httpClient);
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using FastTunnel.Core.Server;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
@ -9,6 +10,8 @@ namespace FastTunnel.Core.Dispatchers
{
void Dispatch(Socket httpClient, Action<Socket> onOffLine);
void Dispatch(AsyncUserToken token, string words);
void Dispatch(Socket httpClient);
}
}

View File

@ -2,6 +2,7 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Extensions;
using FastTunnel.Core.Models;
using FastTunnel.Core.Server;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
@ -36,5 +37,10 @@ namespace FastTunnel.Core.Handlers.Server
{
Dispatch(httpClient);
}
public void Dispatch(AsyncUserToken token, string words)
{
throw new NotImplementedException();
}
}
}

View File

@ -11,6 +11,7 @@ using System.Threading;
namespace FastTunnel.Core.Listener
{
[Obsolete("由ClientListenerV2替代", true)]
public class ClientListener : IListener
{
ILogger _logger;

View File

@ -4,6 +4,7 @@ using FastTunnel.Core.Extensions;
using FastTunnel.Core.Handlers;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Models;
using FastTunnel.Core.Server;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
@ -25,8 +26,6 @@ namespace FastTunnel.Core.Listener
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
Socket listenSocket;
public IList<ClientConnection> ConnectedSockets = new List<ClientConnection>();
FastTunnelServer _fastTunnelServer;
Server.Server server;
@ -46,7 +45,7 @@ namespace FastTunnel.Core.Listener
_heartHandler = new HeartMessageHandler();
_swapMsgHandler = new SwapMessageHandler(_logger);
server = new Server.Server(1000, 1024);
server = new Server.Server(1000, 10);
}
public void Start(int backlog = 100)
@ -55,11 +54,11 @@ namespace FastTunnel.Core.Listener
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
server.Init();
server.Start(localEndPoint, handle);
server.Start(localEndPoint, "\n", handle);
_logger.LogInformation($"监听客户端 -> {ListenIp}:{ListenPort}");
}
private bool handle(Socket client, string words)
private bool handle(AsyncUserToken token, string words)
{
Message<JObject> msg = JsonConvert.DeserializeObject<Message<JObject>>(words);
@ -79,7 +78,7 @@ namespace FastTunnel.Core.Listener
throw new Exception($"未知的通讯指令 {msg.MessageType}");
}
handler.HandlerMsg(this._fastTunnelServer, client, msg);
handler.HandlerMsg(this._fastTunnelServer, token.Socket, msg);
return handler.NeedRecive;
}
@ -87,17 +86,6 @@ namespace FastTunnel.Core.Listener
{
}
private void HandleNewClient(Socket socket)
{
// 此时的客户端可能有两种 1.登录的客户端 2.交换请求的客户端
var client = new ClientConnection(_fastTunnelServer, socket, _logger);
ConnectedSockets.Add(client);
// 接收客户端消息
client.StartRecive();
}
public void Close()
{
}

View File

@ -0,0 +1,149 @@
using FastTunnel.Core.Dispatchers;
using FastTunnel.Core.Handlers.Server;
using FastTunnel.Core.Server;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace FastTunnel.Core.Listener
{
public class HttpListenerV2
{
ILogger _logger;
public string ListenIp { get; set; }
public int ListenPort { get; set; }
int m_numConnectedSockets;
public event OnClientChangeLine OnClientsChange;
bool shutdown = false;
IListenerDispatcher _requestDispatcher;
Socket listenSocket;
public IList<Socket> ConnectedSockets = new List<Socket>();
Server.Server server;
public HttpListenerV2(string ip, int port, ILogger logger)
{
_logger = logger;
this.ListenIp = ip;
this.ListenPort = port;
server = new Server.Server(1000, 10);
}
private void OnOffLine(Socket socket)
{
if (ConnectedSockets.Remove(socket))
OnClientsChange?.Invoke(socket, ConnectedSockets.Count, true);
}
private void OnAccept(Socket socket)
{
ConnectedSockets.Add(socket);
OnClientsChange?.Invoke(socket, ConnectedSockets.Count, false);
}
public void Stop()
{
if (shutdown)
return;
try
{
if (listenSocket.Connected)
{
listenSocket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception)
{
}
finally
{
shutdown = true;
listenSocket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
}
}
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
_logger.LogDebug($"【{ListenIp}:{ListenPort}】: StartAccept");
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
var accept = e.AcceptSocket;
OnAccept(accept);
Interlocked.Increment(ref m_numConnectedSockets);
_logger.LogInformation($"【{ListenIp}:{ListenPort}】Accepted. There are {{0}} clients connected to the port",
m_numConnectedSockets);
// Accept the next connection request
StartAccept(e);
// 将此客户端交由Dispatcher进行管理
_requestDispatcher.Dispatch(accept, this.OnOffLine);
}
else
{
Stop();
}
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
public void Close()
{
}
public void Start(IListenerDispatcher requestDispatcher, int backlog = 100)
{
_requestDispatcher = requestDispatcher;
IPAddress ipa = IPAddress.Parse(ListenIp);
IPEndPoint localEndPoint = new IPEndPoint(ipa, ListenPort);
server.Init();
server.Start(localEndPoint, "\r\n\r\n", handle);
_logger.LogInformation($"监听客户端 -> {ListenIp}:{ListenPort}");
}
private bool handle(AsyncUserToken token, string words)
{
_requestDispatcher.Dispatch(token, words);
return false;
}
}
}

View File

@ -12,5 +12,7 @@ namespace FastTunnel.Core.Server
public Socket Socket { get; set; }
public string MassgeTemp { get; set; }
public byte[] Recived { get; set; }
}
}

View File

@ -4,6 +4,7 @@
// is continued until the client disconnects.
using FastTunnel.Core.Extensions;
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
@ -25,7 +26,8 @@ namespace FastTunnel.Core.Server
int m_numConnectedSockets; // the total number of clients connected to the server
Semaphore m_maxNumberAcceptedClients;
Func<Socket, string, bool> m_handller;
Func<AsyncUserToken, string, bool> m_handller;
string m_sectionFlag;
// Create an uninitialized server instance.
// To start the server listening for connection requests
@ -82,9 +84,10 @@ namespace FastTunnel.Core.Server
//
// <param name="localEndPoint">The endpoint which the server will listening
// for connection requests on</param>
public void Start(IPEndPoint localEndPoint, Func<Socket, string, bool> handller)
public void Start(IPEndPoint localEndPoint, string sectionFlag, Func<AsyncUserToken, string, bool> handller)
{
m_handller = handller;
m_sectionFlag = sectionFlag;
// create the socket which listens for incoming connections
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
@ -156,11 +159,6 @@ namespace FastTunnel.Core.Server
StartAccept(e);
}
private bool processLine(Socket socket, byte[] line)
{
return m_handller(socket, Encoding.UTF8.GetString(line));
}
// This method is called whenever a receive or send operation is completed on a socket
//
// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
@ -192,20 +190,46 @@ namespace FastTunnel.Core.Server
{
//increment the count of the total bytes receive by the server
//Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
var words = e.Buffer.GetString(e.Offset, e.BytesTransferred);
if (words.IndexOf("\n") != -1)
if (token.Recived != null)
{
var array = words.Split("\n");
byte[] resArr = new byte[token.Recived.Length + e.BytesTransferred];
token.Recived.CopyTo(resArr, 0);
Array.Copy(e.Buffer, e.Offset, resArr, token.Recived.Length, e.BytesTransferred);
token.Recived = resArr;
}
else
{
byte[] resArr = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, resArr, 0, e.BytesTransferred);
token.Recived = resArr;
}
// handle msg
var msg = token.MassgeTemp + array[0];
var needRecive = m_handller(token.Socket, msg);
token.MassgeTemp = array.Length > 1 ? array[1] : null;
bool needRecive = false;
var words = e.Buffer.GetString(e.Offset, e.BytesTransferred);
if (words.Contains(m_sectionFlag))
{
var array = (token.MassgeTemp + words).Split(m_sectionFlag);
token.MassgeTemp = null;
var fullMsg = words.EndsWith(m_sectionFlag);
if (!needRecive)
if (!fullMsg)
{
return;
token.MassgeTemp = array[array.Length - 1];
}
for (int i = 0; i < array.Length - 1; i++)
{
needRecive = m_handller(token, array[i]);
if (needRecive)
{
continue;
}
else
{
// ÊÍ·Å×ÊÔ´
m_readWritePool.Push(e);
return;
}
}
}
else
@ -269,7 +293,7 @@ namespace FastTunnel.Core.Server
m_readWritePool.Push(e);
m_maxNumberAcceptedClients.Release();
Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
// Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
}
}
}