diff --git a/CAP.sln b/CAP.sln
index b5be9a6..bc55514 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{25A1B3A1-DD74-436C-9956-17E04FE7643D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -147,6 +149,10 @@ Global
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU
+ {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -171,6 +177,7 @@ Global
{93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
+ {25A1B3A1-DD74-436C-9956-17E04FE7643D} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
diff --git a/CAP.sln.DotSettings b/CAP.sln.DotSettings
index 3eef960..6e57e28 100644
--- a/CAP.sln.DotSettings
+++ b/CAP.sln.DotSettings
@@ -1,4 +1,6 @@
DB
+ NATS
True
+ True
True
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs
new file mode 100644
index 0000000..a0bc817
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs
@@ -0,0 +1,32 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using DotNetCore.CAP.NATS;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.DependencyInjection;
+
+// ReSharper disable once CheckNamespace
+namespace DotNetCore.CAP
+{
+ internal sealed class NATSCapOptionsExtension : ICapOptionsExtension
+ {
+ private readonly Action _configure;
+
+ public NATSCapOptionsExtension(Action configure)
+ {
+ _configure = configure;
+ }
+
+ public void AddServices(IServiceCollection services)
+ {
+ services.AddSingleton();
+
+ services.Configure(_configure);
+
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
new file mode 100644
index 0000000..42fe2d1
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
@@ -0,0 +1,30 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using NATS.Client;
+
+// ReSharper disable once CheckNamespace
+namespace DotNetCore.CAP
+{
+ ///
+ /// Provides programmatic configuration for the CAP NATS project.
+ ///
+ public class NATSOptions
+ {
+ ///
+ /// Gets or sets the server url/urls used to connect to the NATs server.
+ ///
+ /// This may contain username/password information.
+ public string Servers { get; set; }
+
+ ///
+ /// connection pool size, default is 10
+ ///
+ public int ConnectionPoolSize { get; set; } = 10;
+
+ ///
+ /// Used to setup all NATs client options
+ ///
+ public Options Options { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
new file mode 100644
index 0000000..c91b415
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
@@ -0,0 +1,40 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using DotNetCore.CAP;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class CapOptionsExtensions
+ {
+ ///
+ /// Configuration to use NATS in CAP.
+ ///
+ /// CAP configuration options
+ /// NATS bootstrap server urls.
+ public static CapOptions UseNATS(this CapOptions options, string bootstrapServers)
+ {
+ return options.UseNATS(opt => { opt.Servers = bootstrapServers; });
+ }
+
+ ///
+ /// Configuration to use NATS in CAP.
+ ///
+ /// CAP configuration options
+ /// Provides programmatic configuration for the NATS.
+ ///
+ public static CapOptions UseNATS(this CapOptions options, Action configure)
+ {
+ if (configure == null)
+ {
+ throw new ArgumentNullException(nameof(configure));
+ }
+
+ options.RegisterExtension(new NATSCapOptionsExtension(configure));
+
+ return options;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
new file mode 100644
index 0000000..1a5e563
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
@@ -0,0 +1,23 @@
+
+
+
+ netstandard2.0
+ DotNetCore.CAP.NATS
+ $(PackageTags);NATS
+
+
+
+ NU1605;NU1701
+ NU1701;CS1591
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.NATS.xml
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs
new file mode 100644
index 0000000..ad37e61
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs
@@ -0,0 +1,83 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using NATS.Client;
+
+namespace DotNetCore.CAP.NATS
+{
+ public class ConnectionPool : IConnectionPool, IDisposable
+ {
+ private readonly NATSOptions _options;
+ private readonly ConcurrentQueue _connectionPool;
+ private readonly ConnectionFactory _connectionFactory;
+ private int _pCount;
+ private int _maxSize;
+
+ public ConnectionPool(ILogger logger, IOptions options)
+ {
+ _options = options.Value;
+ _connectionPool = new ConcurrentQueue();
+ _connectionFactory = new ConnectionFactory();
+ _maxSize = _options.ConnectionPoolSize;
+
+ logger.LogDebug("NATS configuration: {0}", options.Value.Options);
+ }
+
+ public string ServersAddress => _options.Servers;
+
+ public IConnection RentConnection()
+ {
+ if (_connectionPool.TryDequeue(out var connection))
+ {
+ Interlocked.Decrement(ref _pCount);
+
+ return connection;
+ }
+
+ if (_options.Options != null)
+ {
+ if (_options.Servers != null)
+ {
+ _options.Options.Url = _options.Servers;
+ }
+ connection = _connectionFactory.CreateConnection(_options.Options);
+ }
+ else
+ {
+ connection = _connectionFactory.CreateConnection(_options.Servers);
+ }
+
+ return connection;
+ }
+
+ public bool Return(IConnection connection)
+ {
+ if (Interlocked.Increment(ref _pCount) <= _maxSize)
+ {
+ _connectionPool.Enqueue(connection);
+
+ return true;
+ }
+
+ Interlocked.Decrement(ref _pCount);
+
+ return false;
+ }
+
+ public void Dispose()
+ {
+ _maxSize = 0;
+
+ while (_connectionPool.TryDequeue(out var context))
+ {
+ context.Dispose();
+
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.cs
new file mode 100644
index 0000000..e6d78e6
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/IConnectionPool.cs
@@ -0,0 +1,16 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using NATS.Client;
+
+namespace DotNetCore.CAP.NATS
+{
+ public interface IConnectionPool
+ {
+ string ServersAddress { get; }
+
+ IConnection RentConnection();
+
+ bool Return(IConnection connection);
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs
new file mode 100644
index 0000000..78f1caf
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs
@@ -0,0 +1,60 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.IO;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.Threading.Tasks;
+using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Logging;
+
+namespace DotNetCore.CAP.NATS
+{
+ internal class NATSTransport : ITransport
+ {
+ private readonly IConnectionPool _connectionPool;
+ private readonly ILogger _logger;
+
+ public NATSTransport(ILogger logger, IConnectionPool connectionPool)
+ {
+ _logger = logger;
+ _connectionPool = connectionPool;
+ }
+
+ public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress);
+
+ public Task SendAsync(TransportMessage message)
+ {
+ var connection = _connectionPool.RentConnection();
+
+ try
+ {
+ var binFormatter = new BinaryFormatter();
+ using var mStream = new MemoryStream();
+ binFormatter.Serialize(mStream, message);
+
+ connection.Publish(message.GetName(), mStream.ToArray());
+
+ _logger.LogDebug($"kafka topic message [{message.GetName()}] has been published.");
+
+ return Task.FromResult(OperateResult.Success);
+ }
+ catch (Exception ex)
+ {
+ var warpEx = new PublisherSentFailedException(ex.Message, ex);
+
+ return Task.FromResult(OperateResult.Failed(warpEx));
+ }
+ finally
+ {
+ var returned = _connectionPool.Return(connection);
+ if (!returned)
+ {
+ connection.Dispose();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
new file mode 100644
index 0000000..67a1c77
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
@@ -0,0 +1,147 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.Threading;
+using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Options;
+using NATS.Client;
+
+namespace DotNetCore.CAP.NATS
+{
+ internal sealed class NATSConsumerClient : IConsumerClient
+ {
+ private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
+ private readonly string _groupId;
+ private readonly NATSOptions _natsOptions;
+ private readonly IList _asyncSubscriptions;
+
+ private IConnection _consumerClient;
+
+ public NATSConsumerClient(string groupId, IOptions options)
+ {
+ _groupId = groupId;
+ _asyncSubscriptions = new List();
+ _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
+ }
+
+ public event EventHandler OnMessageReceived;
+
+ public event EventHandler OnLog;
+
+ public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers);
+
+ public void Subscribe(IEnumerable topics)
+ {
+ if (topics == null)
+ {
+ throw new ArgumentNullException(nameof(topics));
+ }
+
+ Connect();
+
+ foreach (var topic in topics)
+ {
+ _asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId));
+ }
+ }
+
+ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
+ {
+ Connect();
+
+ foreach (var subscription in _asyncSubscriptions)
+ {
+ subscription.MessageHandler += Subscription_MessageHandler;
+ subscription.Start();
+ }
+
+ while (true)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ cancellationToken.WaitHandle.WaitOne(timeout);
+ }
+ // ReSharper disable once FunctionNeverReturns
+ }
+
+ private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
+ {
+ using var mStream = new MemoryStream();
+ var binFormatter = new BinaryFormatter();
+
+ mStream.Write(e.Message.Data, 0, e.Message.Data.Length);
+ mStream.Position = 0;
+
+ var message = binFormatter.Deserialize(mStream) as TransportMessage;
+
+ OnMessageReceived?.Invoke(sender, message);
+ }
+
+ public void Commit(object sender)
+ {
+ //TODO : Only NATS Streaming Server Support
+ }
+
+ public void Reject(object sender)
+ {
+ //TODO : Only NATS Streaming Server Support
+ }
+
+ public void Dispose()
+ {
+ _consumerClient?.Dispose();
+ }
+
+ public void Connect()
+ {
+ if (_consumerClient != null)
+ {
+ return;
+ }
+
+ ConnectionLock.Wait();
+
+ try
+ {
+ if (_consumerClient == null)
+ {
+ var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions();
+ opts.Url = _natsOptions.Servers ?? opts.Url;
+ opts.ClosedEventHandler = ConnectedEventHandler;
+ opts.DisconnectedEventHandler = ConnectedEventHandler;
+ opts.AsyncErrorEventHandler = AsyncErrorEventHandler;
+ _consumerClient = new ConnectionFactory().CreateConnection(opts);
+ }
+ }
+ finally
+ {
+ ConnectionLock.Release();
+ }
+ }
+
+ private void ConnectedEventHandler(object sender, ConnEventArgs e)
+ {
+ var logArgs = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ServerConnError,
+ Reason = $"An error occurred during connect NATS --> {e.Error}"
+ };
+ OnLog?.Invoke(null, logArgs);
+ }
+
+ private void AsyncErrorEventHandler(object sender, ErrEventArgs e)
+ {
+ var logArgs = new LogMessageEventArgs
+ {
+ LogType = MqLogType.AsyncErrorEvent,
+ Reason = $"An error occurred out of band --> {e.Error}"
+ };
+ OnLog?.Invoke(null, logArgs);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs
new file mode 100644
index 0000000..f5fe5d8
--- /dev/null
+++ b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs
@@ -0,0 +1,32 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Options;
+
+namespace DotNetCore.CAP.NATS
+{
+ internal sealed class NATSConsumerClientFactory : IConsumerClientFactory
+ {
+ private readonly IOptions _natsOptions;
+
+ public NATSConsumerClientFactory(IOptions natsOptions)
+ {
+ _natsOptions = natsOptions;
+ }
+
+ public IConsumerClient Create(string groupId)
+ {
+ try
+ {
+ var client = new NATSConsumerClient(groupId, _natsOptions);
+ client.Connect();
+ return client;
+ }
+ catch (System.Exception e)
+ {
+ throw new BrokerConnectionException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
index 7e32a68..352adb3 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
@@ -271,6 +271,9 @@ namespace DotNetCore.CAP.Internal
case MqLogType.ExceptionReceived:
_logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason);
break;
+ case MqLogType.AsyncErrorEvent:
+ _logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason);
+ break;
default:
throw new ArgumentOutOfRangeException();
}
diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs
index 3412b3a..754e044 100644
--- a/src/DotNetCore.CAP/Transport/MqLogType.cs
+++ b/src/DotNetCore.CAP/Transport/MqLogType.cs
@@ -18,7 +18,10 @@ namespace DotNetCore.CAP.Transport
ServerConnError,
//AzureServiceBus
- ExceptionReceived
+ ExceptionReceived,
+
+ //NATS
+ AsyncErrorEvent
}
public class LogMessageEventArgs : EventArgs