增加 Serilog 使用tcp的方式自定义格式化 写入elk的实现

by  田雨  github  www5255977
This commit is contained in:
tianyu 2021-08-03 16:21:35 +08:00
parent d9d6431a4b
commit 93d5f31d30
19 changed files with 938 additions and 7 deletions

View File

@ -14,6 +14,7 @@ using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
namespace Blog.Core.Controllers namespace Blog.Core.Controllers
{ {
@ -61,10 +62,11 @@ namespace Blog.Core.Controllers
[Route("Token")] [Route("Token")]
public async Task<MessageModel<string>> GetJwtStr(string name, string pass) public async Task<MessageModel<string>> GetJwtStr(string name, string pass)
{ {
string jwtStr = string.Empty; string jwtStr = string.Empty;
bool suc = false; bool suc = false;
//这里就是用户登陆以后,通过数据库去调取数据,分配权限的操作 //这里就是用户登陆以后,通过数据库去调取数据,分配权限的操作
var user = await _sysUserInfoServices.GetUserRoleNameStr(name, MD5Helper.MD5Encrypt32(pass)); var user = await _sysUserInfoServices.GetUserRoleNameStr(name, MD5Helper.MD5Encrypt32(pass));
if (user != null) if (user != null)
{ {

View File

@ -266,5 +266,15 @@
"ServiceName": "blog.Core.Api", // "ServiceName": "blog.Core.Api", //
"Port": "8081", // "Port": "8081", //
"RegisterEnabled": true // nacos "RegisterEnabled": true // nacos
},
"LogFiedOutPutConfigs": {
"tcpAddressHost": "", // elktcp
"tcpAddressPort": 0, // elktcp
"ConfigsInfo": [ // elk
{
"FiedName": "applicationName",
"FiedValue": "Blog.Core.Api"
}
]
} }
} }

View File

@ -32,4 +32,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Blog.Core.Serilog.Es\Blog.Core.Serilog.Es.csproj" />
</ItemGroup>
</Project> </Project>

View File

@ -101,7 +101,6 @@ namespace Blog.Core.Common.Helper
} }
catch (Exception ex) catch (Exception ex)
{ {
Serilog.Log.Information($"配置文件管理器异常:,{ ex.ToString() }");
value = defaultValue; value = defaultValue;
} }

View File

@ -1,7 +1,8 @@
using Blog.Core.Common.Helper; using Blog.Core.Common.Helper;
using Blog.Core.Serilog.Es;
using Blog.Core.Serilog.Es.Formatters;
using Serilog; using Serilog;
using Serilog.Events; using Serilog.Events;
using Serilog.Sinks.Elasticsearch;
using System; using System;
using System.IO; using System.IO;
@ -18,6 +19,8 @@ namespace Blog.Core.Common.LogHelper
public static void WriteLog(string filename, string[] dataParas, bool IsHeader = true, string defaultFolder = "", bool isJudgeJsonFormat = false) public static void WriteLog(string filename, string[] dataParas, bool IsHeader = true, string defaultFolder = "", bool isJudgeJsonFormat = false)
{ {
Log.Logger = new LoggerConfiguration() Log.Logger = new LoggerConfiguration()
// TCPSink 集成Serilog 使用tcp的方式向elk 输出log日志 LogstashJsonFormatter 这个是按照自定义格式化输出内容
.WriteTo.TCPSink(new LogstashJsonFormatter())
.MinimumLevel.Debug() .MinimumLevel.Debug()
.MinimumLevel.Override("Microsoft", LogEventLevel.Error) .MinimumLevel.Override("Microsoft", LogEventLevel.Error)
//.WriteTo.File(Path.Combine($"log/Serilog/{filename}/", ".log"), rollingInterval: RollingInterval.Day, outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff} [{Level}] {Message}{NewLine}{Exception}") //.WriteTo.File(Path.Combine($"log/Serilog/{filename}/", ".log"), rollingInterval: RollingInterval.Day, outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff} [{Level}] {Message}{NewLine}{Exception}")
@ -54,7 +57,11 @@ namespace Blog.Core.Common.LogHelper
String.Join("\r\n", dataParas) + "\r\n" String.Join("\r\n", dataParas) + "\r\n"
); );
} }
// 展示elk支持输出4种日志级别
Log.Information(logContent); Log.Information(logContent);
Log.Warning(logContent);
Log.Error(logContent);
Log.Debug(logContent);
} }
else else
{ {

View File

@ -43,7 +43,7 @@ namespace Blog.Core.Extensions.NacosConfig
} }
catch (Exception ex) catch (Exception ex)
{ {
Serilog.Log.Information($"Nacos配置文件获取异常!!! " + ex.ToString());
} }
} }

