Переглянути джерело

Add support for Azure Service Bus sessions (#829)

master
Jon Ekdahl 3 роки тому
committed by GitHub
джерело
коміт
458b08db00
Не вдалося знайти GPG ключ що відповідає даному підпису Ідентифікатор GPG ключа: 4AEE18F83AFDEB23
6 змінених файлів з 108 додано та 13 видалено
  1. +20
    -1
      docs/content/user-guide/en/transport/azure-service-bus.md
  2. +52
    -12
      src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
  3. +16
    -0
      src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs
  4. +7
    -0
      src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs
  5. +7
    -0
      src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs
  6. +6
    -0
      src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs

+ 20
- 1
docs/content/user-guide/en/transport/azure-service-bus.md Переглянути файл

@@ -41,5 +41,24 @@ The AzureServiceBus configuration options provided directly by the CAP:
NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
ConnectionString | Endpoint address | string |
EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false
TopicPath | Topic entity path | string | cap
ManagementTokenProvider | Token provider | ITokenProvider | null
ManagementTokenProvider | Token provider | ITokenProvider | null

#### Sessions

When sessions are enabled (see `EnableSessions` option above), every message sent will have a session id. To control the session id, include
an extra header with name `AzureServiceBusHeaders.SessionId` when publishing events:

```csharp
ICapPublisher capBus = ...;
string yourEventName = ...;
YourEventType yourEvent = ...;

Dictionary<string, string> extraHeaders = new Dictionary<string, string>();
extraHeaders.Add(AzureServiceBusHeaders.SessionId, <your-session-id>);

capBus.Publish(yourEventName, yourEvent, extraHeaders);
```

If no session id header is present, the message id will be used as the session id.

+ 52
- 12
src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs Переглянути файл

@@ -78,13 +78,25 @@ namespace DotNetCore.CAP.AzureServiceBus
{
ConnectAsync().GetAwaiter().GetResult();

_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
AutoComplete = false,
MaxConcurrentCalls = 10,
MaxAutoRenewDuration = TimeSpan.FromSeconds(30)
});
if (_asbOptions.EnableSessions)
{
_consumerClient.RegisterSessionHandler(OnConsumerReceivedWithSession,
new SessionHandlerOptions(OnExceptionReceived)
{
AutoComplete = false,
MaxAutoRenewDuration = TimeSpan.FromSeconds(30)
});
}
else
{
_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
AutoComplete = false,
MaxConcurrentCalls = 10,
MaxAutoRenewDuration = TimeSpan.FromSeconds(30)
});
}

while (true)
{
@@ -96,7 +108,15 @@ namespace DotNetCore.CAP.AzureServiceBus

public void Commit(object sender)
{
_consumerClient.CompleteAsync((string)sender);
var commitInput = (AzureServiceBusConsumerCommitInput) sender;
if (_asbOptions.EnableSessions)
{
commitInput.Session.CompleteAsync(commitInput.LockToken);
}
else
{
_consumerClient.CompleteAsync(commitInput.LockToken);
}
}

public void Reject(object sender)
@@ -141,7 +161,13 @@ namespace DotNetCore.CAP.AzureServiceBus

if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
{
await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
var subscriptionDescription =
new SubscriptionDescription(_asbOptions.TopicPath, _subscriptionName)
{
RequiresSession = _asbOptions.EnableSessions
};

await mClient.CreateSubscriptionAsync(subscriptionDescription);
_logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
}

@@ -157,14 +183,28 @@ namespace DotNetCore.CAP.AzureServiceBus

#region private methods

private Task OnConsumerReceived(Message message, CancellationToken token)
private TransportMessage ConvertMessage(Message message)
{
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString());
header.Add(Headers.Group, _subscriptionName);

var context = new TransportMessage(header, message.Body);
return new TransportMessage(header, message.Body);
}
private Task OnConsumerReceivedWithSession(IMessageSession session, Message message, CancellationToken token)
{
var context = ConvertMessage(message);

OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken, session), context);
return Task.CompletedTask;
}

private Task OnConsumerReceived(Message message, CancellationToken token)
{
var context = ConvertMessage(message);

OnMessageReceived?.Invoke(message.SystemProperties.LockToken, context);
OnMessageReceived?.Invoke(new AzureServiceBusConsumerCommitInput(message.SystemProperties.LockToken), context);

return Task.CompletedTask;
}


+ 16
- 0
src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs Переглянути файл

@@ -0,0 +1,16 @@
using Microsoft.Azure.ServiceBus;

namespace DotNetCore.CAP.AzureServiceBus
{
public class AzureServiceBusConsumerCommitInput
{
public AzureServiceBusConsumerCommitInput(string lockToken, IMessageSession session = null)
{
LockToken = lockToken;
Session = session;
}
public IMessageSession Session { get; set; }
public string LockToken { get; set; }
}
}

+ 7
- 0
src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusHeaders.cs Переглянути файл

@@ -0,0 +1,7 @@
namespace DotNetCore.CAP.AzureServiceBus
{
public static class AzureServiceBusHeaders
{
public const string SessionId = "cap-session-id";
}
}

+ 7
- 0
src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs Переглянути файл

@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.AzureServiceBus;
using Microsoft.Azure.ServiceBus.Primitives;

// ReSharper disable once CheckNamespace
@@ -21,6 +22,12 @@ namespace DotNetCore.CAP
/// </summary>
public string ConnectionString { get; set; }

/// <summary>
/// Whether Service Bus sessions are enabled. If enabled, all messages must contain a
/// <see cref="AzureServiceBusHeaders.SessionId"/> header. Defaults to false.
/// </summary>
public bool EnableSessions { get; set; } = false;

/// <summary>
/// The name of the topic relative to the service namespace base address.
/// </summary>


+ 6
- 0
src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs Переглянути файл

@@ -46,6 +46,12 @@ namespace DotNetCore.CAP.AzureServiceBus
CorrelationId = transportMessage.GetCorrelationId()
};

if (_asbOptions.Value.EnableSessions)
{
transportMessage.Headers.TryGetValue(AzureServiceBusHeaders.SessionId, out var sessionId);
message.SessionId = string.IsNullOrEmpty(sessionId) ? transportMessage.GetId() : sessionId;
}

foreach (var header in transportMessage.Headers)
{
message.UserProperties.Add(header.Key, header.Value);


Завантаження…
Відмінити
Зберегти