From 458b08db00ff4d4be862ebf226a2c6303b3e72fb Mon Sep 17 00:00:00 2001 From: Jon Ekdahl Date: Thu, 15 Apr 2021 03:25:43 +0200 Subject: [PATCH] Add support for Azure Service Bus sessions (#829) --- .../en/transport/azure-service-bus.md | 21 +++++- .../AzureServiceBusConsumerClient.cs | 64 +++++++++++++++---- .../AzureServiceBusConsumerCommitInput.cs | 16 +++++ .../AzureServiceBusHeaders.cs | 7 ++ .../CAP.AzureServiceBusOptions.cs | 7 ++ .../ITransport.AzureServiceBus.cs | 6 ++ 6 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs diff --git a/docs/content/user-guide/en/transport/azure-service-bus.md b/docs/content/user-guide/en/transport/azure-service-bus.md index 173e24d..9c1e7b3 100644 --- a/docs/content/user-guide/en/transport/azure-service-bus.md +++ b/docs/content/user-guide/en/transport/azure-service-bus.md @@ -41,5 +41,24 @@ The AzureServiceBus configuration options provided directly by the CAP: NAME | DESCRIPTION | TYPE | DEFAULT :---|:---|---|:--- ConnectionString | Endpoint address | string | +EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false TopicPath | Topic entity path | string | cap -ManagementTokenProvider | Token provider | ITokenProvider | null \ No newline at end of file +ManagementTokenProvider | Token provider | ITokenProvider | null + +#### Sessions + +When sessions are enabled (see `EnableSessions` option above), every message sent will have a session id. To control the session id, include +an extra header with name `AzureServiceBusHeaders.SessionId` when publishing events: + +```csharp +ICapPublisher capBus = ...; +string yourEventName = ...; +YourEventType yourEvent = ...; + +Dictionary extraHeaders = new Dictionary(); +extraHeaders.Add(AzureServiceBusHeaders.SessionId, ); + +capBus.Publish(yourEventName, yourEvent, extraHeaders); +``` + +If no session id header is present, the message id will be used as the session id. diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index de94ad9..ee5d84b 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -78,13 +78,25 @@ namespace DotNetCore.CAP.AzureServiceBus { ConnectAsync().GetAwaiter().GetResult(); - _consumerClient.RegisterMessageHandler(OnConsumerReceived, - new MessageHandlerOptions(OnExceptionReceived) - { - AutoComplete = false, - MaxConcurrentCalls = 10, - MaxAutoRenewDuration = TimeSpan.FromSeconds(30) - }); + if (_asbOptions.EnableSessions) + { + _consumerClient.RegisterSessionHandler(OnConsumerReceivedWithSession, + new SessionHandlerOptions(OnExceptionReceived) + { + AutoComplete = false, + MaxAutoRenewDuration = TimeSpan.FromSeconds(30) + }); + } + else + { + _consumerClient.RegisterMessageHandler(OnConsumerReceived, + new MessageHandlerOptions(OnExceptionReceived) + { + AutoComplete = false, + MaxConcurrentCalls = 10, + MaxAutoRenewDuration = TimeSpan.FromSeconds(30) + }); + } while (true) { @@ -96,7 +108,15 @@ namespace DotNetCore.CAP.AzureServiceBus public void Commit(object sender) { - _consumerClient.CompleteAsync((string)sender); + var commitInput = (AzureServiceBusConsumerCommitInput) sender; + if (_asbOptions.EnableSessions) + { + commitInput.Session.CompleteAsync(commitInput.LockToken); + } + else + { + _consumerClient.CompleteAsync(commitInput.LockToken); + } } public void Reject(object sender) @@ -141,7 +161,13 @@ namespace DotNetCore.CAP.AzureServiceBus if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName)) { - await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName); + var subscriptionDescription = + new SubscriptionDescription(_asbOptions.TopicPath, _subscriptionName) + { + RequiresSession = _asbOptions.EnableSessions + }; + + await mClient.CreateSubscriptionAsync(subscriptionDescription); _logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}"); } @@ -157,14 +183,28 @@ namespace DotNetCore.CAP.AzureServiceBus #region private methods - private Task OnConsumerReceived(Message message, CancellationToken token) + private TransportMessage ConvertMessage(Message message) { var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString()); header.Add(Headers.Group, _subscriptionName); - var context = new TransportMessage(header, message.Body); + return new TransportMessage(header, message.Body); + } + + private Task OnConsumerReceivedWithSession(IMessageSession session, Message message, CancellationToken token) + { + var context = ConvertMessage(message); + + OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken, session), context); + + return Task.CompletedTask; + } + + private Task OnConsumerReceived(Message message, CancellationToken token) + { + var context = ConvertMessage(message); - OnMessageReceived?.Invoke(message.SystemProperties.LockToken, context); + OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken), context); return Task.CompletedTask; } diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs new file mode 100644 index 0000000..c76a360 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs @@ -0,0 +1,16 @@ +using Microsoft.Azure.ServiceBus; + +namespace DotNetCore.CAP.AzureServiceBus +{ + public class AzureServiceBusConsumerCommitInput + { + public AzureServiceBusConsumerCommitInput(string lockToken, IMessageSession session = null) + { + LockToken = lockToken; + Session = session; + } + + public IMessageSession Session { get; set; } + public string LockToken { get; set; } + } +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs new file mode 100644 index 0000000..89723a0 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs @@ -0,0 +1,7 @@ +namespace DotNetCore.CAP.AzureServiceBus +{ + public static class AzureServiceBusHeaders + { + public const string SessionId = "cap-session-id"; + } +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs index ef78199..61c6d63 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using DotNetCore.CAP.AzureServiceBus; using Microsoft.Azure.ServiceBus.Primitives; // ReSharper disable once CheckNamespace @@ -21,6 +22,12 @@ namespace DotNetCore.CAP /// public string ConnectionString { get; set; } + /// + /// Whether Service Bus sessions are enabled. If enabled, all messages must contain a + /// header. Defaults to false. + /// + public bool EnableSessions { get; set; } = false; + /// /// The name of the topic relative to the service namespace base address. /// diff --git a/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs b/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs index 264f5c7..3038c5d 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs @@ -46,6 +46,12 @@ namespace DotNetCore.CAP.AzureServiceBus CorrelationId = transportMessage.GetCorrelationId() }; + if (_asbOptions.Value.EnableSessions) + { + transportMessage.Headers.TryGetValue(AzureServiceBusHeaders.SessionId, out var sessionId); + message.SessionId = string.IsNullOrEmpty(sessionId) ? transportMessage.GetId() : sessionId; + } + foreach (var header in transportMessage.Headers) { message.UserProperties.Add(header.Key, header.Value);