View File

@ -88,9 +88,6 @@ namespace Blog.Core.Extensions.NacosConfig
// 配置有变动后 刷新redis配置 刷新 mq配置 // 配置有变动后 刷新redis配置 刷新 mq配置
//_redisCachqManager.DisposeRedisConnection(); //_redisCachqManager.DisposeRedisConnection();
Serilog.Log.Information($"收到服务变更事件!!! {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} [{e}]");
} }
return Task.CompletedTask; return Task.CompletedTask;

View File

@ -0,0 +1,30 @@
using System;
namespace Blog.Core.Serilog.Es
{
public class AppSettingsFileNameConfig
{
/// <summary>
/// 配置文件名称常量
/// </summary>
public static string AppSettingsFileName = $"appsettings{ GetAppSettingsConfigName() }json";
/// <summary>
/// 根据环境变量定向配置文件名称
/// </summary>
/// <returns></returns>
private static string GetAppSettingsConfigName()
{
if (Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") != null
&& Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") != "")
{
return $".{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.";
}
else
{
return ".";
}
}
}
}

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="5.0.0" />
<PackageReference Include="Serilog" Version="2.10.0" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,66 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Blog.Core.Serilog.Es.Formatters
{
/// <summary>
/// Json 配置文件通用类
/// </summary>
public static class JsonConfigUtils
{
#region
/// <summary>
/// 锁
/// </summary>
private static object __Lock__ = new object();
// 读取到的系统配置信息
public static IConfiguration Configuration { get; set; }
#endregion
/// <summary>
/// 读取配置文件的信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="fileName">要读取json的名称</param>
/// <param name="key">要读取的json节点名称</param>
/// <returns></returns>
public static T GetAppSettings<T>(string AppSettingsFileName, string key) where T : class, new()
{
lock (__Lock__)
{
if (Configuration == null)
{
Configuration = new ConfigurationBuilder()
.Add(new JsonConfigurationSource
{
Path = AppSettingsFileName,
Optional = false,
ReloadOnChange = true
})
.Build();
}
var appconfig = new ServiceCollection()
.AddOptions()
.Configure<T>(Configuration.GetSection(key))
.BuildServiceProvider()
.GetService<IOptions<T>>()
.Value;
return appconfig;
}
}
public static string GetJson(string jsonPath, string key)
{
IConfiguration config = new ConfigurationBuilder().AddJsonFile(jsonPath).Build(); //json文件地址
string s = config.GetSection(key).Value; //json某个对象
return s;
}
}
}

View File

@ -0,0 +1,26 @@
using System.Collections.Generic;
namespace Blog.Core.Serilog.Es.Formatters
{
public class LogConfigRootDTO
{
/// <summary>
/// tcp日志的host地址
/// </summary>
public string tcpAddressHost { set; get; }
/// <summary>
/// tcp日志的port地址
/// </summary>
public int tcpAddressPort { set; get; }
public List<Configsinfo> ConfigsInfo { get; set; }
}
public class Configsinfo
{
public string FiedName { get; set; }
public string FiedValue { get; set; }
}
}

View File

@ -0,0 +1,152 @@
// Adapted from RawJsonFormatter in Serilog.Sinks.Seq Copyright 2016 Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Serilog.Events;
using Serilog.Formatting;
using Serilog.Formatting.Json;
using Blog.Core.Serilog.Es.HttpInfo;
namespace Blog.Core.Serilog.Es.Formatters
{
public class LogstashJsonFormatter : ITextFormatter
{
private static readonly JsonValueFormatter ValueFormatter = new JsonValueFormatter();
public void Format(LogEvent logEvent, TextWriter output)
{
FormatContent(logEvent, output);
output.WriteLine();
}
/// <summary>
/// 格式化 最终输出到elk的核心部分
/// </summary>
/// <param name="logEvent"></param>
/// <param name="output"></param>
private static void FormatContent(LogEvent logEvent, TextWriter output)
{
if (logEvent == null) throw new ArgumentNullException(nameof(logEvent));
if (output == null) throw new ArgumentNullException(nameof(output));
output.Write('{');
// 读取相关配置
var logConfigRootDTOInfo = JsonConfigUtils.GetAppSettings<LogConfigRootDTO>(AppSettingsFileNameConfig.AppSettingsFileName, "LogFiedOutPutConfigs");
if (logConfigRootDTOInfo == null)
{
return;
}
// 写入所有的项目配置项的字段 在appsetting中配置的 输出elk节点的数据字段
foreach (var item in logConfigRootDTOInfo.ConfigsInfo)
{
switch (item.FiedName)
{
//case "orgid":
// WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method);
// output.Write(",");
// break;
default:
WritePropertyAndValue(output, item.FiedName, item.FiedValue);
output.Write(",");
break;
}
}
// 写入http对应的信息数据
if (HttpContextProvider.GetCurrent()!=null && HttpContextProvider.GetCurrent().Request!=null)
{
if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method))
{
WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method);
output.Write(",");
}
// 输出请求页面url
if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Path))
{
WritePropertyAndValue(output, "requestUrl", HttpContextProvider.GetCurrent().Request.Path.ToString());
output.Write(",");
}
// 输出携带token
if (HttpContextProvider.GetCurrent().Request.Headers["Authorization"].FirstOrDefault() != null)
{
WritePropertyAndValue(output, "Authorization", HttpContextProvider.GetCurrent().Request.Headers["Authorization"].FirstOrDefault());
output.Write(",");
}
// 输出请求参数
if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method))
{
string contentFromBody = ParamsHelper.GetParams(HttpContextProvider.GetCurrent());
WritePropertyAndValue(output, "requestParam", contentFromBody);
output.Write(",");
}
// 输出请求方法类型
if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method))
{
WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method);
output.Write(",");
}
}
// 输出请求时间戳
WritePropertyAndValue(output, "timestamp", logEvent.Timestamp.ToString("o"));
output.Write(",");
// 输出日志级别
WritePropertyAndValue(output, "level", logEvent.Level.ToString());
output.Write(",");
// 输出log内容
WritePropertyAndValue(output, "executeResult", logEvent.MessageTemplate.Render(logEvent.Properties));
if (logEvent.Exception != null)
{
output.Write(",");
WritePropertyAndValue(output, "exception", logEvent.Exception.ToString());
}
WriteProperties(logEvent.Properties, output);
output.Write('}');
}
private static void WritePropertyAndValue(TextWriter output, string propertyKey, string propertyValue)
{
JsonValueFormatter.WriteQuotedJsonString(propertyKey, output);
output.Write(":");
JsonValueFormatter.WriteQuotedJsonString(propertyValue, output);
}
private static void WriteProperties(IReadOnlyDictionary<string, LogEventPropertyValue> properties, TextWriter output)
{
if (properties.Any()) output.Write(",");
var precedingDelimiter = "";
foreach (var property in properties)
{
output.Write(precedingDelimiter);
precedingDelimiter = ",";
var camelCasePropertyKey = property.Key[0].ToString().ToLower() + property.Key.Substring(1);
JsonValueFormatter.WriteQuotedJsonString(camelCasePropertyKey, output);
output.Write(':');
ValueFormatter.Format(property.Value, output);
}
}
}
}

