Browse Source

Added CustomHeaders support to Azure Service Bus transport (#1063)

* Updated docs

* Added CustomHeaders property to CAP.AzureServiceBusOptions.cs

* Merged upstream/master

* Updated docs

* Verified if a custom header with the same key is already present
master
Mateus Viegas 3 years ago
committed by GitHub
parent
commit
7c50fd4f69
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 145 additions and 13 deletions
  1. +7
    -0
      CAP.sln
  2. +2
    -2
      docs/content/user-guide/en/cap/messaging.md
  3. +26
    -7
      docs/content/user-guide/en/transport/azure-service-bus.md
  4. +31
    -0
      samples/Sample.AzureServiceBus.InMemory/Program.cs
  5. +17
    -0
      samples/Sample.AzureServiceBus.InMemory/Sample.AzureServiceBus.InMemory.csproj
  6. +14
    -0
      samples/Sample.AzureServiceBus.InMemory/SampleSubscriber.cs
  7. +8
    -0
      samples/Sample.AzureServiceBus.InMemory/appsettings.Development.json
  8. +9
    -0
      samples/Sample.AzureServiceBus.InMemory/appsettings.json
  9. +22
    -3
      src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
  10. +9
    -1
      src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs

+ 7
- 0
CAP.sln View File

@@ -86,6 +86,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.D
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{0C734FB2-7D75-4FF3-B564-1E50E6280B14}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -208,6 +210,10 @@ Global
{83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU
{0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -242,6 +248,7 @@ Global
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{0C734FB2-7D75-4FF3-B564-1E50E6280B14} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 2
- 2
docs/content/user-guide/en/cap/messaging.md View File

@@ -73,7 +73,7 @@ cap-msg-type | string | The type of message, `typeof(T).FullName`(not required)
cap-senttime | string | sending time (not required)

### Custom headers
To consume messages sent without CAP headers, both Kafka and RabbitMQ consumers can inject a minimal set of headers using custom headers as shown below:
To consume messages sent without CAP headers, both AzureServiceBus, Kafka and RabbitMQ consumers can inject a minimal set of headers using the `CustomHeaders` property as shown below (RabbitMQ example):
```C#
container.AddCap(x =>
{
@@ -89,7 +89,7 @@ container.AddCap(x =>
});
```

After adding `cap-msg-id` and `cap-msg-name`, CAP consumers receive messages sent directly from the RabbitMQ management tool.
After adding `cap-msg-id` and `cap-msg-name`, CAP consumers receive messages sent directly from any external system, like the RabbitMQ management tool when using RabbitMQ as a transport.

## Scheduling



+ 26
- 7
docs/content/user-guide/en/transport/azure-service-bus.md View File

@@ -38,13 +38,13 @@ public void ConfigureServices(IServiceCollection services)

The AzureServiceBus configuration options provided directly by the CAP:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
ConnectionString | Endpoint address | string |
EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false
TopicPath | Topic entity path | string | cap
ManagementTokenProvider | Token provider | ITokenProvider | null
| NAME | DESCRIPTION | TYPE | DEFAULT |
|:-------------------------|:-------------------------------------------------------------------------------------------------------------|---|:--- |
| ConnectionString | Endpoint address | string |
| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false |
| TopicPath | Topic entity path | string | cap |
| ManagementTokenProvider | Token provider | ITokenProvider | null |
| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | Func<Message, List<KeyValuePair<string, string>>>? | null |
#### Sessions

When sessions are enabled (see `EnableSessions` option above), every message sent will have a session id. To control the session id, include
@@ -62,3 +62,22 @@ capBus.Publish(yourEventName, yourEvent, extraHeaders);
```

If no session id header is present, the message id will be used as the session id.

#### Heterogeneous Systems

Sometimes you might want to listen to a message that was published by an external system. In this case, you need to add a set of two mandatory headers for CAP compatibility as shown below.

```csharp
c.UseAzureServiceBus(asb =>
{
asb.ConnectionString = ...
asb.CustomHeaders = message => new List<KeyValuePair<string, string>>()
{
new(DotNetCore.CAP.Messages.Headers.MessageId,
SnowflakeId.Default().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label)
};
});
```

> Important: If a header with the same name (key) already exists in the message, the Custom Header won't be added.

+ 31
- 0
samples/Sample.AzureServiceBus.InMemory/Program.cs View File

@@ -0,0 +1,31 @@
using DotNetCore.CAP.Internal;
using Sample.AzureServiceBus.InMemory;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddLogging(l => l.AddConsole());

builder.Services.AddCap(c =>
{
c.UseInMemoryStorage();
c.UseAzureServiceBus(asb =>
{
asb.ConnectionString = builder.Configuration.GetConnectionString("AzureServiceBus");
asb.CustomHeaders = message => new List<KeyValuePair<string, string>>()
{
new(DotNetCore.CAP.Messages.Headers.MessageId,
SnowflakeId.Default().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label)
};
});

c.UseDashboard();
});

builder.Services.AddSingleton<SampleSubscriber>();

var app = builder.Build();

app.MapGet("/", () => "Hello World!");

app.Run();

+ 17
- 0
samples/Sample.AzureServiceBus.InMemory/Sample.AzureServiceBus.InMemory.csproj View File

@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<UserSecretsId>1c4ab524-d04d-459c-bf1d-9cb5da3ecaf1</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 14
- 0
samples/Sample.AzureServiceBus.InMemory/SampleSubscriber.cs View File

@@ -0,0 +1,14 @@
using DotNetCore.CAP;

namespace Sample.AzureServiceBus.InMemory;

public class SampleSubscriber : ICapSubscribe
{
public record Message(string Content);
[CapSubscribe("cap.sample.tests")]
public void Handle(Message message)
{
Console.WriteLine($"Message {message.Content} received");
}
}

+ 8
- 0
samples/Sample.AzureServiceBus.InMemory/appsettings.Development.json View File

@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}

+ 9
- 0
samples/Sample.AzureServiceBus.InMemory/appsettings.json View File

@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}

+ 22
- 3
src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs View File

@@ -185,10 +185,29 @@ namespace DotNetCore.CAP.AzureServiceBus

private TransportMessage ConvertMessage(Message message)
{
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString());
header.Add(Headers.Group, _subscriptionName);
var headers = message.UserProperties
.ToDictionary(x => x.Key, y => y.Value?.ToString());
headers.Add(Headers.Group, _subscriptionName);

var customHeaders = _asbOptions.CustomHeaders?.Invoke(message);
if (customHeaders?.Any() == true)
{
foreach (var customHeader in customHeaders)
{
var added = headers.TryAdd(customHeader.Key, customHeader.Value);

if (!added)
{
_logger.LogWarning(
"Not possible to add the custom header {Header}. A value with the same key already exists in the Message headers.",
customHeader.Key);
}
}
}

return new TransportMessage(header, message.Body);
return new TransportMessage(headers, message.Body);
}
private Task OnConsumerReceivedWithSession(IMessageSession session, Message message, CancellationToken token)


+ 9
- 1
src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs View File

@@ -1,7 +1,10 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using DotNetCore.CAP.AzureServiceBus;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Primitives;

// ReSharper disable once CheckNamespace
@@ -36,6 +39,11 @@ namespace DotNetCore.CAP
/// <summary>
/// Represents the Azure Active Directory token provider for Azure Managed Service Identity integration.
/// </summary>
public ITokenProvider? ManagementTokenProvider { get; set; }
public ITokenProvider? ManagementTokenProvider { get; set; }

/// <summary>
/// Use this function to write additional headers from the original ASB Message or any Custom Header, i.e. to allow compatibility with heterogeneous systems, into <see cref="CapHeader"/>
/// </summary>
public Func<Message, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }
}
}

Loading…
Cancel
Save