From 83b95af9bf492d4c100135d50e4847070dc487b6 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Mon, 27 Sep 2021 17:46:25 +0800 Subject: [PATCH] Add support jetstreams for NATS. (#983) --- src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs | 6 +- .../CAP.Options.Extensions.cs | 9 +- .../DotNetCore.CAP.NATS.csproj | 2 +- src/DotNetCore.CAP.NATS/ITransport.NATS.cs | 21 +++-- src/DotNetCore.CAP.NATS/NATSConsumerClient.cs | 83 ++++++++++++++----- src/DotNetCore.CAP/Internal/Helper.cs | 11 +++ .../Internal/IConsumerRegister.Default.cs | 3 + src/DotNetCore.CAP/Transport/MqLogType.cs | 1 + 8 files changed, 107 insertions(+), 29 deletions(-) diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs index 42fe2d1..94dc96b 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -1,7 +1,9 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using NATS.Client; +using NATS.Client.JetStream; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP @@ -15,7 +17,7 @@ namespace DotNetCore.CAP /// Gets or sets the server url/urls used to connect to the NATs server. /// /// This may contain username/password information. - public string Servers { get; set; } + public string Servers { get; set; } = "nats://localhost:4222"; /// /// connection pool size, default is 10 @@ -26,5 +28,7 @@ namespace DotNetCore.CAP /// Used to setup all NATs client options /// public Options Options { get; set; } + + public Action StreamOptions { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs index c91b415..743aafd 100644 --- a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs @@ -3,6 +3,7 @@ using System; using DotNetCore.CAP; +using JetBrains.Annotations; // ReSharper disable once CheckNamespace namespace Microsoft.Extensions.DependencyInjection @@ -14,9 +15,13 @@ namespace Microsoft.Extensions.DependencyInjection /// /// CAP configuration options /// NATS bootstrap server urls. - public static CapOptions UseNATS(this CapOptions options, string bootstrapServers) + public static CapOptions UseNATS(this CapOptions options, [CanBeNull] string bootstrapServers = null) { - return options.UseNATS(opt => { opt.Servers = bootstrapServers; }); + return options.UseNATS(opt => + { + if (bootstrapServers != null) + opt.Servers = bootstrapServers; + }); } /// diff --git a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj index 7d6a5c5..cfe0572 100644 --- a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj +++ b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs index 35a63fc..e52119d 100644 --- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -8,6 +8,7 @@ using DotNetCore.CAP.Messages; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; using NATS.Client; +using NATS.Client.JetStream; namespace DotNetCore.CAP.NATS { @@ -15,16 +16,19 @@ namespace DotNetCore.CAP.NATS { private readonly IConnectionPool _connectionPool; private readonly ILogger _logger; + private readonly JetStreamOptions _jetStreamOptions; public NATSTransport(ILogger logger, IConnectionPool connectionPool) { _logger = logger; _connectionPool = connectionPool; + + _jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); } public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); - public Task SendAsync(TransportMessage message) + public async Task SendAsync(TransportMessage message) { var connection = _connectionPool.RentConnection(); @@ -36,21 +40,26 @@ namespace DotNetCore.CAP.NATS msg.Header[header.Key] = header.Value; } - var reply = connection.Request(msg); + var js = connection.CreateJetStreamContext(_jetStreamOptions); + + var builder = PublishOptions.Builder().WithExpectedStream(Helper.Normalized(message.GetName())).WithMessageId(message.GetId()); + + var resp = await js.PublishAsync(msg, builder.Build()); - if (reply.Data != null && reply.Data[0] == 1) + if (resp.Seq > 0) { - _logger.LogDebug($"NATS subject message [{message.GetName()}] has been consumed."); + _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); - return Task.FromResult(OperateResult.Success); + return OperateResult.Success; } + throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); } catch (Exception ex) { var warpEx = new PublisherSentFailedException(ex.Message, ex); - return Task.FromResult(OperateResult.Failed(warpEx)); + return OperateResult.Failed(warpEx); } finally { diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index b805a58..e9ac546 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -3,11 +3,14 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Options; using NATS.Client; +using NATS.Client.JetStream; namespace DotNetCore.CAP.NATS { @@ -17,14 +20,12 @@ namespace DotNetCore.CAP.NATS private readonly string _groupId; private readonly NATSOptions _natsOptions; - private readonly IList _asyncSubscriptions; private IConnection _consumerClient; public NATSConsumerClient(string groupId, IOptions options) { _groupId = groupId; - _asyncSubscriptions = new List(); _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); } @@ -34,6 +35,41 @@ namespace DotNetCore.CAP.NATS public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers); + public ICollection FetchTopics(IEnumerable topicNames) + { + var jsm = _consumerClient.CreateJetStreamManagementContext(); + + foreach (var topic in topicNames) + { + var norTopic = Helper.Normalized(topic); + try + { + jsm.GetStreamInfo(norTopic); // this throws if the stream does not exist + } + catch (NATSJetStreamException) + { + var builder = StreamConfiguration.Builder() + .WithName(norTopic) + .WithNoAck(false) + .WithStorageType(StorageType.Memory) + .WithSubjects(topic); + + _natsOptions.StreamOptions?.Invoke(builder); + + try + { + jsm.AddStream(builder.Build()); + } + catch + { + // ignored + } + } + } + + return topicNames.ToList(); + } + public void Subscribe(IEnumerable topics) { if (topics == null) @@ -43,21 +79,29 @@ namespace DotNetCore.CAP.NATS Connect(); + var js = _consumerClient.CreateJetStreamContext(); + foreach (var topic in topics) { - _asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId)); + var pso = PushSubscribeOptions.Builder() + .WithStream(Helper.Normalized(topic)) + .WithConfiguration(ConsumerConfiguration.Builder().WithDeliverPolicy(DeliverPolicy.New).Build()) + .WithDeliverGroup(_groupId) + .Build(); + + js.PushSubscribeAsync(topic, Subscription_MessageHandler, false, pso); } } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { - Connect(); + //Connect(); - foreach (var subscription in _asyncSubscriptions) - { - subscription.MessageHandler += Subscription_MessageHandler; - subscription.Start(); - } + //foreach (var subscription in _asyncSubscriptions) + //{ + // subscription.MessageHandler += Subscription_MessageHandler; + // subscription.Start(); + //} while (true) { @@ -66,11 +110,11 @@ namespace DotNetCore.CAP.NATS } // ReSharper disable once FunctionNeverReturns } - + private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) { var headers = new Dictionary(); - + foreach (string h in e.Message.Header.Keys) { headers.Add(h, e.Message.Header[h]); @@ -78,22 +122,22 @@ namespace DotNetCore.CAP.NATS headers.Add(Headers.Group, _groupId); - OnMessageReceived?.Invoke(e.Message.Reply, new TransportMessage(headers, e.Message.Data)); + OnMessageReceived?.Invoke(e.Message, new TransportMessage(headers, e.Message.Data)); } public void Commit(object sender) { - if (sender is string reply) + if (sender is Msg msg) { - _consumerClient.Publish(reply, new byte[] {1}); + msg.Ack(); } } public void Reject(object sender) { - if (sender is string reply) + if (sender is Msg msg) { - _consumerClient.Publish(reply, new byte[] {0}); + msg.Nak(); } } @@ -120,6 +164,7 @@ namespace DotNetCore.CAP.NATS opts.ClosedEventHandler = ConnectedEventHandler; opts.DisconnectedEventHandler = ConnectedEventHandler; opts.AsyncErrorEventHandler = AsyncErrorEventHandler; + opts.Timeout = 5000; _consumerClient = new ConnectionFactory().CreateConnection(opts); } } @@ -133,8 +178,8 @@ namespace DotNetCore.CAP.NATS { var logArgs = new LogMessageEventArgs { - LogType = MqLogType.ServerConnError, - Reason = $"An error occurred during connect NATS --> {e.Error}" + LogType = MqLogType.ConnectError, + Reason = e.Error?.ToString() }; OnLog?.Invoke(null, logArgs); } @@ -144,7 +189,7 @@ namespace DotNetCore.CAP.NATS var logArgs = new LogMessageEventArgs { LogType = MqLogType.AsyncErrorEvent, - Reason = $"An error occurred out of band --> {e.Error}" + Reason = e.Error }; OnLog?.Invoke(null, logArgs); } diff --git a/src/DotNetCore.CAP/Internal/Helper.cs b/src/DotNetCore.CAP/Internal/Helper.cs index 2b3d64a..2f0a3ed 100644 --- a/src/DotNetCore.CAP/Internal/Helper.cs +++ b/src/DotNetCore.CAP/Internal/Helper.cs @@ -4,6 +4,7 @@ using System; using System.ComponentModel; using System.Reflection; +using System.Text.RegularExpressions; namespace DotNetCore.CAP.Internal { @@ -64,6 +65,16 @@ namespace DotNetCore.CAP.Internal return wildcard; } + public static string Normalized(string name) + { + if (string.IsNullOrEmpty(name)) + { + return name; + } + var pattern = "[\\>\\.\\ \\*]"; + return Regex.IsMatch(name, pattern) ? Regex.Replace(name, pattern, "_") : name; + } + public static bool IsInnerIP(string ipAddress) { var ipNum = GetIpNum(ipAddress); diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 6a9ed9b..ebd7265 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -301,6 +301,9 @@ namespace DotNetCore.CAP.Internal case MqLogType.AsyncErrorEvent: _logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason); break; + case MqLogType.ConnectError: + _logger.LogError("NATS server connection error. --> " + logmsg.Reason); + break; case MqLogType.InvalidIdFormat: _logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason); break; diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index 91bdcdf..6d95a73 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -22,6 +22,7 @@ namespace DotNetCore.CAP.Transport //NATS AsyncErrorEvent, + ConnectError, //Amazon SQS InvalidIdFormat,