View File

@ -0,0 +1,20 @@
using Microsoft.AspNetCore.Http;
namespace Blog.Core.Serilog.Es.HttpInfo
{
public static class HttpContextProvider
{
private static IHttpContextAccessor _accessor;
public static HttpContext GetCurrent()
{
var context = _accessor?.HttpContext;
return context;
}
public static void ConfigureAccessor(IHttpContextAccessor accessor)
{
_accessor = accessor;
}
}
}

View File

@ -0,0 +1,82 @@
using Microsoft.AspNetCore.Http;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.IO;
using System.Text;
using System.Web;
namespace Blog.Core.Serilog.Es.HttpInfo
{
/// <summary>
/// 获取参数帮助类
/// </summary>
public class ParamsHelper
{
/// <summary>
/// 获取参数值
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public static string GetParams(HttpContext context)
{
try
{
NameValueCollection form = HttpUtility.ParseQueryString(context.Request.QueryString.ToString());
HttpRequest request = context.Request;
string data = string.Empty;
switch (request.Method)
{
case "POST":
request.Body.Position = 0;
using (var ms = new MemoryStream())
{
request.Body.CopyTo(ms);
var b = ms.ToArray();
data = Encoding.UTF8.GetString(b); //把body赋值给bodyStr
}
break;
case "GET":
//第一步取出所有get参数
IDictionary<string, string> parameters = new Dictionary<string, string>();
for (int f = 0; f < form.Count; f++)
{
string key = form.Keys[f];
parameters.Add(key, form[key]);
}
// 第二步把字典按Key的字母顺序排序
IDictionary<string, string> sortedParams = new SortedDictionary<string, string>(parameters);
IEnumerator<KeyValuePair<string, string>> dem = sortedParams.GetEnumerator();
// 第三步:把所有参数名和参数值串在一起
StringBuilder query = new StringBuilder();
while (dem.MoveNext())
{
string key = dem.Current.Key;
string value = dem.Current.Value;
if (!string.IsNullOrEmpty(key))
{
query.Append(key).Append("=").Append(value).Append("&");
}
}
data = query.ToString().TrimEnd('&');
break;
default:
data = string.Empty;
break;
}
return data;
}
catch(Exception ex)
{
return string.Empty;
}
}
}
}

