Sfoglia il codice sorgente

Enable #nullable for RabbitMq transport.

master
Savorboard 3 anni fa
parent
commit
a471feaaa4
5 ha cambiato i file con 20 aggiunte e 25 eliminazioni
  1. +5
    -4
      src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
  2. +1
    -6
      src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
  3. +1
    -1
      src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
  4. +3
    -3
      src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs
  5. +10
    -11
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

+ 5
- 4
src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs Vedi File

@@ -74,28 +74,29 @@ namespace DotNetCore.CAP
/// Optional queue arguments, also known as "x-arguments" because of their field name in the AMQP 0-9-1 protocol,
/// is a map (dictionary) of arbitrary key/value pairs that can be provided by clients when a queue is declared.
/// </summary>
public QueueArgumentsOptions QueueArguments { get; set; } = new QueueArgumentsOptions();
public QueueArgumentsOptions QueueArguments { get; set; } = new ();

/// <summary>
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader"/>.
/// </summary>
public Func<BasicDeliverEventArgs, List<KeyValuePair<string, string>>> CustomHeaders { get; set; }
public Func<BasicDeliverEventArgs, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }

/// <summary>
/// RabbitMQ native connection factory options
/// </summary>
public Action<ConnectionFactory> ConnectionFactoryOptions { get; set; }
public Action<ConnectionFactory>? ConnectionFactoryOptions { get; set; }

public class QueueArgumentsOptions
{
/// <summary>
/// Gets or sets queue mode by supplying the 'x-queue-mode' declaration argument with a string specifying the desired mode.
/// </summary>
public string QueueMode { get; set; }
public string QueueMode { get; set; } = default!;

/// <summary>
/// Gets or sets queue message automatic deletion time (in milliseconds) "x-message-ttl", Default 864000000 ms (10 days).
/// </summary>
// ReSharper disable once InconsistentNaming
public int MessageTTL { get; set; } = 864000000;
}
}

+ 1
- 6
src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj Vedi File

@@ -2,14 +2,9 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName>
<Nullable>enable</Nullable>
<PackageTags>$(PackageTags);RabbitMQ</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.RabbitMQ.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />


+ 1
- 1
src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs Vedi File

@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly Func<IConnection> _connectionActivator;
private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool;
private IConnection _connection;
private IConnection? _connection;
private static readonly object SLock = new object();

private int _count;


+ 3
- 3
src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs Vedi File

@@ -27,11 +27,11 @@ namespace DotNetCore.CAP.RabbitMQ
_exchange = _connectionChannelPool.Exchange;
}

public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _connectionChannelPool.HostAddress);
public BrokerAddress BrokerAddress => new ("RabbitMQ", _connectionChannelPool.HostAddress);

public Task<OperateResult> SendAsync(TransportMessage message)
{
IModel channel = null;
IModel? channel = null;
try
{
channel = _connectionChannelPool.Rent();
@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.RabbitMQ

var props = channel.CreateBasicProperties();
props.DeliveryMode = 2;
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value);
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object?)x.Value);

channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);



+ 10
- 11
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs Vedi File

@@ -22,9 +22,8 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _exchangeName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IModel _channel;

private IConnection _connection;
private IModel? _channel;
private IConnection? _connection;

public RabbitMQConsumerClient(string queueName,
IConnectionChannelPool connectionChannelPool,
@@ -36,11 +35,11 @@ namespace DotNetCore.CAP.RabbitMQ
_exchangeName = connectionChannelPool.Exchange;
}

public event EventHandler<TransportMessage> OnMessageReceived;
public event EventHandler<TransportMessage>? OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;
public event EventHandler<LogMessageEventArgs>? OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _rabbitMQOptions.HostName);
public BrokerAddress BrokerAddress => new("RabbitMQ", _rabbitMQOptions.HostName);

public void Subscribe(IEnumerable<string> topics)
{
@@ -81,17 +80,17 @@ namespace DotNetCore.CAP.RabbitMQ

public void Commit(object sender)
{
if (_channel.IsOpen)
if (_channel!.IsOpen)
{
_channel.BasicAck((ulong)sender, false);
}
}

public void Reject(object sender)
public void Reject(object? sender)
{
if (_channel.IsOpen)
if (_channel!.IsOpen && sender is ulong val)
{
_channel.BasicReject((ulong)sender, true);
_channel.BasicReject(val, true);
}
}

@@ -175,7 +174,7 @@ namespace DotNetCore.CAP.RabbitMQ

private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
var headers = new Dictionary<string, string>();
var headers = new Dictionary<string, string?>();

if (e.BasicProperties.Headers != null)
{


Caricamento…
Annulla
Salva