Replace to jetstreams support for NATSmaster
@@ -1,7 +1,9 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using NATS.Client; | |||
using NATS.Client.JetStream; | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
@@ -15,7 +17,7 @@ namespace DotNetCore.CAP | |||
/// Gets or sets the server url/urls used to connect to the NATs server. | |||
/// </summary> | |||
/// <remarks>This may contain username/password information.</remarks> | |||
public string Servers { get; set; } | |||
public string Servers { get; set; } = "nats://localhost:4222"; | |||
/// <summary> | |||
/// connection pool size, default is 10 | |||
@@ -26,5 +28,7 @@ namespace DotNetCore.CAP | |||
/// Used to setup all NATs client options | |||
/// </summary> | |||
public Options Options { get; set; } | |||
public Action<StreamConfiguration.StreamConfigurationBuilder> StreamOptions { get; set; } | |||
} | |||
} |
@@ -3,6 +3,7 @@ | |||
using System; | |||
using DotNetCore.CAP; | |||
using JetBrains.Annotations; | |||
// ReSharper disable once CheckNamespace | |||
namespace Microsoft.Extensions.DependencyInjection | |||
@@ -14,9 +15,13 @@ namespace Microsoft.Extensions.DependencyInjection | |||
/// </summary> | |||
/// <param name="options">CAP configuration options</param> | |||
/// <param name="bootstrapServers">NATS bootstrap server urls.</param> | |||
public static CapOptions UseNATS(this CapOptions options, string bootstrapServers) | |||
public static CapOptions UseNATS(this CapOptions options, [CanBeNull] string bootstrapServers = null) | |||
{ | |||
return options.UseNATS(opt => { opt.Servers = bootstrapServers; }); | |||
return options.UseNATS(opt => | |||
{ | |||
if (bootstrapServers != null) | |||
opt.Servers = bootstrapServers; | |||
}); | |||
} | |||
/// <summary> | |||
@@ -13,7 +13,7 @@ | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="NATS.Client" Version="0.14.0-pre1" /> | |||
<PackageReference Include="NATS.Client" Version="0.14.0-pre3" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
@@ -8,6 +8,7 @@ using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
using NATS.Client; | |||
using NATS.Client.JetStream; | |||
namespace DotNetCore.CAP.NATS | |||
{ | |||
@@ -15,16 +16,19 @@ namespace DotNetCore.CAP.NATS | |||
{ | |||
private readonly IConnectionPool _connectionPool; | |||
private readonly ILogger _logger; | |||
private readonly JetStreamOptions _jetStreamOptions; | |||
public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPool) | |||
{ | |||
_logger = logger; | |||
_connectionPool = connectionPool; | |||
_jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); | |||
} | |||
public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); | |||
public Task<OperateResult> SendAsync(TransportMessage message) | |||
public async Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
var connection = _connectionPool.RentConnection(); | |||
@@ -36,21 +40,26 @@ namespace DotNetCore.CAP.NATS | |||
msg.Header[header.Key] = header.Value; | |||
} | |||
var reply = connection.Request(msg); | |||
var js = connection.CreateJetStreamContext(_jetStreamOptions); | |||
var builder = PublishOptions.Builder().WithExpectedStream(Helper.Normalized(message.GetName())).WithMessageId(message.GetId()); | |||
var resp = await js.PublishAsync(msg, builder.Build()); | |||
if (reply.Data != null && reply.Data[0] == 1) | |||
if (resp.Seq > 0) | |||
{ | |||
_logger.LogDebug($"NATS subject message [{message.GetName()}] has been consumed."); | |||
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); | |||
return Task.FromResult(OperateResult.Success); | |||
return OperateResult.Success; | |||
} | |||
throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); | |||
} | |||
catch (Exception ex) | |||
{ | |||
var warpEx = new PublisherSentFailedException(ex.Message, ex); | |||
return Task.FromResult(OperateResult.Failed(warpEx)); | |||
return OperateResult.Failed(warpEx); | |||
} | |||
finally | |||
{ | |||
@@ -3,11 +3,14 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Threading; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Options; | |||
using NATS.Client; | |||
using NATS.Client.JetStream; | |||
namespace DotNetCore.CAP.NATS | |||
{ | |||
@@ -17,14 +20,12 @@ namespace DotNetCore.CAP.NATS | |||
private readonly string _groupId; | |||
private readonly NATSOptions _natsOptions; | |||
private readonly IList<IAsyncSubscription> _asyncSubscriptions; | |||
private IConnection _consumerClient; | |||
public NATSConsumerClient(string groupId, IOptions<NATSOptions> options) | |||
{ | |||
_groupId = groupId; | |||
_asyncSubscriptions = new List<IAsyncSubscription>(); | |||
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
@@ -34,6 +35,41 @@ namespace DotNetCore.CAP.NATS | |||
public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers); | |||
public ICollection<string> FetchTopics(IEnumerable<string> topicNames) | |||
{ | |||
var jsm = _consumerClient.CreateJetStreamManagementContext(); | |||
foreach (var topic in topicNames) | |||
{ | |||
var norTopic = Helper.Normalized(topic); | |||
try | |||
{ | |||
jsm.GetStreamInfo(norTopic); // this throws if the stream does not exist | |||
} | |||
catch (NATSJetStreamException) | |||
{ | |||
var builder = StreamConfiguration.Builder() | |||
.WithName(norTopic) | |||
.WithNoAck(false) | |||
.WithStorageType(StorageType.Memory) | |||
.WithSubjects(topic); | |||
_natsOptions.StreamOptions?.Invoke(builder); | |||
try | |||
{ | |||
jsm.AddStream(builder.Build()); | |||
} | |||
catch | |||
{ | |||
// ignored | |||
} | |||
} | |||
} | |||
return topicNames.ToList(); | |||
} | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
if (topics == null) | |||
@@ -43,21 +79,29 @@ namespace DotNetCore.CAP.NATS | |||
Connect(); | |||
var js = _consumerClient.CreateJetStreamContext(); | |||
foreach (var topic in topics) | |||
{ | |||
_asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId)); | |||
var pso = PushSubscribeOptions.Builder() | |||
.WithStream(Helper.Normalized(topic)) | |||
.WithConfiguration(ConsumerConfiguration.Builder().WithDeliverPolicy(DeliverPolicy.New).Build()) | |||
.WithDeliverGroup(_groupId) | |||
.Build(); | |||
js.PushSubscribeAsync(topic, Subscription_MessageHandler, false, pso); | |||
} | |||
} | |||
public void Listening(TimeSpan timeout, CancellationToken cancellationToken) | |||
{ | |||
Connect(); | |||
//Connect(); | |||
foreach (var subscription in _asyncSubscriptions) | |||
{ | |||
subscription.MessageHandler += Subscription_MessageHandler; | |||
subscription.Start(); | |||
} | |||
//foreach (var subscription in _asyncSubscriptions) | |||
//{ | |||
// subscription.MessageHandler += Subscription_MessageHandler; | |||
// subscription.Start(); | |||
//} | |||
while (true) | |||
{ | |||
@@ -66,11 +110,11 @@ namespace DotNetCore.CAP.NATS | |||
} | |||
// ReSharper disable once FunctionNeverReturns | |||
} | |||
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) | |||
{ | |||
var headers = new Dictionary<string, string>(); | |||
foreach (string h in e.Message.Header.Keys) | |||
{ | |||
headers.Add(h, e.Message.Header[h]); | |||
@@ -78,22 +122,22 @@ namespace DotNetCore.CAP.NATS | |||
headers.Add(Headers.Group, _groupId); | |||
OnMessageReceived?.Invoke(e.Message.Reply, new TransportMessage(headers, e.Message.Data)); | |||
OnMessageReceived?.Invoke(e.Message, new TransportMessage(headers, e.Message.Data)); | |||
} | |||
public void Commit(object sender) | |||
{ | |||
if (sender is string reply) | |||
if (sender is Msg msg) | |||
{ | |||
_consumerClient.Publish(reply, new byte[] {1}); | |||
msg.Ack(); | |||
} | |||
} | |||
public void Reject(object sender) | |||
{ | |||
if (sender is string reply) | |||
if (sender is Msg msg) | |||
{ | |||
_consumerClient.Publish(reply, new byte[] {0}); | |||
msg.Nak(); | |||
} | |||
} | |||
@@ -120,6 +164,7 @@ namespace DotNetCore.CAP.NATS | |||
opts.ClosedEventHandler = ConnectedEventHandler; | |||
opts.DisconnectedEventHandler = ConnectedEventHandler; | |||
opts.AsyncErrorEventHandler = AsyncErrorEventHandler; | |||
opts.Timeout = 5000; | |||
_consumerClient = new ConnectionFactory().CreateConnection(opts); | |||
} | |||
} | |||
@@ -133,8 +178,8 @@ namespace DotNetCore.CAP.NATS | |||
{ | |||
var logArgs = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.ServerConnError, | |||
Reason = $"An error occurred during connect NATS --> {e.Error}" | |||
LogType = MqLogType.ConnectError, | |||
Reason = e.Error?.ToString() | |||
}; | |||
OnLog?.Invoke(null, logArgs); | |||
} | |||
@@ -144,7 +189,7 @@ namespace DotNetCore.CAP.NATS | |||
var logArgs = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.AsyncErrorEvent, | |||
Reason = $"An error occurred out of band --> {e.Error}" | |||
Reason = e.Error | |||
}; | |||
OnLog?.Invoke(null, logArgs); | |||
} | |||
@@ -4,6 +4,7 @@ | |||
using System; | |||
using System.ComponentModel; | |||
using System.Reflection; | |||
using System.Text.RegularExpressions; | |||
namespace DotNetCore.CAP.Internal | |||
{ | |||
@@ -64,6 +65,16 @@ namespace DotNetCore.CAP.Internal | |||
return wildcard; | |||
} | |||
public static string Normalized(string name) | |||
{ | |||
if (string.IsNullOrEmpty(name)) | |||
{ | |||
return name; | |||
} | |||
var pattern = "[\\>\\.\\ \\*]"; | |||
return Regex.IsMatch(name, pattern) ? Regex.Replace(name, pattern, "_") : name; | |||
} | |||
public static bool IsInnerIP(string ipAddress) | |||
{ | |||
var ipNum = GetIpNum(ipAddress); | |||
@@ -301,6 +301,9 @@ namespace DotNetCore.CAP.Internal | |||
case MqLogType.AsyncErrorEvent: | |||
_logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ConnectError: | |||
_logger.LogError("NATS server connection error. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.InvalidIdFormat: | |||
_logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason); | |||
break; | |||
@@ -22,6 +22,7 @@ namespace DotNetCore.CAP.Transport | |||
//NATS | |||
AsyncErrorEvent, | |||
ConnectError, | |||
//Amazon SQS | |||
InvalidIdFormat, | |||