@@ -15,12 +15,22 @@ namespace Microsoft.Extensions.DependencyInjection | |||
return options.UseRedis(_ => { }); | |||
} | |||
/// <summary> | |||
/// Use redis streams as the message transport. | |||
/// </summary> | |||
/// <param name="options">The <see cref="CapOptions"/>.</param> | |||
/// <param name="connection">The StackExchange.Redis <see cref="ConfigurationOptions"/> comma-delimited configuration string.</param> | |||
public static CapOptions UseRedis(this CapOptions options, string connection) | |||
{ | |||
return options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection)); | |||
} | |||
/// <summary> | |||
/// Use redis streams as the message transport. | |||
/// </summary> | |||
/// <param name="options">The <see cref="CapOptions"/>.</param> | |||
/// <param name="configure">The CAP redis client options.</param> | |||
/// <exception cref="ArgumentNullException"><paramref name="configure"/> is <c>null</c>.</exception> | |||
public static CapOptions UseRedis(this CapOptions options, Action<CapRedisOptions> configure) | |||
{ | |||
if (configure is null) throw new ArgumentNullException(nameof(configure)); | |||
@@ -9,11 +9,11 @@ namespace DotNetCore.CAP | |||
public class CapRedisOptions | |||
{ | |||
/// <summary> | |||
/// Gets or sets the options of redis connections | |||
/// Gets or sets the native options of StackExchange.Redis | |||
/// </summary> | |||
public ConfigurationOptions Configuration { get; set; } | |||
public ConfigurationOptions? Configuration { get; set; } | |||
internal string Endpoint { get; set; } | |||
internal string Endpoint { get; set; } = default!; | |||
/// <summary> | |||
/// Gets or sets the count of entries consumed from stream | |||
@@ -2,7 +2,7 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.RedisStreams</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);RedisStreams</PackageTags> | |||
</PropertyGroup> | |||
@@ -16,7 +16,7 @@ | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -14,11 +14,10 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
internal class RedisConnectionPool : IRedisConnectionPool, IDisposable | |||
{ | |||
private readonly ConcurrentBag<AsyncLazyRedisConnection> _connections = | |||
new ConcurrentBag<AsyncLazyRedisConnection>(); | |||
private readonly ConcurrentBag<AsyncLazyRedisConnection> _connections = new(); | |||
private readonly ILoggerFactory _loggerFactory; | |||
private readonly SemaphoreSlim _poolLock = new SemaphoreSlim(1); | |||
private readonly SemaphoreSlim _poolLock = new(1); | |||
private readonly CapRedisOptions _redisOptions; | |||
private bool _isDisposed; | |||
private bool _poolAlreadyConfigured; | |||
@@ -30,13 +29,11 @@ namespace DotNetCore.CAP.RedisStreams | |||
Init().GetAwaiter().GetResult(); | |||
} | |||
private AsyncLazyRedisConnection QuietConnection | |||
private AsyncLazyRedisConnection? QuietConnection | |||
{ | |||
get | |||
{ | |||
if (_poolAlreadyConfigured) | |||
return _connections.OrderBy(async c => (await c).ConnectionCapacity).First(); | |||
return null; | |||
return _poolAlreadyConfigured ? _connections.OrderBy(async c => (await c).ConnectionCapacity).First() : null; | |||
} | |||
} | |||
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
var redisLogger = new RedisLogger(logger); | |||
ConnectionMultiplexer connection = null; | |||
ConnectionMultiplexer? connection = null; | |||
while (attemp <= 5) | |||
{ | |||
@@ -18,9 +18,9 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
private readonly string _groupId; | |||
private readonly ILogger<RedisConsumerClient> _logger; | |||
private readonly IOptions<CapRedisOptions> _options; | |||
private readonly IOptions<CapRedisOptions> _options; | |||
private readonly IRedisStreamManager _redis; | |||
private string[] _topics; | |||
private string[] _topics = default!; | |||
public RedisConsumerClient(string groupId, | |||
IRedisStreamManager redis, | |||
@@ -34,11 +34,11 @@ namespace DotNetCore.CAP.RedisStreams | |||
_logger = logger; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("redis", _options.Value.Endpoint); | |||
public BrokerAddress BrokerAddress => new("redis", _options.Value.Endpoint); | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
@@ -59,16 +59,17 @@ namespace DotNetCore.CAP.RedisStreams | |||
cancellationToken.ThrowIfCancellationRequested(); | |||
cancellationToken.WaitHandle.WaitOne(timeout); | |||
} | |||
// ReSharper disable once FunctionNeverReturns | |||
} | |||
public void Commit(object sender) | |||
{ | |||
var (stream, group, id) = ((string stream, string group, string id)) sender; | |||
var (stream, group, id) = ((string stream, string group, string id))sender; | |||
_redis.Ack(stream, group, id).GetAwaiter().GetResult(); | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
// ignore | |||
} | |||
@@ -94,32 +95,39 @@ namespace DotNetCore.CAP.RedisStreams | |||
private async Task ConsumeMessages(IAsyncEnumerable<IEnumerable<RedisStream>> streamsSet, RedisValue position) | |||
{ | |||
await foreach (var set in streamsSet) | |||
foreach (var stream in set) | |||
foreach (var entry in stream.Entries) | |||
{ | |||
if (entry.IsNull) | |||
return; | |||
try | |||
foreach (var stream in set) | |||
{ | |||
var message = RedisMessage.Create(entry, _groupId); | |||
OnMessageReceived?.Invoke((stream.Key.ToString(), _groupId, entry.Id.ToString()), message); | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError(ex.Message, ex); | |||
var logArgs = new LogMessageEventArgs | |||
foreach (var entry in stream.Entries) | |||
{ | |||
LogType = MqLogType.ConsumeError, | |||
Reason = ex.ToString() | |||
}; | |||
OnLog?.Invoke(entry, logArgs); | |||
} | |||
finally | |||
{ | |||
var positionName = position == StreamPosition.Beginning | |||
? nameof(StreamPosition.Beginning) | |||
: nameof(StreamPosition.NewMessages); | |||
_logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); | |||
if (entry.IsNull) | |||
{ | |||
return; | |||
} | |||
try | |||
{ | |||
var message = RedisMessage.Create(entry, _groupId); | |||
OnMessageReceived?.Invoke((stream.Key.ToString(), _groupId, entry.Id.ToString()), message); | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError(ex.Message, ex); | |||
var logArgs = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.ConsumeError, | |||
Reason = ex.ToString() | |||
}; | |||
OnLog?.Invoke(entry, logArgs); | |||
} | |||
finally | |||
{ | |||
var positionName = position == StreamPosition.Beginning | |||
? nameof(StreamPosition.Beginning) | |||
: nameof(StreamPosition.NewMessages); | |||
_logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
private readonly IRedisConnectionPool _connectionsPool; | |||
private readonly ILogger<RedisStreamManager> _logger; | |||
private readonly CapRedisOptions _options; | |||
private IConnectionMultiplexer _redis; | |||
private IConnectionMultiplexer? _redis; | |||
public RedisStreamManager(IRedisConnectionPool connectionsPool, IOptions<CapRedisOptions> options, | |||
ILogger<RedisStreamManager> logger) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored | |||
var database = _redis.GetDatabase(); | |||
var database = _redis!.GetDatabase(); | |||
var streamExist = await database.KeyExistsAsync(stream); | |||
if (!streamExist) | |||
{ | |||
@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored | |||
await _redis.GetDatabase().StreamAddAsync(stream, message); | |||
await _redis!.GetDatabase().StreamAddAsync(stream, message); | |||
} | |||
public async IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsLatestMessagesAsync(string[] streams, | |||
@@ -98,7 +98,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
await ConnectAsync(); | |||
await _redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); | |||
await _redis!.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); | |||
} | |||
private async Task<IEnumerable<RedisStream>> TryReadConsumerGroup(string consumerGroup, | |||
@@ -112,7 +112,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
var database = _redis.GetDatabase(); | |||
var database = _redis!.GetDatabase(); | |||
await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, _logger) | |||
.WithCancellation(token)) | |||
@@ -11,7 +11,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
internal static class RedisStreamManagerExtensions | |||
{ | |||
public static async IAsyncEnumerable<StreamPosition> TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger = null) | |||
public static async IAsyncEnumerable<StreamPosition> TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger) | |||
{ | |||
foreach (var position in positions) | |||
{ | |||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, | |||
StreamPosition.NewMessages)) | |||
{ | |||
logger.LogInformation( | |||
logger!.LogInformation( | |||
$"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); | |||
created = true; | |||
} | |||
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, | |||
StreamPosition.NewMessages)) | |||
{ | |||
logger.LogInformation( | |||
logger!.LogInformation( | |||
$"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); | |||
created = true; | |||
} | |||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
_logger = logger; | |||
} | |||
public BrokerAddress BrokerAddress => new BrokerAddress("redis", _options.Endpoint); | |||
public BrokerAddress BrokerAddress => new ("redis", _options.Endpoint); | |||
public async Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
@@ -23,16 +23,15 @@ namespace DotNetCore.CAP.RedisStreams | |||
}; | |||
} | |||
public static TransportMessage Create(StreamEntry streamEntry, string groupId = null) | |||
public static TransportMessage Create(StreamEntry streamEntry, string? groupId = null) | |||
{ | |||
if (streamEntry.IsNull) | |||
return null; | |||
var headersRaw = streamEntry[Headers]; | |||
if (headersRaw.IsNullOrEmpty) | |||
{ | |||
throw new ArgumentException($"Redis stream entry with id {streamEntry.Id} missing cap headers"); | |||
var headers = JsonSerializer.Deserialize<IDictionary<string, string>>(headersRaw); | |||
} | |||
var headers = JsonSerializer.Deserialize<IDictionary<string, string?>>(headersRaw)!; | |||
var bodyRaw = streamEntry[Body]; | |||
@@ -43,8 +42,12 @@ namespace DotNetCore.CAP.RedisStreams | |||
return new TransportMessage(headers, body); | |||
} | |||
private static string ToJson(object obj) | |||
private static RedisValue ToJson(object? obj) | |||
{ | |||
if (obj == null) | |||
{ | |||
return RedisValue.Null; | |||
} | |||
return JsonSerializer.Serialize(obj, new JsonSerializerOptions(JsonSerializerDefaults.Web)); | |||
} | |||
} |