diff --git a/CAP.sln b/CAP.sln index db7190d..e7107e6 100644 --- a/CAP.sln +++ b/CAP.sln @@ -86,6 +86,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.D EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{0C734FB2-7D75-4FF3-B564-1E50E6280B14}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -208,6 +210,10 @@ Global {83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU + {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -242,6 +248,7 @@ Global {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF} {DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} {83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {0C734FB2-7D75-4FF3-B564-1E50E6280B14} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index dee83c6..06840ef 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -73,7 +73,7 @@ cap-msg-type | string | The type of message, `typeof(T).FullName`(not required) cap-senttime | string | sending time (not required) ### Custom headers -To consume messages sent without CAP headers, both Kafka and RabbitMQ consumers can inject a minimal set of headers using custom headers as shown below: +To consume messages sent without CAP headers, both AzureServiceBus, Kafka and RabbitMQ consumers can inject a minimal set of headers using the `CustomHeaders` property as shown below (RabbitMQ example): ```C# container.AddCap(x => { @@ -89,7 +89,7 @@ container.AddCap(x => }); ``` -After adding `cap-msg-id` and `cap-msg-name`, CAP consumers receive messages sent directly from the RabbitMQ management tool. +After adding `cap-msg-id` and `cap-msg-name`, CAP consumers receive messages sent directly from any external system, like the RabbitMQ management tool when using RabbitMQ as a transport. ## Scheduling diff --git a/docs/content/user-guide/en/transport/azure-service-bus.md b/docs/content/user-guide/en/transport/azure-service-bus.md index 9c1e7b3..10d9714 100644 --- a/docs/content/user-guide/en/transport/azure-service-bus.md +++ b/docs/content/user-guide/en/transport/azure-service-bus.md @@ -38,13 +38,13 @@ public void ConfigureServices(IServiceCollection services) The AzureServiceBus configuration options provided directly by the CAP: -NAME | DESCRIPTION | TYPE | DEFAULT -:---|:---|---|:--- -ConnectionString | Endpoint address | string | -EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false -TopicPath | Topic entity path | string | cap -ManagementTokenProvider | Token provider | ITokenProvider | null - +| NAME | DESCRIPTION | TYPE | DEFAULT | +|:-------------------------|:-------------------------------------------------------------------------------------------------------------|---|:--- | +| ConnectionString | Endpoint address | string | +| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false | +| TopicPath | Topic entity path | string | cap | +| ManagementTokenProvider | Token provider | ITokenProvider | null | +| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | Func>>? | null | #### Sessions When sessions are enabled (see `EnableSessions` option above), every message sent will have a session id. To control the session id, include @@ -62,3 +62,22 @@ capBus.Publish(yourEventName, yourEvent, extraHeaders); ``` If no session id header is present, the message id will be used as the session id. + +#### Heterogeneous Systems + +Sometimes you might want to listen to a message that was published by an external system. In this case, you need to add a set of two mandatory headers for CAP compatibility as shown below. + +```csharp + c.UseAzureServiceBus(asb => + { + asb.ConnectionString = ... + asb.CustomHeaders = message => new List>() + { + new(DotNetCore.CAP.Messages.Headers.MessageId, + SnowflakeId.Default().NextId().ToString()), + new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label) + }; + }); +``` + +> Important: If a header with the same name (key) already exists in the message, the Custom Header won't be added. diff --git a/samples/Sample.AzureServiceBus.InMemory/Program.cs b/samples/Sample.AzureServiceBus.InMemory/Program.cs new file mode 100644 index 0000000..957c9d8 --- /dev/null +++ b/samples/Sample.AzureServiceBus.InMemory/Program.cs @@ -0,0 +1,31 @@ +using DotNetCore.CAP.Internal; +using Sample.AzureServiceBus.InMemory; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services.AddLogging(l => l.AddConsole()); + +builder.Services.AddCap(c => +{ + c.UseInMemoryStorage(); + c.UseAzureServiceBus(asb => + { + asb.ConnectionString = builder.Configuration.GetConnectionString("AzureServiceBus"); + asb.CustomHeaders = message => new List>() + { + new(DotNetCore.CAP.Messages.Headers.MessageId, + SnowflakeId.Default().NextId().ToString()), + new(DotNetCore.CAP.Messages.Headers.MessageName, message.Label) + }; + }); + + c.UseDashboard(); +}); + +builder.Services.AddSingleton(); + +var app = builder.Build(); + +app.MapGet("/", () => "Hello World!"); + +app.Run(); \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.InMemory/Sample.AzureServiceBus.InMemory.csproj b/samples/Sample.AzureServiceBus.InMemory/Sample.AzureServiceBus.InMemory.csproj new file mode 100644 index 0000000..457dde6 --- /dev/null +++ b/samples/Sample.AzureServiceBus.InMemory/Sample.AzureServiceBus.InMemory.csproj @@ -0,0 +1,17 @@ + + + + net6.0 + enable + enable + 1c4ab524-d04d-459c-bf1d-9cb5da3ecaf1 + + + + + + + + + + diff --git a/samples/Sample.AzureServiceBus.InMemory/SampleSubscriber.cs b/samples/Sample.AzureServiceBus.InMemory/SampleSubscriber.cs new file mode 100644 index 0000000..f2df265 --- /dev/null +++ b/samples/Sample.AzureServiceBus.InMemory/SampleSubscriber.cs @@ -0,0 +1,14 @@ +using DotNetCore.CAP; + +namespace Sample.AzureServiceBus.InMemory; + +public class SampleSubscriber : ICapSubscribe +{ + public record Message(string Content); + + [CapSubscribe("cap.sample.tests")] + public void Handle(Message message) + { + Console.WriteLine($"Message {message.Content} received"); + } +} \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.InMemory/appsettings.Development.json b/samples/Sample.AzureServiceBus.InMemory/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/samples/Sample.AzureServiceBus.InMemory/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/Sample.AzureServiceBus.InMemory/appsettings.json b/samples/Sample.AzureServiceBus.InMemory/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/samples/Sample.AzureServiceBus.InMemory/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index cfd1c3d..a2e5725 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -185,10 +185,29 @@ namespace DotNetCore.CAP.AzureServiceBus private TransportMessage ConvertMessage(Message message) { - var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString()); - header.Add(Headers.Group, _subscriptionName); + var headers = message.UserProperties + .ToDictionary(x => x.Key, y => y.Value?.ToString()); + + headers.Add(Headers.Group, _subscriptionName); + + var customHeaders = _asbOptions.CustomHeaders?.Invoke(message); + + if (customHeaders?.Any() == true) + { + foreach (var customHeader in customHeaders) + { + var added = headers.TryAdd(customHeader.Key, customHeader.Value); + + if (!added) + { + _logger.LogWarning( + "Not possible to add the custom header {Header}. A value with the same key already exists in the Message headers.", + customHeader.Key); + } + } + } - return new TransportMessage(header, message.Body); + return new TransportMessage(headers, message.Body); } private Task OnConsumerReceivedWithSession(IMessageSession session, Message message, CancellationToken token) diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs index 050bb99..c1237a4 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -1,7 +1,10 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Collections.Generic; using DotNetCore.CAP.AzureServiceBus; +using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Primitives; // ReSharper disable once CheckNamespace @@ -36,6 +39,11 @@ namespace DotNetCore.CAP /// /// Represents the Azure Active Directory token provider for Azure Managed Service Identity integration. /// - public ITokenProvider? ManagementTokenProvider { get; set; } + public ITokenProvider? ManagementTokenProvider { get; set; } + + /// + /// Use this function to write additional headers from the original ASB Message or any Custom Header, i.e. to allow compatibility with heterogeneous systems, into + /// + public Func>>? CustomHeaders { get; set; } } } \ No newline at end of file