View File

@ -0,0 +1,102 @@
using System;
using System.Linq;
using System.Net;
using Serilog;
using Serilog.Configuration;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Formatting;
using Blog.Core.Serilog.Es.Formatters;
using Blog.Core.Serilog.Es.Sinks.TCP;
namespace Blog.Core.Serilog.Es
{
/// <summary>
/// Extends Serilog configuration to write events to the network.
/// </summary>
public static class NetworkLoggerConfigurationExtensions
{
private static string TcpAddressHost = "";
private static int TcpAddressProt = 0;
/// <summary>
/// 获得tcpAddress
/// </summary>
private static void GetTcpAddress()
{
// 读取相关配置
var logConfigRootDTOInfo = JsonConfigUtils.GetAppSettings<LogConfigRootDTO>(AppSettingsFileNameConfig.AppSettingsFileName, "LogFiedOutPutConfigs");
if (logConfigRootDTOInfo == null)
{
return;
}
TcpAddressHost = logConfigRootDTOInfo.tcpAddressHost;
TcpAddressProt = logConfigRootDTOInfo.tcpAddressPort;
}
public static LoggerConfiguration TCPSink(
this LoggerSinkConfiguration loggerConfiguration,
ITextFormatter textFormatter = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum)
{
GetTcpAddress();
if (!string.IsNullOrEmpty(TcpAddressHost))
{
var sink = new TCPSink(BuildUri($"tcp://{TcpAddressHost}:{TcpAddressProt}"), textFormatter ?? new LogstashJsonFormatter());
return loggerConfiguration.Sink(sink, restrictedToMinimumLevel);
}
else {
return new LoggerConfiguration();
}
}
private static IPAddress ResolveAddress(string uri)
{
// Check if it is IP address
IPAddress address;
if (IPAddress.TryParse(uri, out address))
return address;
address = ResolveIP(uri);
if (address != null)
return address;
SelfLog.WriteLine("Unable to determine the destination IP-Address");
return IPAddress.Loopback;
}
private static IPAddress ResolveIP(string uri)
{
try
{
var ipHostEntry = Dns.GetHostEntryAsync(uri).Result;
if (!ipHostEntry.AddressList.Any())
return null;
return ipHostEntry.AddressList.First();
}
catch (Exception)
{
SelfLog.WriteLine("Could not resolve " + uri);
return null;
}
}
private static Uri BuildUri(string s)
{
Uri uri;
try
{
uri = new Uri(s);
}
catch (UriFormatException ex)
{
throw new ArgumentNullException("Uri should be in the format tcp://server:port", ex);
}
if (uri.Port == 0)
throw new UriFormatException("Uri port cannot be 0");
if (!(uri.Scheme.ToLower() == "tcp" || uri.Scheme.ToLower() == "tls"))
throw new UriFormatException("Uri scheme must be tcp or tls");
return uri;
}
}
}

