Browse Source

Add support jetstreams for NATS. (#983)

master
Savorboard 3 years ago
parent
commit
83b95af9bf
8 changed files with 107 additions and 29 deletions
  1. +5
    -1
      src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
  2. +7
    -2
      src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
  3. +1
    -1
      src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
  4. +15
    -6
      src/DotNetCore.CAP.NATS/ITransport.NATS.cs
  5. +64
    -19
      src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
  6. +11
    -0
      src/DotNetCore.CAP/Internal/Helper.cs
  7. +3
    -0
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  8. +1
    -0
      src/DotNetCore.CAP/Transport/MqLogType.cs

+ 5
- 1
src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs View File

@@ -1,7 +1,9 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using NATS.Client;
using NATS.Client.JetStream;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -15,7 +17,7 @@ namespace DotNetCore.CAP
/// Gets or sets the server url/urls used to connect to the NATs server.
/// </summary>
/// <remarks>This may contain username/password information.</remarks>
public string Servers { get; set; }
public string Servers { get; set; } = "nats://localhost:4222";

/// <summary>
/// connection pool size, default is 10
@@ -26,5 +28,7 @@ namespace DotNetCore.CAP
/// Used to setup all NATs client options
/// </summary>
public Options Options { get; set; }

public Action<StreamConfiguration.StreamConfigurationBuilder> StreamOptions { get; set; }
}
}

+ 7
- 2
src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs View File

@@ -3,6 +3,7 @@

using System;
using DotNetCore.CAP;
using JetBrains.Annotations;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
@@ -14,9 +15,13 @@ namespace Microsoft.Extensions.DependencyInjection
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">NATS bootstrap server urls.</param>
public static CapOptions UseNATS(this CapOptions options, string bootstrapServers)
public static CapOptions UseNATS(this CapOptions options, [CanBeNull] string bootstrapServers = null)
{
return options.UseNATS(opt => { opt.Servers = bootstrapServers; });
return options.UseNATS(opt =>
{
if (bootstrapServers != null)
opt.Servers = bootstrapServers;
});
}

/// <summary>


+ 1
- 1
src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj View File

@@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.14.0-pre1" />
<PackageReference Include="NATS.Client" Version="0.14.0-pre3" />
</ItemGroup>

<ItemGroup>


+ 15
- 6
src/DotNetCore.CAP.NATS/ITransport.NATS.cs View File

@@ -8,6 +8,7 @@ using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using NATS.Client;
using NATS.Client.JetStream;

namespace DotNetCore.CAP.NATS
{
@@ -15,16 +16,19 @@ namespace DotNetCore.CAP.NATS
{
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;
private readonly JetStreamOptions _jetStreamOptions;

public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPool)
{
_logger = logger;
_connectionPool = connectionPool;

_jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build();
}

public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress);

public Task<OperateResult> SendAsync(TransportMessage message)
public async Task<OperateResult> SendAsync(TransportMessage message)
{
var connection = _connectionPool.RentConnection();

@@ -36,21 +40,26 @@ namespace DotNetCore.CAP.NATS
msg.Header[header.Key] = header.Value;
}

var reply = connection.Request(msg);
var js = connection.CreateJetStreamContext(_jetStreamOptions);

var builder = PublishOptions.Builder().WithExpectedStream(Helper.Normalized(message.GetName())).WithMessageId(message.GetId());

var resp = await js.PublishAsync(msg, builder.Build());

if (reply.Data != null && reply.Data[0] == 1)
if (resp.Seq > 0)
{
_logger.LogDebug($"NATS subject message [{message.GetName()}] has been consumed.");
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");

return Task.FromResult(OperateResult.Success);
return OperateResult.Success;
}
throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
}
catch (Exception ex)
{
var warpEx = new PublisherSentFailedException(ex.Message, ex);

return Task.FromResult(OperateResult.Failed(warpEx));
return OperateResult.Failed(warpEx);
}
finally
{


+ 64
- 19
src/DotNetCore.CAP.NATS/NATSConsumerClient.cs View File

@@ -3,11 +3,14 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
using NATS.Client;
using NATS.Client.JetStream;

namespace DotNetCore.CAP.NATS
{
@@ -17,14 +20,12 @@ namespace DotNetCore.CAP.NATS

private readonly string _groupId;
private readonly NATSOptions _natsOptions;
private readonly IList<IAsyncSubscription> _asyncSubscriptions;

private IConnection _consumerClient;

public NATSConsumerClient(string groupId, IOptions<NATSOptions> options)
{
_groupId = groupId;
_asyncSubscriptions = new List<IAsyncSubscription>();
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}

@@ -34,6 +35,41 @@ namespace DotNetCore.CAP.NATS

public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers);

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
var jsm = _consumerClient.CreateJetStreamManagementContext();

foreach (var topic in topicNames)
{
var norTopic = Helper.Normalized(topic);
try
{
jsm.GetStreamInfo(norTopic); // this throws if the stream does not exist
}
catch (NATSJetStreamException)
{
var builder = StreamConfiguration.Builder()
.WithName(norTopic)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(topic);

_natsOptions.StreamOptions?.Invoke(builder);

try
{
jsm.AddStream(builder.Build());
}
catch
{
// ignored
}
}
}

return topicNames.ToList();
}

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
@@ -43,21 +79,29 @@ namespace DotNetCore.CAP.NATS

