diff --git a/docs/content/user-guide/en/transport/azure-service-bus.md b/docs/content/user-guide/en/transport/azure-service-bus.md index 173e24d..9c1e7b3 100644 --- a/docs/content/user-guide/en/transport/azure-service-bus.md +++ b/docs/content/user-guide/en/transport/azure-service-bus.md @@ -41,5 +41,24 @@ The AzureServiceBus configuration options provided directly by the CAP: NAME | DESCRIPTION | TYPE | DEFAULT :---|:---|---|:--- ConnectionString | Endpoint address | string | +EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false TopicPath | Topic entity path | string | cap -ManagementTokenProvider | Token provider | ITokenProvider | null \ No newline at end of file +ManagementTokenProvider | Token provider | ITokenProvider | null + +#### Sessions + +When sessions are enabled (see `EnableSessions` option above), every message sent will have a session id. To control the session id, include +an extra header with name `AzureServiceBusHeaders.SessionId` when publishing events: + +```csharp +ICapPublisher capBus = ...; +string yourEventName = ...; +YourEventType yourEvent = ...; + +Dictionary extraHeaders = new Dictionary(); +extraHeaders.Add(AzureServiceBusHeaders.SessionId, ); + +capBus.Publish(yourEventName, yourEvent, extraHeaders); +``` + +If no session id header is present, the message id will be used as the session id. diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index de94ad9..ee5d84b 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -78,13 +78,25 @@ namespace DotNetCore.CAP.AzureServiceBus { ConnectAsync().GetAwaiter().GetResult(); - _consumerClient.RegisterMessageHandler(OnConsumerReceived, - new MessageHandlerOptions(OnExceptionReceived) - { - AutoComplete = false, - MaxConcurrentCalls = 10, - MaxAutoRenewDuration = TimeSpan.FromSeconds(30) - }); + if (_asbOptions.EnableSessions) + { + _consumerClient.RegisterSessionHandler(OnConsumerReceivedWithSession, + new SessionHandlerOptions(OnExceptionReceived) + { + AutoComplete = false, + MaxAutoRenewDuration = TimeSpan.FromSeconds(30) + }); + } + else + { + _consumerClient.RegisterMessageHandler(OnConsumerReceived, + new MessageHandlerOptions(OnExceptionReceived) + { + AutoComplete = false, + MaxConcurrentCalls = 10, + MaxAutoRenewDuration = TimeSpan.FromSeconds(30) + }); + } while (true) { @@ -96,7 +108,15 @@ namespace DotNetCore.CAP.AzureServiceBus public void Commit(object sender) { - _consumerClient.CompleteAsync((string)sender); + var commitInput = (AzureServiceBusConsumerCommitInput) sender; + if (_asbOptions.EnableSessions) + { + commitInput.Session.CompleteAsync(commitInput.LockToken); + } + else + { + _consumerClient.CompleteAsync(commitInput.LockToken); + } } public void Reject(object sender) @@ -141,7 +161,13 @@ namespace DotNetCore.CAP.AzureServiceBus if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName)) { - await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName); + var subscriptionDescription = + new SubscriptionDescription(_asbOptions.TopicPath, _subscriptionName) + { + RequiresSession = _asbOptions.EnableSessions + }; + + await mClient.CreateSubscriptionAsync(subscriptionDescription); _logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}"); } @@ -157,14 +183,28 @@ namespace DotNetCore.CAP.AzureServiceBus #region private methods - private Task OnConsumerReceived(Message message, CancellationToken token) + private TransportMessage ConvertMessage(Message message) { var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString()); header.Add(Headers.Group, _subscriptionName); - var context = new TransportMessage(header, message.Body); + return new TransportMessage(header, message.Body); + } + + private Task OnConsumerReceivedWithSession(IMessageSession session, Message message, CancellationToken token) + { + var context = ConvertMessage(message); + + OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken, session), context); + + return Task.CompletedTask; + } + + private Task OnConsumerReceived(Message message, CancellationToken token) + { + var context = ConvertMessage(message); - OnMessageReceived?.Invoke(message.SystemProperties.LockToken, context); + OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken), context); return Task.CompletedTask; } diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs new file mode 100644 index 0000000..c76a360 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs @@ -0,0 +1,16 @@ +using Microsoft.Azure.ServiceBus; + +namespace DotNetCore.CAP.AzureServiceBus +{ + public class AzureServiceBusConsumerCommitInput + { + public AzureServiceBusConsumerCommitInput(string lockToken, IMessageSession session = null) + { + LockToken = lockToken; + Session = session; + } + + public IMessageSession Session { get; set; } + public string LockToken { get; set; } + } +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs new file mode 100644 index 0000000..89723a0 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs @@ -0,0 +1,7 @@ +namespace DotNetCore.CAP.AzureServiceBus +{ + public static class AzureServiceBusHeaders + { + public const string SessionId = "cap-session-id"; + } +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs index ef78199..61c6d63 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using DotNetCore.CAP.AzureServiceBus; using Microsoft.Azure.ServiceBus.Primitives; // ReSharper disable once CheckNamespace @@ -21,6 +22,12 @@ namespace DotNetCore.CAP /// public string ConnectionString { get; set; } + /// + /// Whether Service Bus sessions are enabled. If enabled, all messages must contain a + /// header. Defaults to false. + /// + public bool EnableSessions { get; set; } = false; + /// /// The name of the topic relative to the service namespace base address. /// diff --git a/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs b/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs index 264f5c7..3038c5d 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs @@ -46,6 +46,12 @@ namespace DotNetCore.CAP.AzureServiceBus CorrelationId = transportMessage.GetCorrelationId() }; + if (_asbOptions.Value.EnableSessions) + { + transportMessage.Headers.TryGetValue(AzureServiceBusHeaders.SessionId, out var sessionId); + message.SessionId = string.IsNullOrEmpty(sessionId) ? transportMessage.GetId() : sessionId; + } + foreach (var header in transportMessage.Headers) { message.UserProperties.Add(header.Key, header.Value);