|
|
@@ -1,8 +1,8 @@ |
|
|
|
using DotNetCore.CAP.Transport; |
|
|
|
using System; |
|
|
|
using DotNetCore.CAP.Transport; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using Microsoft.Extensions.Options; |
|
|
|
using StackExchange.Redis; |
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Runtime.CompilerServices; |
|
|
@@ -10,7 +10,7 @@ using System.Text; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
|
|
|
|
namespace DotNetCore.CAP.Redis |
|
|
|
namespace DotNetCore.CAP.RedisStreams |
|
|
|
{ |
|
|
|
class RedisStreamManager : IRedisStreamManager |
|
|
|
{ |
|
|
@@ -92,17 +92,28 @@ namespace DotNetCore.CAP.Redis |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
token.ThrowIfCancellationRequested(); |
|
|
|
|
|
|
|
var createdPositions = new List<StreamPosition>(); |
|
|
|
|
|
|
|
await ConnectAsync(); |
|
|
|
|
|
|
|
var database = redis.GetDatabase(); |
|
|
|
|
|
|
|
var readSet = database.StreamReadGroupAsync(positions, consumerGroup, consumerGroup, (int)options.StreamEntriesCount); |
|
|
|
await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, logger)) |
|
|
|
{ |
|
|
|
createdPositions.Add(position); |
|
|
|
} |
|
|
|
|
|
|
|
if (!createdPositions.Any()) return (false, Array.Empty<RedisStream>()); |
|
|
|
|
|
|
|
var readSet = database.StreamReadGroupAsync(createdPositions.ToArray(), consumerGroup, consumerGroup, (int)options.StreamEntriesCount); |
|
|
|
|
|
|
|
return (true, await readSet.ConfigureAwait(false)); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
logger.LogError($"Redis error when trying read consumer group {consumerGroup}", ex); |
|
|
|
logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}"); |
|
|
|
return (false, Array.Empty<RedisStream>()); |
|
|
|
} |
|
|
|
} |
|
|
|