Connect();

var js = _consumerClient.CreateJetStreamContext();

foreach (var topic in topics)
{
_asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId));
var pso = PushSubscribeOptions.Builder()
.WithStream(Helper.Normalized(topic))
.WithConfiguration(ConsumerConfiguration.Builder().WithDeliverPolicy(DeliverPolicy.New).Build())
.WithDeliverGroup(_groupId)
.Build();

js.PushSubscribeAsync(topic, Subscription_MessageHandler, false, pso);
}
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();
//Connect();

foreach (var subscription in _asyncSubscriptions)
{
subscription.MessageHandler += Subscription_MessageHandler;
subscription.Start();
}
//foreach (var subscription in _asyncSubscriptions)
//{
// subscription.MessageHandler += Subscription_MessageHandler;
// subscription.Start();
//}

while (true)
{
@@ -66,11 +110,11 @@ namespace DotNetCore.CAP.NATS
}
// ReSharper disable once FunctionNeverReturns
}
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
{
var headers = new Dictionary<string, string>();
foreach (string h in e.Message.Header.Keys)
{
headers.Add(h, e.Message.Header[h]);
@@ -78,22 +122,22 @@ namespace DotNetCore.CAP.NATS

headers.Add(Headers.Group, _groupId);

OnMessageReceived?.Invoke(e.Message.Reply, new TransportMessage(headers, e.Message.Data));
OnMessageReceived?.Invoke(e.Message, new TransportMessage(headers, e.Message.Data));
}

public void Commit(object sender)
{
if (sender is string reply)
if (sender is Msg msg)
{
_consumerClient.Publish(reply, new byte[] {1});
msg.Ack();
}
}

public void Reject(object sender)
{
if (sender is string reply)
if (sender is Msg msg)
{
_consumerClient.Publish(reply, new byte[] {0});
msg.Nak();
}
}

@@ -120,6 +164,7 @@ namespace DotNetCore.CAP.NATS
opts.ClosedEventHandler = ConnectedEventHandler;
opts.DisconnectedEventHandler = ConnectedEventHandler;
opts.AsyncErrorEventHandler = AsyncErrorEventHandler;
opts.Timeout = 5000;
_consumerClient = new ConnectionFactory().CreateConnection(opts);
}
}
@@ -133,8 +178,8 @@ namespace DotNetCore.CAP.NATS
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ServerConnError,
Reason = $"An error occurred during connect NATS --> {e.Error}"
LogType = MqLogType.ConnectError,
Reason = e.Error?.ToString()
};
OnLog?.Invoke(null, logArgs);
}
@@ -144,7 +189,7 @@ namespace DotNetCore.CAP.NATS
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.AsyncErrorEvent,
Reason = $"An error occurred out of band --> {e.Error}"
Reason = e.Error
};
OnLog?.Invoke(null, logArgs);
}


+ 11
- 0
src/DotNetCore.CAP/Internal/Helper.cs View File

@@ -4,6 +4,7 @@
using System;
using System.ComponentModel;
using System.Reflection;
using System.Text.RegularExpressions;

namespace DotNetCore.CAP.Internal
{
@@ -64,6 +65,16 @@ namespace DotNetCore.CAP.Internal
return wildcard;
}

public static string Normalized(string name)
{
if (string.IsNullOrEmpty(name))
{
return name;
}
var pattern = "[\\>\\.\\ \\*]";
return Regex.IsMatch(name, pattern) ? Regex.Replace(name, pattern, "_") : name;
}

public static bool IsInnerIP(string ipAddress)
{
var ipNum = GetIpNum(ipAddress);


+ 3
- 0
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -301,6 +301,9 @@ namespace DotNetCore.CAP.Internal
case MqLogType.AsyncErrorEvent:
_logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason);
break;
case MqLogType.ConnectError:
_logger.LogError("NATS server connection error. --> " + logmsg.Reason);
break;
case MqLogType.InvalidIdFormat:
_logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason);
break;


+ 1
- 0
src/DotNetCore.CAP/Transport/MqLogType.cs View File

@@ -22,6 +22,7 @@ namespace DotNetCore.CAP.Transport

//NATS
AsyncErrorEvent,
ConnectError,

//Amazon SQS
InvalidIdFormat,


Loading…
Cancel
Save