|
|
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
private readonly string _subscriptionName; |
|
|
|
private readonly AzureServiceBusOptions _asbOptions; |
|
|
|
|
|
|
|
private SubscriptionClient _consumerClient; |
|
|
|
private SubscriptionClient? _consumerClient; |
|
|
|
|
|
|
|
public AzureServiceBusConsumerClient( |
|
|
|
ILogger logger, |
|
|
@@ -36,11 +36,11 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
_asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); |
|
|
|
} |
|
|
|
|
|
|
|
public event EventHandler<TransportMessage> OnMessageReceived; |
|
|
|
public event EventHandler<TransportMessage>? OnMessageReceived; |
|
|
|
|
|
|
|
public event EventHandler<LogMessageEventArgs> OnLog; |
|
|
|
public event EventHandler<LogMessageEventArgs>? OnLog; |
|
|
|
|
|
|
|
public BrokerAddress BrokerAddress => new BrokerAddress("AzureServiceBus", _asbOptions.ConnectionString); |
|
|
|
public BrokerAddress BrokerAddress => new ("AzureServiceBus", _asbOptions.ConnectionString); |
|
|
|
|
|
|
|
public void Subscribe(IEnumerable<string> topics) |
|
|
|
{ |
|
|
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
|
|
|
|
ConnectAsync().GetAwaiter().GetResult(); |
|
|
|
|
|
|
|
var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name); |
|
|
|
var allRuleNames = _consumerClient!.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name); |
|
|
|
|
|
|
|
foreach (var newRule in topics.Except(allRuleNames)) |
|
|
|
{ |
|
|
@@ -80,7 +80,7 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
|
|
|
|
if (_asbOptions.EnableSessions) |
|
|
|
{ |
|
|
|
_consumerClient.RegisterSessionHandler(OnConsumerReceivedWithSession, |
|
|
|
_consumerClient!.RegisterSessionHandler(OnConsumerReceivedWithSession, |
|
|
|
new SessionHandlerOptions(OnExceptionReceived) |
|
|
|
{ |
|
|
|
AutoComplete = false, |
|
|
@@ -89,7 +89,7 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
_consumerClient.RegisterMessageHandler(OnConsumerReceived, |
|
|
|
_consumerClient!.RegisterMessageHandler(OnConsumerReceived, |
|
|
|
new MessageHandlerOptions(OnExceptionReceived) |
|
|
|
{ |
|
|
|
AutoComplete = false, |
|
|
@@ -111,15 +111,15 @@ namespace DotNetCore.CAP.AzureServiceBus |
|
|
|
var commitInput = (AzureServiceBusConsumerCommitInput) sender; |
|
|
|
if (_asbOptions.EnableSessions) |
|
|
|
{ |
|
|
|
commitInput.Session.CompleteAsync(commitInput.LockToken); |
|
|
|
commitInput.Session?.CompleteAsync(commitInput.LockToken); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
_consumerClient.CompleteAsync(commitInput.LockToken); |
|
|
|
_consumerClient!.CompleteAsync(commitInput.LockToken); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void Reject(object sender) |
|
|
|
public void Reject(object? sender) |
|
|
|
{ |
|
|
|
// ignore |
|
|
|
} |
|
|
|