增加客户端心跳,自动重连机制

This commit is contained in:
SpringHgui 2019-12-26 15:47:36 +08:00
parent 5bf3b448e2
commit 8408c9d68f
9 changed files with 250 additions and 132 deletions

View File

@ -33,6 +33,11 @@ namespace FastTunnel.Client
{
var FastTunnelClient = new FastTunnelClient(settings.ClientSettings, new ConsoleLogger());
FastTunnelClient.Login();
while (true)
{
Thread.Sleep(10000 * 60);
}
}
}
}

View File

@ -9,9 +9,9 @@
"ClientSettings": {
"Common": {
// ip, BindAddr
"ServerAddr": "154.202.58.219",
//"ServerAddr": "154.202.58.219",
//"ServerAddr": "127.0.0.1",
"ServerAddr": "127.0.0.1",
// BindPort
"ServerPort": 1271

View File

@ -8,9 +8,10 @@ using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FastTunnel.Core.Extensions;
using System.Timers;
using System.Threading;
namespace FastTunnel.Core.Client
{
@ -18,28 +19,125 @@ namespace FastTunnel.Core.Client
{
ClientConfig _clientConfig;
Connecter connecter;
Connecter _client;
ILogger _logger;
System.Timers.Timer timer_timeout;
System.Timers.Timer timer_heart;
double heartInterval = 5000;
DateTime lastHeart;
Thread th;
public FastTunnelClient(ClientConfig clientConfig, ILogger logger)
{
_logger = logger;
_clientConfig = clientConfig;
connecter = new Connecter(_clientConfig.Common.ServerAddr, _clientConfig.Common.ServerPort);
initailTimer();
}
private void initailTimer()
{
timer_heart = new System.Timers.Timer();
timer_heart.AutoReset = true;
timer_heart.Interval = heartInterval; // 5秒心跳
timer_heart.Elapsed += HeartElapsed;
timer_timeout = new System.Timers.Timer();
timer_timeout.AutoReset = true;
timer_timeout.Interval = heartInterval + heartInterval / 2;
timer_timeout.Elapsed += TimeoutElapsed;
}
private void TimeoutElapsed(object sender, ElapsedEventArgs e)
{
if (lastHeart == null)
return;
var timer = sender as System.Timers.Timer;
var span = (DateTime.Now - lastHeart).TotalMilliseconds;
if (span > timer.Interval)
{
_logger.Debug($"上次心跳时间为{span}ms前");
// 重新登录
reConnect();
}
}
private void reConnect()
{
Close();
Login();
}
private void HeartElapsed(object sender, ElapsedEventArgs e)
{
try
{
_client.Send(new Message<string> { MessageType = MessageType.Heart, Content = null });
}
catch (Exception ex)
{
_logger.Error(ex.Message);
}
}
void Close()
{
timer_heart.Stop();
timer_timeout.Stop();
try
{
if (_client.Socket.Connected)
{
_client.Socket.Shutdown(SocketShutdown.Both);
}
}
catch (Exception ex)
{
_logger.Error(ex);
}
finally
{
_client.Socket.Close();
_logger.Debug("已退出登录\n");
}
}
public void Login()
{
_logger.Debug("登录中...");
//连接到的目标IP
connecter.Connect();
try
{
_client = new Connecter(_clientConfig.Common.ServerAddr, _clientConfig.Common.ServerPort);
_client.Connect();
}
catch (Exception ex)
{
_logger.Error(ex.Message);
_client.Socket.Close();
Thread.Sleep(5000);
Login();
return;
}
// 登录
connecter.Send(new Message<LogInRequest> { MessageType = MessageType.C_LogIn, Content = new LogInRequest { ClientConfig = _clientConfig } });
_client.Send(new Message<LogInRequest> { MessageType = MessageType.C_LogIn, Content = new LogInRequest { ClientConfig = _clientConfig } });
_logger.Debug("登录成功");
ReceiveServer(connecter.Client);
_logger.Debug("客户端退出");
// 心跳开始
timer_heart.Start();
//timer_timeout.Start();
th = new Thread(ReceiveServer);
th.Start(_client.Socket);
}
private void ReceiveServer(object obj)
@ -48,12 +146,21 @@ namespace FastTunnel.Core.Client
byte[] buffer = new byte[1024];
string lastBuffer = string.Empty;
int n;
while (true)
{
int n = client.Receive(buffer);
if (n == 0)
try
{
n = client.Receive(buffer);
if (n == 0)
{
client.Shutdown(SocketShutdown.Both);
break;
}
}
catch
{
client.Close();
break;
}
@ -100,7 +207,8 @@ namespace FastTunnel.Core.Client
Msg = JsonConvert.DeserializeObject<Message<object>>(words);
switch (Msg.MessageType)
{
case MessageType.C_Heart:
case MessageType.Heart:
lastHeart = DateTime.Now;
break;
case MessageType.S_NewCustomer:
var request = (Msg.Content as JObject).ToObject<NewCustomerRequest>();
@ -111,7 +219,7 @@ namespace FastTunnel.Core.Client
var localConnecter = new Connecter(request.WebConfig.LocalIp, request.WebConfig.LocalPort);
localConnecter.Connect();
new SocketSwap(connecter.Client, localConnecter.Client).StartSwap();
new SocketSwap(connecter.Socket, localConnecter.Socket).StartSwap();
break;
case MessageType.S_NewSSH:
var request_ssh = (Msg.Content as JObject).ToObject<NewSSHRequest>();
@ -122,10 +230,8 @@ namespace FastTunnel.Core.Client
var localConnecter_ssh = new Connecter(request_ssh.SSHConfig.LocalIp, request_ssh.SSHConfig.LocalPort);
localConnecter_ssh.Connect();
new SocketSwap(connecter_ssh.Client, localConnecter_ssh.Client).StartSwap();
new SocketSwap(connecter_ssh.Socket, localConnecter_ssh.Socket).StartSwap();
break;
case MessageType.C_SwapMsg:
case MessageType.C_LogIn:
case MessageType.Info:
var info = Msg.Content.ToJson();
_logger.Info(info);
@ -138,6 +244,8 @@ namespace FastTunnel.Core.Client
var err = Msg.Content.ToJson();
_logger.Error(err);
break;
case MessageType.C_SwapMsg:
case MessageType.C_LogIn:
default:
throw new Exception("参数异常");
}

View File

@ -13,14 +13,15 @@ namespace FastTunnel.Core
private string _ip;
private int _port;
public Socket Client { get; set; }
public Socket Socket { get; set; }
public Connecter(string v1, int v2)
{
this._ip = v1;
this._port = v2;
Client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
Socket.SendTimeout = 2000;
}
public void Connect()
@ -28,12 +29,12 @@ namespace FastTunnel.Core
IPAddress ip = IPAddress.Parse(_ip);
IPEndPoint point = new IPEndPoint(ip, _port);
Client.Connect(point);
Socket.Connect(point);
}
private void Send(string msg)
{
Client.Send(Encoding.UTF8.GetBytes(msg));
Socket.Send(Encoding.UTF8.GetBytes(msg));
}
public void Send<T>(Message<T> msg)

View File

@ -50,13 +50,10 @@ namespace FastTunnel.Core
var client = _socket.Accept();
string point = client.RemoteEndPoint.ToString();
Console.WriteLine($"收到请求 {point}");
ThreadPool.QueueUserWorkItem(ReceiveCustomer, client);
}
catch (SocketException ex)
{
_logerr.Error(ex.Message);
break;
}
catch (Exception ex)
@ -79,12 +76,15 @@ namespace FastTunnel.Core
{
try
{
ls.Shutdown(SocketShutdown.Both);
if (ls.Connected)
{
ls.Shutdown(SocketShutdown.Both);
}
}
finally
{
ls.Close();
_logerr.Debug("Listener cloed");
_logerr.Debug("Listener closed");
}
}
}

View File

@ -15,7 +15,7 @@ namespace FastTunnel.Core.Models
{
// client use below
C_LogIn,
C_Heart,
Heart,
C_SwapMsg,
// server use below

View File

@ -71,8 +71,9 @@ namespace FastTunnel.Core.Server
return;
}
}
catch (SocketException)
catch (SocketException ex)
{
_logger.Error(ex);
if (client.Connected)
client.Close();
return;
@ -142,121 +143,127 @@ namespace FastTunnel.Core.Server
byte[] buffer = new byte[1024 * 1024];
int length;
try
while (true)
{
length = client.Receive(buffer);
}
catch (Exception ex)
{
if (client.Connected)
try
{
client.Close();
length = client.Receive(buffer);
}
return;
}
// 将字节转换成字符串
string words = Encoding.UTF8.GetString(buffer, 0, length);
var msg = JsonConvert.DeserializeObject<Message<object>>(words);
_logger.Debug($"收到客户端指令:{msg.MessageType}");
switch (msg.MessageType)
{
case MessageType.C_LogIn:
var requet = (msg.Content as JObject).ToObject<LogInRequest>();
if (requet.ClientConfig.Webs != null && requet.ClientConfig.Webs.Count() > 0)
catch (Exception ex)
{
_logger.Error(ex);
if (client.Connected)
{
foreach (var item in requet.ClientConfig.Webs)
{
var hostName = $"{item.SubDomain}.{serverSettings.Domain}".Trim();
if (WebList.ContainsKey(hostName))
{
_logger.Debug($"renew domain '{hostName}'");
WebList.Remove(hostName);
WebList.Add(hostName, new WebInfo { Socket = client, WebConfig = item });
}
else
{
_logger.Debug($"new domain '{hostName}'");
WebList.Add(hostName, new WebInfo { Socket = client, WebConfig = item });
}
client.Send(new Message<string> { MessageType = MessageType.Info, Content = $"TunnelForWeb is OK: you can visit {item.LocalIp}:{item.LocalPort} from http://{hostName}:{serverSettings.ProxyPort_HTTP}" });
}
client.Close();
}
return;
}
if (requet.ClientConfig.SSH != null && requet.ClientConfig.SSH.Count() > 0)
{
foreach (var item in requet.ClientConfig.SSH)
// 将字节转换成字符串
string words = Encoding.UTF8.GetString(buffer, 0, length);
var msg = JsonConvert.DeserializeObject<Message<object>>(words);
_logger.Debug($"收到客户端指令:{msg.MessageType}");
switch (msg.MessageType)
{
case MessageType.C_LogIn:
var requet = (msg.Content as JObject).ToObject<LogInRequest>();
if (requet.ClientConfig.Webs != null && requet.ClientConfig.Webs.Count() > 0)
{
try
foreach (var item in requet.ClientConfig.Webs)
{
if (item.RemotePort.Equals(serverSettings.BindPort))
var hostName = $"{item.SubDomain}.{serverSettings.Domain}".Trim();
if (WebList.ContainsKey(hostName))
{
_logger.Error($"RemotePort can not be same with BindPort: {item.RemotePort}");
_logger.Debug($"renew domain '{hostName}'");
WebList.Remove(hostName);
WebList.Add(hostName, new WebInfo { Socket = client, WebConfig = item });
}
else
{
_logger.Debug($"new domain '{hostName}'");
WebList.Add(hostName, new WebInfo { Socket = client, WebConfig = item });
}
client.Send(new Message<string> { MessageType = MessageType.Info, Content = $"TunnelForWeb is OK: you can visit {item.LocalIp}:{item.LocalPort} from http://{hostName}:{serverSettings.ProxyPort_HTTP}" });
}
}
if (requet.ClientConfig.SSH != null && requet.ClientConfig.SSH.Count() > 0)
{
foreach (var item in requet.ClientConfig.SSH)
{
try
{
if (item.RemotePort.Equals(serverSettings.BindPort))
{
_logger.Error($"RemotePort can not be same with BindPort: {item.RemotePort}");
continue;
}
if (item.RemotePort.Equals(serverSettings.ProxyPort_HTTP))
{
_logger.Error($"RemotePort can not be same with ProxyPort_HTTP: {item.RemotePort}");
continue;
}
SSHInfo<SSHHandlerArg> old;
if (SSHList.TryGetValue(item.RemotePort, out old))
{
_logger.Debug($"Remove Listener {old.Listener.IP}:{old.Listener.Port}");
old.Listener.ShutdownAndClose();
SSHList.Remove(item.RemotePort);
}
var ls = new Listener<SSHHandlerArg>("0.0.0.0", item.RemotePort, _logger, SSHHandler, new SSHHandlerArg { LocalClient = client, SSHConfig = item });
ls.Listen();
// listen success
SSHList.Add(item.RemotePort, new SSHInfo<SSHHandlerArg> { Listener = ls, Socket = client, SSHConfig = item });
_logger.Debug($"SSH proxy success: {item.RemotePort} -> {item.LocalIp}:{item.LocalPort}");
}
catch (Exception ex)
{
_logger.Error($"SSH proxy error: {item.RemotePort} -> {item.LocalIp}:{item.LocalPort}");
_logger.Error(ex);
client.Send(new Message<string> { MessageType = MessageType.Error, Content = ex.Message });
continue;
}
if (item.RemotePort.Equals(serverSettings.ProxyPort_HTTP))
{
_logger.Error($"RemotePort can not be same with ProxyPort_HTTP: {item.RemotePort}");
continue;
}
SSHInfo<SSHHandlerArg> old;
if (SSHList.TryGetValue(item.RemotePort, out old))
{
_logger.Debug($"Remove Listener {old.Listener.IP}:{old.Listener.Port}");
old.Listener.ShutdownAndClose();
SSHList.Remove(item.RemotePort);
}
var ls = new Listener<SSHHandlerArg>("0.0.0.0", item.RemotePort, _logger, SSHHandler, new SSHHandlerArg { LocalClient = client, SSHConfig = item });
ls.Listen();
// listen success
SSHList.Add(item.RemotePort, new SSHInfo<SSHHandlerArg> { Listener = ls, Socket = client, SSHConfig = item });
_logger.Debug($"SSH proxy success: {item.RemotePort} -> {item.LocalIp}:{item.LocalPort}");
client.Send(new Message<string> { MessageType = MessageType.Info, Content = $"TunnelForSSH is OK: {requet.ClientConfig.Common.ServerAddr}:{item.RemotePort}->{item.LocalIp}:{item.LocalPort}" });
}
catch (Exception ex)
{
_logger.Error($"SSH proxy error: {item.RemotePort} -> {item.LocalIp}:{item.LocalPort}");
_logger.Error(ex);
client.Send(new Message<string> { MessageType = MessageType.Error, Content = ex.Message });
continue;
}
client.Send(new Message<string> { MessageType = MessageType.Info, Content = $"TunnelForSSH is OK: {requet.ClientConfig.Common.ServerAddr}:{item.RemotePort}->{item.LocalIp}:{item.LocalPort}" });
}
}
break;
case MessageType.C_Heart:
break;
case MessageType.C_SwapMsg:
var msgId = (msg.Content as string);
NewRequest request;
break;
case MessageType.Heart:
client.Send(new Message<string>() { MessageType = MessageType.Heart, Content = null });
break;
case MessageType.C_SwapMsg:
var msgId = (msg.Content as string);
NewRequest request;
if (!string.IsNullOrEmpty(msgId) && newRequest.TryGetValue(msgId, out request))
{
// Join
Task.Run(() =>
if (!string.IsNullOrEmpty(msgId) && newRequest.TryGetValue(msgId, out request))
{
(new SocketSwap(request.CustomerClient, client))
.BeforeSwap(() => { if (request.Buffer != null) client.Send(request.Buffer); })
.StartSwap();
});
}
else
{
// 未找到,关闭连接
_logger.Error($"未找到请求:{msgId}");
client.Send(new Message<string> { MessageType = MessageType.Error, Content = $"未找到请求:{msgId}" });
}
break;
case MessageType.S_NewCustomer:
default:
throw new Exception("参数异常");
// Join
Task.Run(() =>
{
(new SocketSwap(request.CustomerClient, client))
.BeforeSwap(() => { if (request.Buffer != null) client.Send(request.Buffer); })
.StartSwap();
});
}
else
{
// 未找到,关闭连接
_logger.Error($"未找到请求:{msgId}");
client.Send(new Message<string> { MessageType = MessageType.Error, Content = $"未找到请求:{msgId}" });
}
break;
case MessageType.S_NewCustomer:
default:
throw new Exception("参数异常");
}
}
}

View File

@ -74,7 +74,7 @@ namespace FastTunnel.Core
break;
chanel.Send.Send(result, num, SocketFlags.None);
}
catch (Exception)
catch (Exception ex)
{
if (chanel.Receive.Connected)
{

View File

@ -84,8 +84,5 @@ ssh -oPort=12701 root@x.x.x.x
- 安装 `vs2019`
- 安装 `dotnetcore runtime&sdk 3.1` 或以上版本
# Dev Plan
- 客户端心跳
# License
Apache License 2.0