Browse Source

Add support for custom message headers for RabbitMQ consumer side. (#818)

master
Savorboard 3 years ago
parent
commit
bdfd1016e6
2 changed files with 22 additions and 2 deletions
  1. +7
    -0
      src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
  2. +15
    -2
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

+ 7
- 0
src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs View File

@@ -4,7 +4,9 @@
// ReSharper disable once CheckNamespace

using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -74,6 +76,11 @@ namespace DotNetCore.CAP
/// </summary>
public QueueArgumentsOptions QueueArguments { get; set; } = new QueueArgumentsOptions();

/// <summary>
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader"/>.
/// </summary>
public Func<BasicDeliverEventArgs, List<KeyValuePair<string, string>>> CustomHeaders { get; set; }

/// <summary>
/// RabbitMQ native connection factory options
/// </summary>


+ 15
- 2
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs View File

@@ -168,12 +168,25 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
var headers = new Dictionary<string, string>();
foreach (var header in e.BasicProperties.Headers)
if (e.BasicProperties.Headers != null)
{
headers.Add(header.Key, header.Value == null ? null : Encoding.UTF8.GetString((byte[])header.Value));
foreach (var header in e.BasicProperties.Headers)
{
headers.Add(header.Key, header.Value == null ? null : Encoding.UTF8.GetString((byte[])header.Value));
}
}

headers.Add(Headers.Group, _queueName);

if (_rabbitMQOptions.CustomHeaders != null)
{
var customHeaders = _rabbitMQOptions.CustomHeaders(e);
foreach (var customHeader in customHeaders)
{
headers[customHeader.Key] = customHeader.Value;
}
}

var message = new TransportMessage(headers, e.Body.ToArray());

OnMessageReceived?.Invoke(e.DeliveryTag, message);


Loading…
Cancel
Save