From d838f97cb262b4ce70290733ce645994367ee242 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 25 Jun 2020 23:59:24 +0800 Subject: [PATCH] WIP: Add NATS support. #595 --- CAP.sln | 7 + CAP.sln.DotSettings | 2 + .../CAP.NATSCapOptionsExtension.cs | 32 ++++ src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs | 30 ++++ .../CAP.Options.Extensions.cs | 40 +++++ .../DotNetCore.CAP.NATS.csproj | 23 +++ .../IConnectionPool.Default.cs | 83 ++++++++++ src/DotNetCore.CAP.NATS/IConnectionPool.cs | 16 ++ src/DotNetCore.CAP.NATS/ITransport.NATS.cs | 60 +++++++ src/DotNetCore.CAP.NATS/NATSConsumerClient.cs | 147 ++++++++++++++++++ .../NATSConsumerClientFactory.cs | 32 ++++ .../Internal/IConsumerRegister.Default.cs | 3 + src/DotNetCore.CAP/Transport/MqLogType.cs | 5 +- 13 files changed, 479 insertions(+), 1 deletion(-) create mode 100644 src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs create mode 100644 src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs create mode 100644 src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs create mode 100644 src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj create mode 100644 src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs create mode 100644 src/DotNetCore.CAP.NATS/IConnectionPool.cs create mode 100644 src/DotNetCore.CAP.NATS/ITransport.NATS.cs create mode 100644 src/DotNetCore.CAP.NATS/NATSConsumerClient.cs create mode 100644 src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs diff --git a/CAP.sln b/CAP.sln index b5be9a6..bc55514 100644 --- a/CAP.sln +++ b/CAP.sln @@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{25A1B3A1-DD74-436C-9956-17E04FE7643D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -147,6 +149,10 @@ Global {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU + {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -171,6 +177,7 @@ Global {93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {25A1B3A1-DD74-436C-9956-17E04FE7643D} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/CAP.sln.DotSettings b/CAP.sln.DotSettings index 3eef960..6e57e28 100644 --- a/CAP.sln.DotSettings +++ b/CAP.sln.DotSettings @@ -1,4 +1,6 @@  DB + NATS True + True True \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs new file mode 100644 index 0000000..a0bc817 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.NATS; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal sealed class NATSCapOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public NATSCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + + services.Configure(_configure); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs new file mode 100644 index 0000000..42fe2d1 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using NATS.Client; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + /// + /// Provides programmatic configuration for the CAP NATS project. + /// + public class NATSOptions + { + /// + /// Gets or sets the server url/urls used to connect to the NATs server. + /// + /// This may contain username/password information. + public string Servers { get; set; } + + /// + /// connection pool size, default is 10 + /// + public int ConnectionPoolSize { get; set; } = 10; + + /// + /// Used to setup all NATs client options + /// + public Options Options { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs new file mode 100644 index 0000000..c91b415 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs @@ -0,0 +1,40 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// NATS bootstrap server urls. + public static CapOptions UseNATS(this CapOptions options, string bootstrapServers) + { + return options.UseNATS(opt => { opt.Servers = bootstrapServers; }); + } + + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// Provides programmatic configuration for the NATS. + /// + public static CapOptions UseNATS(this CapOptions options, Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + options.RegisterExtension(new NATSCapOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj new file mode 100644 index 0000000..1a5e563 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.0 + DotNetCore.CAP.NATS + $(PackageTags);NATS + + + + NU1605;NU1701 + NU1701;CS1591 + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.NATS.xml + + + + + + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs new file mode 100644 index 0000000..ad37e61 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs @@ -0,0 +1,83 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NATS.Client; + +namespace DotNetCore.CAP.NATS +{ + public class ConnectionPool : IConnectionPool, IDisposable + { + private readonly NATSOptions _options; + private readonly ConcurrentQueue _connectionPool; + private readonly ConnectionFactory _connectionFactory; + private int _pCount; + private int _maxSize; + + public ConnectionPool(ILogger logger, IOptions options) + { + _options = options.Value; + _connectionPool = new ConcurrentQueue(); + _connectionFactory = new ConnectionFactory(); + _maxSize = _options.ConnectionPoolSize; + + logger.LogDebug("NATS configuration: {0}", options.Value.Options); + } + + public string ServersAddress => _options.Servers; + + public IConnection RentConnection() + { + if (_connectionPool.TryDequeue(out var connection)) + { + Interlocked.Decrement(ref _pCount); + + return connection; + } + + if (_options.Options != null) + { + if (_options.Servers != null) + { + _options.Options.Url = _options.Servers; + } + connection = _connectionFactory.CreateConnection(_options.Options); + } + else + { + connection = _connectionFactory.CreateConnection(_options.Servers); + } + + return connection; + } + + public bool Return(IConnection connection) + { + if (Interlocked.Increment(ref _pCount) <= _maxSize) + { + _connectionPool.Enqueue(connection); + + return true; + } + + Interlocked.Decrement(ref _pCount); + + return false; + } + + public void Dispose() + { + _maxSize = 0; + + while (_connectionPool.TryDequeue(out var context)) + { + context.Dispose(); + + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.cs new file mode 100644 index 0000000..e6d78e6 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using NATS.Client; + +namespace DotNetCore.CAP.NATS +{ + public interface IConnectionPool + { + string ServersAddress { get; } + + IConnection RentConnection(); + + bool Return(IConnection connection); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs new file mode 100644 index 0000000..78f1caf --- /dev/null +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -0,0 +1,60 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.IO; +using System.Runtime.Serialization.Formatters.Binary; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.NATS +{ + internal class NATSTransport : ITransport + { + private readonly IConnectionPool _connectionPool; + private readonly ILogger _logger; + + public NATSTransport(ILogger logger, IConnectionPool connectionPool) + { + _logger = logger; + _connectionPool = connectionPool; + } + + public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); + + public Task SendAsync(TransportMessage message) + { + var connection = _connectionPool.RentConnection(); + + try + { + var binFormatter = new BinaryFormatter(); + using var mStream = new MemoryStream(); + binFormatter.Serialize(mStream, message); + + connection.Publish(message.GetName(), mStream.ToArray()); + + _logger.LogDebug($"kafka topic message [{message.GetName()}] has been published."); + + return Task.FromResult(OperateResult.Success); + } + catch (Exception ex) + { + var warpEx = new PublisherSentFailedException(ex.Message, ex); + + return Task.FromResult(OperateResult.Failed(warpEx)); + } + finally + { + var returned = _connectionPool.Return(connection); + if (!returned) + { + connection.Dispose(); + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs new file mode 100644 index 0000000..67a1c77 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -0,0 +1,147 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Runtime.Serialization.Formatters.Binary; +using System.Threading; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Options; +using NATS.Client; + +namespace DotNetCore.CAP.NATS +{ + internal sealed class NATSConsumerClient : IConsumerClient + { + private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + + private readonly string _groupId; + private readonly NATSOptions _natsOptions; + private readonly IList _asyncSubscriptions; + + private IConnection _consumerClient; + + public NATSConsumerClient(string groupId, IOptions options) + { + _groupId = groupId; + _asyncSubscriptions = new List(); + _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers); + + public void Subscribe(IEnumerable topics) + { + if (topics == null) + { + throw new ArgumentNullException(nameof(topics)); + } + + Connect(); + + foreach (var topic in topics) + { + _asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId)); + } + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + Connect(); + + foreach (var subscription in _asyncSubscriptions) + { + subscription.MessageHandler += Subscription_MessageHandler; + subscription.Start(); + } + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + // ReSharper disable once FunctionNeverReturns + } + + private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) + { + using var mStream = new MemoryStream(); + var binFormatter = new BinaryFormatter(); + + mStream.Write(e.Message.Data, 0, e.Message.Data.Length); + mStream.Position = 0; + + var message = binFormatter.Deserialize(mStream) as TransportMessage; + + OnMessageReceived?.Invoke(sender, message); + } + + public void Commit(object sender) + { + //TODO : Only NATS Streaming Server Support + } + + public void Reject(object sender) + { + //TODO : Only NATS Streaming Server Support + } + + public void Dispose() + { + _consumerClient?.Dispose(); + } + + public void Connect() + { + if (_consumerClient != null) + { + return; + } + + ConnectionLock.Wait(); + + try + { + if (_consumerClient == null) + { + var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); + opts.Url = _natsOptions.Servers ?? opts.Url; + opts.ClosedEventHandler = ConnectedEventHandler; + opts.DisconnectedEventHandler = ConnectedEventHandler; + opts.AsyncErrorEventHandler = AsyncErrorEventHandler; + _consumerClient = new ConnectionFactory().CreateConnection(opts); + } + } + finally + { + ConnectionLock.Release(); + } + } + + private void ConnectedEventHandler(object sender, ConnEventArgs e) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ServerConnError, + Reason = $"An error occurred during connect NATS --> {e.Error}" + }; + OnLog?.Invoke(null, logArgs); + } + + private void AsyncErrorEventHandler(object sender, ErrEventArgs e) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.AsyncErrorEvent, + Reason = $"An error occurred out of band --> {e.Error}" + }; + OnLog?.Invoke(null, logArgs); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs new file mode 100644 index 0000000..f5fe5d8 --- /dev/null +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.NATS +{ + internal sealed class NATSConsumerClientFactory : IConsumerClientFactory + { + private readonly IOptions _natsOptions; + + public NATSConsumerClientFactory(IOptions natsOptions) + { + _natsOptions = natsOptions; + } + + public IConsumerClient Create(string groupId) + { + try + { + var client = new NATSConsumerClient(groupId, _natsOptions); + client.Connect(); + return client; + } + catch (System.Exception e) + { + throw new BrokerConnectionException(e); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 7e32a68..352adb3 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -271,6 +271,9 @@ namespace DotNetCore.CAP.Internal case MqLogType.ExceptionReceived: _logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason); break; + case MqLogType.AsyncErrorEvent: + _logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason); + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index 3412b3a..754e044 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -18,7 +18,10 @@ namespace DotNetCore.CAP.Transport ServerConnError, //AzureServiceBus - ExceptionReceived + ExceptionReceived, + + //NATS + AsyncErrorEvent } public class LogMessageEventArgs : EventArgs