diff --git a/Blog.Core.Api/Controllers/LoginController.cs b/Blog.Core.Api/Controllers/LoginController.cs index 686c643..e007cc6 100644 --- a/Blog.Core.Api/Controllers/LoginController.cs +++ b/Blog.Core.Api/Controllers/LoginController.cs @@ -14,6 +14,7 @@ using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; + namespace Blog.Core.Controllers { @@ -61,10 +62,11 @@ namespace Blog.Core.Controllers [Route("Token")] public async Task> GetJwtStr(string name, string pass) { + string jwtStr = string.Empty; bool suc = false; //这里就是用户登陆以后,通过数据库去调取数据,分配权限的操作 - + var user = await _sysUserInfoServices.GetUserRoleNameStr(name, MD5Helper.MD5Encrypt32(pass)); if (user != null) { diff --git a/Blog.Core.Api/appsettings.json b/Blog.Core.Api/appsettings.json index b39bce5..989b1d0 100644 --- a/Blog.Core.Api/appsettings.json +++ b/Blog.Core.Api/appsettings.json @@ -266,5 +266,15 @@ "ServiceName": "blog.Core.Api", // 服务名 "Port": "8081", // 服务端口号 "RegisterEnabled": true // 是否直接注册nacos + }, + "LogFiedOutPutConfigs": { + "tcpAddressHost": "", // 输出elk的tcp连接地址 + "tcpAddressPort": 0, // 输出elk的tcp端口号 + "ConfigsInfo": [ // 配置的输出elk节点内容 常用语动态标识 + { + "FiedName": "applicationName", + "FiedValue": "Blog.Core.Api" + } + ] } } diff --git a/Blog.Core.Common/Blog.Core.Common.csproj b/Blog.Core.Common/Blog.Core.Common.csproj index 8ada40a..5bc856c 100644 --- a/Blog.Core.Common/Blog.Core.Common.csproj +++ b/Blog.Core.Common/Blog.Core.Common.csproj @@ -32,4 +32,8 @@ + + + + diff --git a/Blog.Core.Common/Helper/JsonConfigUtils.cs b/Blog.Core.Common/Helper/JsonConfigUtils.cs index f04fff1..b49024b 100644 --- a/Blog.Core.Common/Helper/JsonConfigUtils.cs +++ b/Blog.Core.Common/Helper/JsonConfigUtils.cs @@ -101,7 +101,6 @@ namespace Blog.Core.Common.Helper } catch (Exception ex) { - Serilog.Log.Information($"配置文件管理器异常:,{ ex.ToString() }"); value = defaultValue; } diff --git a/Blog.Core.Common/LogHelper/Seri/SerilogServer.cs b/Blog.Core.Common/LogHelper/Seri/SerilogServer.cs index fcdbfd8..2c70532 100644 --- a/Blog.Core.Common/LogHelper/Seri/SerilogServer.cs +++ b/Blog.Core.Common/LogHelper/Seri/SerilogServer.cs @@ -1,7 +1,8 @@ using Blog.Core.Common.Helper; +using Blog.Core.Serilog.Es; +using Blog.Core.Serilog.Es.Formatters; using Serilog; using Serilog.Events; -using Serilog.Sinks.Elasticsearch; using System; using System.IO; @@ -18,6 +19,8 @@ namespace Blog.Core.Common.LogHelper public static void WriteLog(string filename, string[] dataParas, bool IsHeader = true, string defaultFolder = "", bool isJudgeJsonFormat = false) { Log.Logger = new LoggerConfiguration() + // TCPSink 集成Serilog 使用tcp的方式向elk 输出log日志 LogstashJsonFormatter 这个是按照自定义格式化输出内容 + .WriteTo.TCPSink(new LogstashJsonFormatter()) .MinimumLevel.Debug() .MinimumLevel.Override("Microsoft", LogEventLevel.Error) //.WriteTo.File(Path.Combine($"log/Serilog/{filename}/", ".log"), rollingInterval: RollingInterval.Day, outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff} [{Level}] {Message}{NewLine}{Exception}") @@ -54,7 +57,11 @@ namespace Blog.Core.Common.LogHelper String.Join("\r\n", dataParas) + "\r\n" ); } + // 展示elk支持输出4种日志级别 Log.Information(logContent); + Log.Warning(logContent); + Log.Error(logContent); + Log.Debug(logContent); } else { diff --git a/Blog.Core.Extensions/NacosConfig/NacosListenConfigurationTask.cs b/Blog.Core.Extensions/NacosConfig/NacosListenConfigurationTask.cs index 919cc63..42347ea 100644 --- a/Blog.Core.Extensions/NacosConfig/NacosListenConfigurationTask.cs +++ b/Blog.Core.Extensions/NacosConfig/NacosListenConfigurationTask.cs @@ -43,7 +43,7 @@ namespace Blog.Core.Extensions.NacosConfig } catch (Exception ex) { - Serilog.Log.Information($"Nacos配置文件获取异常!!! " + ex.ToString()); + } } diff --git a/Blog.Core.Extensions/NacosConfig/NacosListenNamingTask.cs b/Blog.Core.Extensions/NacosConfig/NacosListenNamingTask.cs index baca937..60657bb 100644 --- a/Blog.Core.Extensions/NacosConfig/NacosListenNamingTask.cs +++ b/Blog.Core.Extensions/NacosConfig/NacosListenNamingTask.cs @@ -88,9 +88,6 @@ namespace Blog.Core.Extensions.NacosConfig // 配置有变动后 刷新redis配置 刷新 mq配置 //_redisCachqManager.DisposeRedisConnection(); - - - Serilog.Log.Information($"收到服务变更事件!!! {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} [{e}]"); } return Task.CompletedTask; diff --git a/Blog.Core.Serilog.Es/AppSettingsFileNameConfig.cs b/Blog.Core.Serilog.Es/AppSettingsFileNameConfig.cs new file mode 100644 index 0000000..c4f2cda --- /dev/null +++ b/Blog.Core.Serilog.Es/AppSettingsFileNameConfig.cs @@ -0,0 +1,30 @@ +using System; + +namespace Blog.Core.Serilog.Es +{ + public class AppSettingsFileNameConfig + { + /// + /// 配置文件名称常量 + /// + public static string AppSettingsFileName = $"appsettings{ GetAppSettingsConfigName() }json"; + + + /// + /// 根据环境变量定向配置文件名称 + /// + /// + private static string GetAppSettingsConfigName() + { + if (Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") != null + && Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") != "") + { + return $".{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}."; + } + else + { + return "."; + } + } + } +} diff --git a/Blog.Core.Serilog.Es/Blog.Core.Serilog.Es.csproj b/Blog.Core.Serilog.Es/Blog.Core.Serilog.Es.csproj new file mode 100644 index 0000000..b5c2e0e --- /dev/null +++ b/Blog.Core.Serilog.Es/Blog.Core.Serilog.Es.csproj @@ -0,0 +1,17 @@ + + + + net5.0 + + + + + + + + + + + + + diff --git a/Blog.Core.Serilog.Es/Formatters/JsonConfigUtils.cs b/Blog.Core.Serilog.Es/Formatters/JsonConfigUtils.cs new file mode 100644 index 0000000..e942504 --- /dev/null +++ b/Blog.Core.Serilog.Es/Formatters/JsonConfigUtils.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Configuration.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Blog.Core.Serilog.Es.Formatters +{ + /// + /// Json 配置文件通用类 + /// + public static class JsonConfigUtils + { + #region 变量 + + /// + /// 锁 + /// + private static object __Lock__ = new object(); + + // 读取到的系统配置信息 + public static IConfiguration Configuration { get; set; } + + #endregion + + /// + /// 读取配置文件的信息 + /// + /// + /// 要读取json的名称 + /// 要读取的json节点名称 + /// + public static T GetAppSettings(string AppSettingsFileName, string key) where T : class, new() + { + lock (__Lock__) + { + if (Configuration == null) + { + Configuration = new ConfigurationBuilder() + .Add(new JsonConfigurationSource + { + Path = AppSettingsFileName, + Optional = false, + ReloadOnChange = true + }) + .Build(); + } + var appconfig = new ServiceCollection() + .AddOptions() + .Configure(Configuration.GetSection(key)) + .BuildServiceProvider() + .GetService>() + .Value; + + return appconfig; + } + } + + + public static string GetJson(string jsonPath, string key) + { + IConfiguration config = new ConfigurationBuilder().AddJsonFile(jsonPath).Build(); //json文件地址 + string s = config.GetSection(key).Value; //json某个对象 + return s; + } + } +} diff --git a/Blog.Core.Serilog.Es/Formatters/LogConfigRootDTO.cs b/Blog.Core.Serilog.Es/Formatters/LogConfigRootDTO.cs new file mode 100644 index 0000000..36a9916 --- /dev/null +++ b/Blog.Core.Serilog.Es/Formatters/LogConfigRootDTO.cs @@ -0,0 +1,26 @@ +using System.Collections.Generic; + +namespace Blog.Core.Serilog.Es.Formatters +{ + + public class LogConfigRootDTO + { + /// + /// tcp日志的host地址 + /// + public string tcpAddressHost { set; get; } + + /// + /// tcp日志的port地址 + /// + public int tcpAddressPort { set; get; } + + public List ConfigsInfo { get; set; } + } + + public class Configsinfo + { + public string FiedName { get; set; } + public string FiedValue { get; set; } + } +} diff --git a/Blog.Core.Serilog.Es/Formatters/LogstashJsonFormatter.cs b/Blog.Core.Serilog.Es/Formatters/LogstashJsonFormatter.cs new file mode 100644 index 0000000..f123c17 --- /dev/null +++ b/Blog.Core.Serilog.Es/Formatters/LogstashJsonFormatter.cs @@ -0,0 +1,152 @@ +// Adapted from RawJsonFormatter in Serilog.Sinks.Seq Copyright 2016 Serilog Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Serilog.Events; +using Serilog.Formatting; +using Serilog.Formatting.Json; +using Blog.Core.Serilog.Es.HttpInfo; + +namespace Blog.Core.Serilog.Es.Formatters +{ + public class LogstashJsonFormatter : ITextFormatter + { + private static readonly JsonValueFormatter ValueFormatter = new JsonValueFormatter(); + + public void Format(LogEvent logEvent, TextWriter output) + { + FormatContent(logEvent, output); + + output.WriteLine(); + } + + + /// + /// 格式化 最终输出到elk的核心部分 + /// + /// + /// + private static void FormatContent(LogEvent logEvent, TextWriter output) + { + if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); + if (output == null) throw new ArgumentNullException(nameof(output)); + + output.Write('{'); + + // 读取相关配置 + var logConfigRootDTOInfo = JsonConfigUtils.GetAppSettings(AppSettingsFileNameConfig.AppSettingsFileName, "LogFiedOutPutConfigs"); + if (logConfigRootDTOInfo == null) + { + return; + } + + // 写入所有的项目配置项的字段 在appsetting中配置的 输出elk节点的数据字段 + foreach (var item in logConfigRootDTOInfo.ConfigsInfo) + { + switch (item.FiedName) + { + //case "orgid": + // WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method); + // output.Write(","); + // break; + default: + WritePropertyAndValue(output, item.FiedName, item.FiedValue); + output.Write(","); + break; + } + } + // 写入http对应的信息数据 + if (HttpContextProvider.GetCurrent()!=null && HttpContextProvider.GetCurrent().Request!=null) + { + if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method)) + { + WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method); + output.Write(","); + } + // 输出请求页面url + if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Path)) + { + WritePropertyAndValue(output, "requestUrl", HttpContextProvider.GetCurrent().Request.Path.ToString()); + output.Write(","); + } + // 输出携带token + if (HttpContextProvider.GetCurrent().Request.Headers["Authorization"].FirstOrDefault() != null) + { + WritePropertyAndValue(output, "Authorization", HttpContextProvider.GetCurrent().Request.Headers["Authorization"].FirstOrDefault()); + output.Write(","); + } + // 输出请求参数 + if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method)) + { + string contentFromBody = ParamsHelper.GetParams(HttpContextProvider.GetCurrent()); + WritePropertyAndValue(output, "requestParam", contentFromBody); + output.Write(","); + } + // 输出请求方法类型 + if (!string.IsNullOrEmpty(HttpContextProvider.GetCurrent().Request.Method)) + { + WritePropertyAndValue(output, "method", HttpContextProvider.GetCurrent().Request.Method); + output.Write(","); + } + } + // 输出请求时间戳 + WritePropertyAndValue(output, "timestamp", logEvent.Timestamp.ToString("o")); + output.Write(","); + + // 输出日志级别 + WritePropertyAndValue(output, "level", logEvent.Level.ToString()); + output.Write(","); + + // 输出log内容 + WritePropertyAndValue(output, "executeResult", logEvent.MessageTemplate.Render(logEvent.Properties)); + + if (logEvent.Exception != null) + { + output.Write(","); + WritePropertyAndValue(output, "exception", logEvent.Exception.ToString()); + } + + WriteProperties(logEvent.Properties, output); + + output.Write('}'); + } + + private static void WritePropertyAndValue(TextWriter output, string propertyKey, string propertyValue) + { + JsonValueFormatter.WriteQuotedJsonString(propertyKey, output); + output.Write(":"); + JsonValueFormatter.WriteQuotedJsonString(propertyValue, output); + } + + private static void WriteProperties(IReadOnlyDictionary properties, TextWriter output) + { + if (properties.Any()) output.Write(","); + + var precedingDelimiter = ""; + foreach (var property in properties) + { + output.Write(precedingDelimiter); + precedingDelimiter = ","; + + var camelCasePropertyKey = property.Key[0].ToString().ToLower() + property.Key.Substring(1); + JsonValueFormatter.WriteQuotedJsonString(camelCasePropertyKey, output); + output.Write(':'); + ValueFormatter.Format(property.Value, output); + } + } + } +} \ No newline at end of file diff --git a/Blog.Core.Serilog.Es/HttpInfo/HttpContextProvider.cs b/Blog.Core.Serilog.Es/HttpInfo/HttpContextProvider.cs new file mode 100644 index 0000000..612f20b --- /dev/null +++ b/Blog.Core.Serilog.Es/HttpInfo/HttpContextProvider.cs @@ -0,0 +1,20 @@ +using Microsoft.AspNetCore.Http; + +namespace Blog.Core.Serilog.Es.HttpInfo +{ + public static class HttpContextProvider + { + private static IHttpContextAccessor _accessor; + + public static HttpContext GetCurrent() + { + var context = _accessor?.HttpContext; + return context; + } + public static void ConfigureAccessor(IHttpContextAccessor accessor) + { + _accessor = accessor; + } + } + +} diff --git a/Blog.Core.Serilog.Es/HttpInfo/ParamsHelper.cs b/Blog.Core.Serilog.Es/HttpInfo/ParamsHelper.cs new file mode 100644 index 0000000..3b39c0b --- /dev/null +++ b/Blog.Core.Serilog.Es/HttpInfo/ParamsHelper.cs @@ -0,0 +1,82 @@ +using Microsoft.AspNetCore.Http; +using System; +using System.Collections.Generic; +using System.Collections.Specialized; +using System.IO; +using System.Text; +using System.Web; + +namespace Blog.Core.Serilog.Es.HttpInfo +{ + /// + /// 获取参数帮助类 + /// + public class ParamsHelper + { + /// + /// 获取参数值 + /// + /// + /// + public static string GetParams(HttpContext context) + { + try + { + NameValueCollection form = HttpUtility.ParseQueryString(context.Request.QueryString.ToString()); + HttpRequest request = context.Request; + + string data = string.Empty; + switch (request.Method) + { + case "POST": + + request.Body.Position = 0; + using (var ms = new MemoryStream()) + { + request.Body.CopyTo(ms); + var b = ms.ToArray(); + data = Encoding.UTF8.GetString(b); //把body赋值给bodyStr + + } + break; + case "GET": + //第一步:取出所有get参数 + IDictionary parameters = new Dictionary(); + for (int f = 0; f < form.Count; f++) + { + string key = form.Keys[f]; + parameters.Add(key, form[key]); + } + + // 第二步:把字典按Key的字母顺序排序 + IDictionary sortedParams = new SortedDictionary(parameters); + IEnumerator> dem = sortedParams.GetEnumerator(); + + // 第三步:把所有参数名和参数值串在一起 + StringBuilder query = new StringBuilder(); + while (dem.MoveNext()) + { + string key = dem.Current.Key; + string value = dem.Current.Value; + if (!string.IsNullOrEmpty(key)) + { + query.Append(key).Append("=").Append(value).Append("&"); + } + } + data = query.ToString().TrimEnd('&'); + break; + default: + data = string.Empty; + + break; + } + return data; + } + catch(Exception ex) + { + return string.Empty; + } + } + + } +} diff --git a/Blog.Core.Serilog.Es/NetworkLoggerConfigurationExtensions.cs b/Blog.Core.Serilog.Es/NetworkLoggerConfigurationExtensions.cs new file mode 100644 index 0000000..d77f80f --- /dev/null +++ b/Blog.Core.Serilog.Es/NetworkLoggerConfigurationExtensions.cs @@ -0,0 +1,102 @@ +using System; +using System.Linq; +using System.Net; +using Serilog; +using Serilog.Configuration; +using Serilog.Debugging; +using Serilog.Events; +using Serilog.Formatting; +using Blog.Core.Serilog.Es.Formatters; +using Blog.Core.Serilog.Es.Sinks.TCP; + +namespace Blog.Core.Serilog.Es +{ + /// + /// Extends Serilog configuration to write events to the network. + /// + public static class NetworkLoggerConfigurationExtensions + { + private static string TcpAddressHost = ""; + private static int TcpAddressProt = 0; + /// + /// 获得tcpAddress + /// + private static void GetTcpAddress() + { + // 读取相关配置 + var logConfigRootDTOInfo = JsonConfigUtils.GetAppSettings(AppSettingsFileNameConfig.AppSettingsFileName, "LogFiedOutPutConfigs"); + if (logConfigRootDTOInfo == null) + { + return; + } + TcpAddressHost = logConfigRootDTOInfo.tcpAddressHost; + TcpAddressProt = logConfigRootDTOInfo.tcpAddressPort; + } + + public static LoggerConfiguration TCPSink( + this LoggerSinkConfiguration loggerConfiguration, + ITextFormatter textFormatter = null, + LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) + { + GetTcpAddress(); + if (!string.IsNullOrEmpty(TcpAddressHost)) + { + var sink = new TCPSink(BuildUri($"tcp://{TcpAddressHost}:{TcpAddressProt}"), textFormatter ?? new LogstashJsonFormatter()); + return loggerConfiguration.Sink(sink, restrictedToMinimumLevel); + } + else { + return new LoggerConfiguration(); + } + } + + private static IPAddress ResolveAddress(string uri) + { + // Check if it is IP address + IPAddress address; + + if (IPAddress.TryParse(uri, out address)) + return address; + + address = ResolveIP(uri); + if (address != null) + return address; + + SelfLog.WriteLine("Unable to determine the destination IP-Address"); + return IPAddress.Loopback; + } + + private static IPAddress ResolveIP(string uri) + { + try + { + var ipHostEntry = Dns.GetHostEntryAsync(uri).Result; + if (!ipHostEntry.AddressList.Any()) + return null; + return ipHostEntry.AddressList.First(); + } + catch (Exception) + { + SelfLog.WriteLine("Could not resolve " + uri); + return null; + } + } + + private static Uri BuildUri(string s) + { + Uri uri; + try + { + uri = new Uri(s); + } + catch (UriFormatException ex) + { + throw new ArgumentNullException("Uri should be in the format tcp://server:port", ex); + } + if (uri.Port == 0) + throw new UriFormatException("Uri port cannot be 0"); + if (!(uri.Scheme.ToLower() == "tcp" || uri.Scheme.ToLower() == "tls")) + throw new UriFormatException("Uri scheme must be tcp or tls"); + return uri; + } + } +} \ No newline at end of file diff --git a/Blog.Core.Serilog.Es/Sinks/TCP/TCPSink.cs b/Blog.Core.Serilog.Es/Sinks/TCP/TCPSink.cs new file mode 100644 index 0000000..fc5a43c --- /dev/null +++ b/Blog.Core.Serilog.Es/Sinks/TCP/TCPSink.cs @@ -0,0 +1,44 @@ +using System; +using System.IO; +using System.Net; +using System.Text; +using Serilog.Core; +using Serilog.Events; +using Serilog.Formatting; + +namespace Blog.Core.Serilog.Es.Sinks.TCP +{ + public class TCPSink : ILogEventSink, IDisposable + { + private readonly ITextFormatter _formatter; + private readonly TcpSocketWriter _socketWriter; + + public TCPSink(IPAddress ipAddress, int port, ITextFormatter formatter) + { + _socketWriter = new TcpSocketWriter(new Uri($"tcp://{ipAddress}:{port}")); + _formatter = formatter; + } + + public TCPSink(Uri uri, ITextFormatter formatter) + { + _socketWriter = new TcpSocketWriter(uri); + _formatter = formatter; + } + + public void Emit(LogEvent logEvent) + { + var sb = new StringBuilder(); + + using (var sw = new StringWriter(sb)) + _formatter.Format(logEvent, sw); + + sb.Replace("RenderedMessage", "message"); + _socketWriter.Enqueue(sb.ToString()); + } + + public void Dispose() + { + _socketWriter.Dispose(); + } + } +} \ No newline at end of file diff --git a/Blog.Core.Serilog.Es/Sinks/TCP/TCPSocketWriter.cs b/Blog.Core.Serilog.Es/Sinks/TCP/TCPSocketWriter.cs new file mode 100644 index 0000000..f1f592c --- /dev/null +++ b/Blog.Core.Serilog.Es/Sinks/TCP/TCPSocketWriter.cs @@ -0,0 +1,325 @@ +/* + * Copyright 2014 Splunk, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"): you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +using Serilog; +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; + +namespace Blog.Core.Serilog.Es.Sinks.TCP +{ + /// + /// TcpSocketWriter encapsulates queueing strings to be written to a TCP _socket + /// and handling reconnections (according to a TcpConnectionPolicy object passed + /// to it) when a TCP session drops. + /// + /// + /// TcpSocketWriter maintains a fixed sized queue of strings to be sent via + /// the TCP _port and, while the _socket is open, sends them as quickly as possible. + /// + /// If the TCP session drops, TcpSocketWriter will stop pulling strings off the + /// queue until it can reestablish a connection. Any SocketErrors emitted during this + /// process will be passed as arguments to invocations of LoggingFailureHandler. + /// If the TcpConnectionPolicy.Connect method throws an exception (in particular, + /// TcpReconnectFailure to indicate that the policy has reached a point where it + /// will no longer try to establish a connection) then the LoggingFailureHandler + /// event is invoked, and no further attempt to log anything will be made. + /// + public class TcpSocketWriter : IDisposable + { + private readonly FixedSizeQueue _eventQueue; + private readonly ExponentialBackoffTcpReconnectionPolicy _reconnectPolicy = new ExponentialBackoffTcpReconnectionPolicy(); + private readonly CancellationTokenSource _tokenSource; // Must be private or Dispose will not function properly. + private readonly TaskCompletionSource _disposed = new TaskCompletionSource(); + + private Stream _stream; + + /// + /// Event that is invoked when reconnecting after a TCP session is dropped fails. + /// + public event Action LoggingFailureHandler = ex => + { + UnexpectedErrorLogger( + ex, + (x, socketError) => + { + if (socketError == null) + { + //Log.Error(x, "failure inside TCP socket: {message}", x.Message); + } + else + { + //Log.Error( + // x, + // "failure inside TCP socket: {message} - socket error found {socketErrorCode}", + // x.Message, + // socketError); + } + + }); + }; + + public static void UnexpectedErrorLogger(Exception ex, Action log) + { + SocketError? socketErrorCode = null; + var current = ex; + do + { + if (current is SocketException) + { + socketErrorCode = ((SocketException) current).SocketErrorCode; + } + + current = current.InnerException; + } while (socketErrorCode == null && current != null); + + log(ex, socketErrorCode); + } + + /// + /// Construct a TCP _socket writer that writes to the given endPoint and _port. + /// + /// Uri to open a TCP socket to. + /// The maximum number of log entries to queue before starting to drop entries. + public TcpSocketWriter(Uri uri, int maxQueueSize = 5000) + { + _eventQueue = new FixedSizeQueue(maxQueueSize); + _tokenSource = new CancellationTokenSource(); + + Func> tryOpenSocket = async h => + { + try + { + TcpClient client = new TcpClient(); + await client.ConnectAsync(uri.Host, uri.Port); + Stream stream = client.GetStream(); + if (uri.Scheme.ToLower() != "tls") + return stream; + + var sslStream = new SslStream(client.GetStream(), false, null, null); + await sslStream.AuthenticateAsClientAsync(uri.Host); + return sslStream; + } + catch (Exception e) + { + LoggingFailureHandler(e); + throw; + } + }; + + var threadReady = new TaskCompletionSource(); + + Task queueListener = Task.Factory.StartNew(async () => + { + try + { + bool sslEnabled = uri.Scheme.ToLower() == "tls"; + _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); + threadReady.SetResult(true); // Signal the calling thread that we are ready. + + string entry = null; + while (_stream != null) // null indicates that the thread has been cancelled and cleaned up. + { + if (_tokenSource.Token.IsCancellationRequested) + { + _eventQueue.CompleteAdding(); + // Post-condition: no further items will be added to the queue, so there will be a finite number of items to handle. + while (_eventQueue.Count > 0) + { + entry = _eventQueue.Dequeue(); + try + { + byte[] messsage = Encoding.UTF8.GetBytes(entry); + await _stream.WriteAsync(messsage, 0, messsage.Length); + await _stream.FlushAsync(); + } + catch (SocketException ex) + { + LoggingFailureHandler(ex); + } + } + break; + } + if (entry == null) + { + entry = _eventQueue.Dequeue(_tokenSource.Token); + } + else + { + try + { + byte[] messsage = Encoding.UTF8.GetBytes(entry); + await _stream.WriteAsync(messsage, 0, messsage.Length); + await _stream.FlushAsync(); + // No exception, it was sent + entry = null; + } + catch (IOException ex) + { + LoggingFailureHandler(ex); + _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); + } + catch (SocketException ex) + { + LoggingFailureHandler(ex); + _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); + } + } + } + } + catch (Exception e) + { + LoggingFailureHandler(e); + } + finally + { + if (_stream != null) + { + _stream.Dispose(); + } + + _disposed.SetResult(true); + } + }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); + threadReady.Task.Wait(TimeSpan.FromSeconds(5)); + } + + public void Dispose() + { + // The following operations are idempotent. Issue a cancellation to tell the + // writer thread to stop the queue from accepting entries and write what it has + // before cleaning up, then wait until that cleanup is finished. + _tokenSource.Cancel(); + Task.Run(async () => await _disposed.Task).Wait(); + } + + /// + /// Push a string onto the queue to be written. + /// + /// The string to be written to the TCP _socket. + public void Enqueue(string entry) + { + _eventQueue.Enqueue(entry); + } + } + + /// + /// TcpConnectionPolicy implementation that tries to reconnect after + /// increasingly long intervals. + /// + /// + /// The intervals double every time, starting from 0s, 1s, 2s, 4s, ... + /// until 10 minutes between connections, when it plateaus and does + /// not increase the interval length any further. + /// + public class ExponentialBackoffTcpReconnectionPolicy + { + private readonly int ceiling = 10 * 60; // 10 minutes in seconds + + public async Task ConnectAsync(Func> connect, Uri host, CancellationToken cancellationToken) + { + int delay = 1; // in seconds + while (!cancellationToken.IsCancellationRequested) + { + try + { + //Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, delay); + return await connect(host); + } + catch (SocketException) { } + + // If this is cancelled via the cancellationToken instead of + // completing its delay, the next while-loop test will fail, + // the loop will terminate, and the method will return null + // with no additional connection attempts. + await Task.Delay(delay * 1000, cancellationToken); + // The nth delay is min(10 minutes, 2^n - 1 seconds). + delay = Math.Min((delay + 1) * 2 - 1, ceiling); + } + + // cancellationToken has been cancelled. + return null; + } + } + + /// + /// A queue with a maximum size. When the queue is at its maximum size + /// and a new item is queued, the oldest item in the queue is dropped. + /// + /// + internal class FixedSizeQueue + { + private int Size { get; } + private readonly IProgress _progress = new Progress(); + private bool IsCompleted { get; set; } + + private readonly BlockingCollection _collection = new BlockingCollection(); + + public FixedSizeQueue(int size) + { + Size = size; + IsCompleted = false; + } + + public void Enqueue(T obj) + { + lock (this) + { + if (IsCompleted) + { + throw new InvalidOperationException("Tried to add an item to a completed queue."); + } + + _collection.Add(obj); + + while (_collection.Count > Size) + { + _collection.Take(); + } + _progress.Report(true); + } + } + + public void CompleteAdding() + { + lock (this) + { + IsCompleted = true; + } + } + + public T Dequeue(CancellationToken cancellationToken) + { + return _collection.Take(cancellationToken); + } + + public T Dequeue() + { + return _collection.Take(); + } + + + public decimal Count => _collection.Count; + } +} \ No newline at end of file diff --git a/Blog.Core.Serilog.Es/Sinks/UDP/UDPSink.cs b/Blog.Core.Serilog.Es/Sinks/UDP/UDPSink.cs new file mode 100644 index 0000000..f07c24e --- /dev/null +++ b/Blog.Core.Serilog.Es/Sinks/UDP/UDPSink.cs @@ -0,0 +1,42 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using Serilog.Core; +using Serilog.Events; +using Serilog.Formatting; + + +namespace Serilog.Sinks.Network.Sinks.UDP +{ + public class UDPSink : ILogEventSink, IDisposable + { + private Socket _socket = new Socket(SocketType.Dgram, ProtocolType.Udp); + private readonly ITextFormatter _formatter; + + public UDPSink(IPAddress ipAddress, int port, ITextFormatter formatter) + { + _socket.Connect(ipAddress, port); + _formatter = formatter; + } + + public void Emit(LogEvent logEvent) + { + var sb = new StringBuilder(); + + using (var sw = new StringWriter(sb)) + _formatter.Format(logEvent, sw); + + sb.Replace("RenderedMessage", "message"); + + _socket.Send(Encoding.UTF8.GetBytes(sb.ToString())); + } + + public void Dispose() + { + _socket?.Dispose(); + _socket = null; + } + } +} diff --git a/Blog.Core.sln b/Blog.Core.sln index 2446fd7..1e86d30 100644 --- a/Blog.Core.sln +++ b/Blog.Core.sln @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blog.Core.ConsoleApp", "Blo EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blog.Core.Gateway", "Blog.Core.Gateway\Blog.Core.Gateway.csproj", "{A11C0DF2-1E13-4EED-BA49-44A57136B189}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Blog.Core.Serilog.Es", "Blog.Core.Serilog.Es\Blog.Core.Serilog.Es.csproj", "{52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -111,6 +113,10 @@ Global {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Debug|Any CPU.Build.0 = Debug|Any CPU {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.ActiveCfg = Release|Any CPU {A11C0DF2-1E13-4EED-BA49-44A57136B189}.Release|Any CPU.Build.0 = Release|Any CPU + {52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {52AFAB53-D1CA-4014-8B63-3550FDCDA6E1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE