|
@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.NATS |
|
|
private readonly string _groupId; |
|
|
private readonly string _groupId; |
|
|
private readonly NATSOptions _natsOptions; |
|
|
private readonly NATSOptions _natsOptions; |
|
|
|
|
|
|
|
|
private IConnection _consumerClient; |
|
|
|
|
|
|
|
|
private IConnection? _consumerClient; |
|
|
|
|
|
|
|
|
public NATSConsumerClient(string groupId, IOptions<NATSOptions> options) |
|
|
public NATSConsumerClient(string groupId, IOptions<NATSOptions> options) |
|
|
{ |
|
|
{ |
|
@@ -29,15 +29,17 @@ namespace DotNetCore.CAP.NATS |
|
|
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); |
|
|
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public event EventHandler<TransportMessage> OnMessageReceived; |
|
|
|
|
|
|
|
|
public event EventHandler<TransportMessage>? OnMessageReceived; |
|
|
|
|
|
|
|
|
public event EventHandler<LogMessageEventArgs> OnLog; |
|
|
|
|
|
|
|
|
public event EventHandler<LogMessageEventArgs>? OnLog; |
|
|
|
|
|
|
|
|
public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers); |
|
|
|
|
|
|
|
|
public BrokerAddress BrokerAddress => new ("NATS", _natsOptions.Servers); |
|
|
|
|
|
|
|
|
public ICollection<string> FetchTopics(IEnumerable<string> topicNames) |
|
|
public ICollection<string> FetchTopics(IEnumerable<string> topicNames) |
|
|
{ |
|
|
{ |
|
|
var jsm = _consumerClient.CreateJetStreamManagementContext(); |
|
|
|
|
|
|
|
|
Connect(); |
|
|
|
|
|
|
|
|
|
|
|
var jsm = _consumerClient!.CreateJetStreamManagementContext(); |
|
|
|
|
|
|
|
|
var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); |
|
|
var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); |
|
|
|
|
|
|
|
@@ -80,7 +82,7 @@ namespace DotNetCore.CAP.NATS |
|
|
|
|
|
|
|
|
Connect(); |
|
|
Connect(); |
|
|
|
|
|
|
|
|
var js = _consumerClient.CreateJetStreamContext(); |
|
|
|
|
|
|
|
|
var js = _consumerClient!.CreateJetStreamContext(); |
|
|
|
|
|
|
|
|
foreach (var subject in topics) |
|
|
foreach (var subject in topics) |
|
|
{ |
|
|
{ |
|
@@ -106,7 +108,7 @@ namespace DotNetCore.CAP.NATS |
|
|
|
|
|
|
|
|
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) |
|
|
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) |
|
|
{ |
|
|
{ |
|
|
var headers = new Dictionary<string, string>(); |
|
|
|
|
|
|
|
|
var headers = new Dictionary<string, string?>(); |
|
|
|
|
|
|
|
|
foreach (string h in e.Message.Header.Keys) |
|
|
foreach (string h in e.Message.Header.Keys) |
|
|
{ |
|
|
{ |
|
@@ -126,7 +128,7 @@ namespace DotNetCore.CAP.NATS |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void Reject(object sender) |
|
|
|
|
|
|
|
|
public void Reject(object? sender) |
|
|
{ |
|
|
{ |
|
|
if (sender is Msg msg) |
|
|
if (sender is Msg msg) |
|
|
{ |
|
|
{ |
|
@@ -153,7 +155,7 @@ namespace DotNetCore.CAP.NATS |
|
|
if (_consumerClient == null) |
|
|
if (_consumerClient == null) |
|
|
{ |
|
|
{ |
|
|
var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); |
|
|
var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); |
|
|
opts.Url = _natsOptions.Servers ?? opts.Url; |
|
|
|
|
|
|
|
|
opts.Url ??= _natsOptions.Servers; |
|
|
opts.ClosedEventHandler = ConnectedEventHandler; |
|
|
opts.ClosedEventHandler = ConnectedEventHandler; |
|
|
opts.DisconnectedEventHandler = ConnectedEventHandler; |
|
|
opts.DisconnectedEventHandler = ConnectedEventHandler; |
|
|
opts.AsyncErrorEventHandler = AsyncErrorEventHandler; |
|
|
opts.AsyncErrorEventHandler = AsyncErrorEventHandler; |
|
|