View File

@ -0,0 +1,44 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using Serilog.Core;
using Serilog.Events;
using Serilog.Formatting;
namespace Blog.Core.Serilog.Es.Sinks.TCP
{
public class TCPSink : ILogEventSink, IDisposable
{
private readonly ITextFormatter _formatter;
private readonly TcpSocketWriter _socketWriter;
public TCPSink(IPAddress ipAddress, int port, ITextFormatter formatter)
{
_socketWriter = new TcpSocketWriter(new Uri($"tcp://{ipAddress}:{port}"));
_formatter = formatter;
}
public TCPSink(Uri uri, ITextFormatter formatter)
{
_socketWriter = new TcpSocketWriter(uri);
_formatter = formatter;
}
public void Emit(LogEvent logEvent)
{
var sb = new StringBuilder();
using (var sw = new StringWriter(sb))
_formatter.Format(logEvent, sw);
sb.Replace("RenderedMessage", "message");
_socketWriter.Enqueue(sb.ToString());
}
public void Dispose()
{
_socketWriter.Dispose();
}
}
}

View File

@ -0,0 +1,325 @@
/*
* Copyright 2014 Splunk, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"): you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
using Serilog;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace Blog.Core.Serilog.Es.Sinks.TCP
{
/// <summary>
/// TcpSocketWriter encapsulates queueing strings to be written to a TCP _socket
/// and handling reconnections (according to a TcpConnectionPolicy object passed
/// to it) when a TCP session drops.
/// </summary>
/// <remarks>
/// TcpSocketWriter maintains a fixed sized queue of strings to be sent via
/// the TCP _port and, while the _socket is open, sends them as quickly as possible.
///
/// If the TCP session drops, TcpSocketWriter will stop pulling strings off the
/// queue until it can reestablish a connection. Any SocketErrors emitted during this
/// process will be passed as arguments to invocations of LoggingFailureHandler.
/// If the TcpConnectionPolicy.Connect method throws an exception (in particular,
/// TcpReconnectFailure to indicate that the policy has reached a point where it
/// will no longer try to establish a connection) then the LoggingFailureHandler
/// event is invoked, and no further attempt to log anything will be made.
/// </remarks>
public class TcpSocketWriter : IDisposable
{
private readonly FixedSizeQueue<string> _eventQueue;
private readonly ExponentialBackoffTcpReconnectionPolicy _reconnectPolicy = new ExponentialBackoffTcpReconnectionPolicy();
private readonly CancellationTokenSource _tokenSource; // Must be private or Dispose will not function properly.
private readonly TaskCompletionSource<bool> _disposed = new TaskCompletionSource<bool>();
private Stream _stream;
/// <summary>
/// Event that is invoked when reconnecting after a TCP session is dropped fails.
/// </summary>
public event Action<Exception> LoggingFailureHandler = ex =>
{
UnexpectedErrorLogger(
ex,
(x, socketError) =>
{
if (socketError == null)
{
//Log.Error(x, "failure inside TCP socket: {message}", x.Message);
}
else
{
//Log.Error(
// x,
// "failure inside TCP socket: {message} - socket error found {socketErrorCode}",
// x.Message,
// socketError);
}
});
};
public static void UnexpectedErrorLogger(Exception ex, Action<Exception, SocketError?> log)
{
SocketError? socketErrorCode = null;
var current = ex;
do
{
if (current is SocketException)
{
socketErrorCode = ((SocketException) current).SocketErrorCode;
}
current = current.InnerException;
} while (socketErrorCode == null && current != null);
log(ex, socketErrorCode);
}
/// <summary>
/// Construct a TCP _socket writer that writes to the given endPoint and _port.
/// </summary>
/// <param name="uri">Uri to open a TCP socket to.</param>
/// <param name="maxQueueSize">The maximum number of log entries to queue before starting to drop entries.</param>
public TcpSocketWriter(Uri uri, int maxQueueSize = 5000)
{
_eventQueue = new FixedSizeQueue<string>(maxQueueSize);
_tokenSource = new CancellationTokenSource();
Func<Uri, Task<Stream>> tryOpenSocket = async h =>
{
try
{
TcpClient client = new TcpClient();
await client.ConnectAsync(uri.Host, uri.Port);
Stream stream = client.GetStream();
if (uri.Scheme.ToLower() != "tls")
return stream;
var sslStream = new SslStream(client.GetStream(), false, null, null);
await sslStream.AuthenticateAsClientAsync(uri.Host);
return sslStream;
}
catch (Exception e)
{
LoggingFailureHandler(e);
throw;
}
};
var threadReady = new TaskCompletionSource<bool>();
Task queueListener = Task.Factory.StartNew(async () =>
{
try
{
bool sslEnabled = uri.Scheme.ToLower() == "tls";
_stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
threadReady.SetResult(true); // Signal the calling thread that we are ready.
string entry = null;
while (_stream != null) // null indicates that the thread has been cancelled and cleaned up.
{
if (_tokenSource.Token.IsCancellationRequested)
{
_eventQueue.CompleteAdding();
// Post-condition: no further items will be added to the queue, so there will be a finite number of items to handle.
while (_eventQueue.Count > 0)
{
entry = _eventQueue.Dequeue();
try
{
byte[] messsage = Encoding.UTF8.GetBytes(entry);
await _stream.WriteAsync(messsage, 0, messsage.Length);
await _stream.FlushAsync();
}
catch (SocketException ex)
{
LoggingFailureHandler(ex);
}
}
break;
}
if (entry == null)
{
entry = _eventQueue.Dequeue(_tokenSource.Token);
}
else
{
try
{
byte[] messsage = Encoding.UTF8.GetBytes(entry);
await _stream.WriteAsync(messsage, 0, messsage.Length);
await _stream.FlushAsync();
// No exception, it was sent
entry = null;
}
catch (IOException ex)
{
LoggingFailureHandler(ex);
_stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
}
catch (SocketException ex)
{
LoggingFailureHandler(ex);
_stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
}
}
}
}
catch (Exception e)
{
LoggingFailureHandler(e);
}
finally
{
if (_stream != null)
{
_stream.Dispose();
}
_disposed.SetResult(true);
}
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
threadReady.Task.Wait(TimeSpan.FromSeconds(5));
}
public void Dispose()
{
// The following operations are idempotent. Issue a cancellation to tell the
// writer thread to stop the queue from accepting entries and write what it has
// before cleaning up, then wait until that cleanup is finished.
_tokenSource.Cancel();
Task.Run(async () => await _disposed.Task).Wait();
}
/// <summary>
/// Push a string onto the queue to be written.
/// </summary>
/// <param name="entry">The string to be written to the TCP _socket.</param>
public void Enqueue(string entry)
{
_eventQueue.Enqueue(entry);
}
}
/// <summary>
/// TcpConnectionPolicy implementation that tries to reconnect after
/// increasingly long intervals.
/// </summary>
/// <remarks>
/// The intervals double every time, starting from 0s, 1s, 2s, 4s, ...
/// until 10 minutes between connections, when it plateaus and does
/// not increase the interval length any further.
/// </remarks>
public class ExponentialBackoffTcpReconnectionPolicy
{
private readonly int ceiling = 10 * 60; // 10 minutes in seconds
public async Task<Stream> ConnectAsync(Func<Uri, Task<Stream>> connect, Uri host, CancellationToken cancellationToken)
{
int delay = 1; // in seconds
while (!cancellationToken.IsCancellationRequested)
{
try
{
//Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, delay);
return await connect(host);
}
catch (SocketException) { }
// If this is cancelled via the cancellationToken instead of
// completing its delay, the next while-loop test will fail,
// the loop will terminate, and the method will return null
// with no additional connection attempts.
await Task.Delay(delay * 1000, cancellationToken);
// The nth delay is min(10 minutes, 2^n - 1 seconds).
delay = Math.Min((delay + 1) * 2 - 1, ceiling);
}
// cancellationToken has been cancelled.
return null;
}
}
/// <summary>
/// A queue with a maximum size. When the queue is at its maximum size
/// and a new item is queued, the oldest item in the queue is dropped.
/// </summary>
/// <typeparam name="T"></typeparam>
internal class FixedSizeQueue<T>
{
private int Size { get; }
private readonly IProgress<bool> _progress = new Progress<bool>();
private bool IsCompleted { get; set; }
private readonly BlockingCollection<T> _collection = new BlockingCollection<T>();
public FixedSizeQueue(int size)
{
Size = size;
IsCompleted = false;
}
public void Enqueue(T obj)
{
lock (this)
{
if (IsCompleted)
{
throw new InvalidOperationException("Tried to add an item to a completed queue.");
}
_collection.Add(obj);
while (_collection.Count > Size)
{
_collection.Take();
}
_progress.Report(true);
}
}
public void CompleteAdding()
{
lock (this)
{
IsCompleted = true;
}
}
public T Dequeue(CancellationToken cancellationToken)
{
return _collection.Take(cancellationToken);
}
public T Dequeue()
{
return _collection.Take();
}
public decimal Count => _collection.Count;
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using Serilog.Core;
using Serilog.Events;
using Serilog.Formatting;
namespace Serilog.Sinks.Network.Sinks.UDP
{
public class UDPSink : ILogEventSink, IDisposable
{
private Socket _socket = new Socket(SocketType.Dgram, ProtocolType.Udp);
private readonly ITextFormatter _formatter;
public UDPSink(IPAddress ipAddress, int port, ITextFormatter formatter)
{
_socket.Connect(ipAddress, port);
_formatter = formatter;
}
public void Emit(LogEvent logEvent)
{
var sb = new StringBuilder();
using (var sw = new StringWriter(sb))
_formatter.Format(logEvent, sw);
sb.Replace("RenderedMessage", "message");
_socket.Send(Encoding.UTF8.GetBytes(sb.ToString()));
}
public void Dispose()
{
_socket?.Dispose();
_socket = null;
}
}
}

View File

@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blog.Core.ConsoleApp", "Blo
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blog.Core.Gateway", "Blog.Core.Gateway\Blog.Core.Gateway.csproj", "{A11C0DF2-1E13-4EED-BA49-44A57136B189}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blog.Core.Gateway", "Blog.Core.Gateway\Blog.Core.Gateway.csproj", "{A11C0DF2-1E13-4EED-BA49-44A57136B189}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Blog.Core.Serilog.Es", "Blog.Core.Serilog.Es\Blog.Core.Serilog.Es.csproj", "{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -111,6 +113,10 @@ Global
{A11C0DF2-1E13-4EED-BA49-44A57136B189}.Debug|Any CPU.Build.0 = Debug|Any CPU {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.ActiveCfg = Release|Any CPU {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.Build.0 = Release|Any CPU {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.Build.0 = Release|Any CPU
{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE