diff --git a/CAP.sln b/CAP.sln
index 12bcbda..b5d3f5c 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -71,9 +71,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Redis", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.Redis.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.RedisStreams.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj
index 3bfb741..0b0b723 100644
--- a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj
+++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj
@@ -13,7 +13,7 @@
-
+
diff --git a/samples/Samples.Redis.SqlServer/Startup.cs b/samples/Samples.Redis.SqlServer/Startup.cs
index cd445bc..c5ef3a1 100644
--- a/samples/Samples.Redis.SqlServer/Startup.cs
+++ b/samples/Samples.Redis.SqlServer/Startup.cs
@@ -1,4 +1,4 @@
-using DotNetCore.CAP.Redis;
+using DotNetCore.CAP.RedisStreams;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs
index 6593a76..c0fa094 100644
--- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs
+++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs
@@ -1,14 +1,15 @@
-using DotNetCore.CAP;
+using System;
+using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reflection;
+using DotNetCore.CAP.RedisStreams;
-namespace DotNetCore.CAP.Redis
+namespace Microsoft.Extensions.DependencyInjection
{
public static class CapRedisOptionsExtensions
{
@@ -21,8 +22,7 @@ namespace DotNetCore.CAP.Redis
public static CapOptions UseRedis(this CapOptions options, Action configure)
{
- if (configure is null)
- throw new ArgumentNullException(nameof(configure));
+ if (configure is null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new RedisOptionsExtension(configure));
diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs
index 7ab86bd..5372416 100644
--- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs
+++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs
@@ -1,15 +1,15 @@
-using DotNetCore.CAP;
+using System;
+using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class CapRedisOptionsPostConfigure : IPostConfigureOptions
{
diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs
index bf03d49..82eec0d 100644
--- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs
@@ -1,12 +1,12 @@
-using StackExchange.Redis;
-using System;
+using System;
+using StackExchange.Redis;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP
{
public class CapRedisOptions
{
@@ -17,7 +17,14 @@ namespace DotNetCore.CAP.Redis
internal string Endpoint { get; set; }
+ ///
+ /// Gets or sets the count of entries consumed from stream
+ ///
public uint StreamEntriesCount { get; set; }
+
+ ///
+ /// Gets or sets the number of connections that can be used with redis server
+ ///
public uint ConnectionPoolSize { get; set; }
}
}
diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj
similarity index 67%
rename from src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj
rename to src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj
index 51e56cf..c4c9032 100644
--- a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj
+++ b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj
@@ -2,19 +2,15 @@
netstandard2.1
- DotNetCore.CAP.Redis
- $(PackageTags);Redis
+ DotNetCore.CAP.RedisStreams
+ $(PackageTags);RedisStreams
- bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Redis.xml
+ bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.RedisStreams.xml
1701;1702;1705;CS1591
-
-
-
-
diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs
index 6cb5404..fd1b816 100644
--- a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs
@@ -1,4 +1,5 @@
-using DotNetCore.CAP.Redis;
+using System;
+using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Configuration;
@@ -6,13 +7,12 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP
{
class RedisOptionsExtension : ICapOptionsExtension
{
diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs
index 0bc96a8..21f7394 100644
--- a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs
+++ b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs
@@ -1,7 +1,7 @@
-using Microsoft.Extensions.Logging;
+using System;
+using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@@ -9,7 +9,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisConnectionPool : IRedisConnectionPool,IDisposable
{
diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs
index 4ffb5db..149c498 100644
--- a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs
+++ b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs
@@ -1,7 +1,7 @@
-using Microsoft.Extensions.Logging;
+using System;
+using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@@ -10,7 +10,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
public class AsyncLazyRedisConnection : Lazy>
{
diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.cs
index c619f0d..c1d644b 100644
--- a/src/DotNetCore.CAP.Redis/IConnectionPool.cs
+++ b/src/DotNetCore.CAP.Redis/IConnectionPool.cs
@@ -1,10 +1,10 @@
-using StackExchange.Redis;
-using System;
+using System;
+using StackExchange.Redis;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
interface IRedisConnectionPool
{
diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs
index 6ccc728..71ecd43 100644
--- a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs
@@ -1,9 +1,9 @@
-using DotNetCore.CAP.Redis;
+using System;
+using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@@ -14,7 +14,7 @@ using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisConsumerClient : IConsumerClient
{
diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs
index ee0580b..cb9b553 100644
--- a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs
@@ -1,16 +1,16 @@
-using DotNetCore.CAP.Redis;
+using System;
+using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisConsumerClientFactory : IConsumerClientFactory
{
diff --git a/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs b/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs
deleted file mode 100644
index f33bd05..0000000
--- a/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright (c) .NET Core Community. All rights reserved.
-// Licensed under the MIT License. See License.txt in the project root for license information.
-
-using System;
-using System.Linq;
-using DotNetCore.CAP.Internal;
-using DotNetCore.CAP.Redis;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-
-namespace DotNetCore.CAP.Kafka
-{
- class PollPendingTopic : IProcessingServer
- {
- private readonly IRedisStreamManager redis;
- private readonly ILogger logger;
- private readonly CapRedisOptions options;
- private readonly MethodMatcherCache selector;
-
- public PollPendingTopic(
- IRedisStreamManager redis,
- ILogger logger,
- IOptions options,
- MethodMatcherCache selector)
- {
- this.redis = redis;
- this.logger = logger;
- this.options = options.Value;
- this.selector = selector;
- }
-
- public void Start()
- {
- try
- {
- var streams = selector.GetAllTopics();
-
- foreach (var stream in streams)
- {
- var streamExist=redis.
- }
-
- topics.Value.First().TopicName
- using var adminClient = new AdminClientBuilder(config).Build();
-
- adminClient.CreateTopicsAsync(topics.Select(x => new TopicSpecification
- {
- Name = x
- })).GetAwaiter().GetResult();
-
- logger.LogInformation("Topic is automatically created successfully!");
- }
- catch (CreateTopicsException ex) when (ex.Message.Contains("already exists"))
- {
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "An error was encountered when automatically creating topic!");
- }
- finally
- {
- KafkaConsumerClientFactory.WaitCreateTopic.Set();
- }
- }
-
- public void Dispose()
- {
-
- }
- }
-}
diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs
index e5bbe53..6c9aa08 100644
--- a/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs
+++ b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs
@@ -1,12 +1,12 @@
-using Microsoft.Extensions.Logging;
-using System;
+using System;
+using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisLogger : TextWriter
{
diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.cs
index 7a0abfe..6b5229e 100644
--- a/src/DotNetCore.CAP.Redis/IRedis.Events.cs
+++ b/src/DotNetCore.CAP.Redis/IRedis.Events.cs
@@ -1,13 +1,13 @@
-using Microsoft.Extensions.Logging;
+using System;
+using Microsoft.Extensions.Logging;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisEvents
{
diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs
index 03408d2..dba44cd 100644
--- a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs
+++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs
@@ -1,8 +1,8 @@
-using DotNetCore.CAP.Transport;
+using System;
+using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
@@ -10,7 +10,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisStreamManager : IRedisStreamManager
{
@@ -92,17 +92,28 @@ namespace DotNetCore.CAP.Redis
{
try
{
+ token.ThrowIfCancellationRequested();
+
+ var createdPositions = new List();
+
await ConnectAsync();
var database = redis.GetDatabase();
- var readSet = database.StreamReadGroupAsync(positions, consumerGroup, consumerGroup, (int)options.StreamEntriesCount);
+ await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, logger))
+ {
+ createdPositions.Add(position);
+ }
+
+ if (!createdPositions.Any()) return (false, Array.Empty());
+
+ var readSet = database.StreamReadGroupAsync(createdPositions.ToArray(), consumerGroup, consumerGroup, (int)options.StreamEntriesCount);
return (true, await readSet.ConfigureAwait(false));
}
catch (Exception ex)
{
- logger.LogError($"Redis error when trying read consumer group {consumerGroup}", ex);
+ logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}");
return (false, Array.Empty());
}
}
diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs
new file mode 100644
index 0000000..2a54a5d
--- /dev/null
+++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs
@@ -0,0 +1,54 @@
+using Microsoft.Extensions.Logging;
+using StackExchange.Redis;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis
+{
+ static class RedisStreamManagerExtensions
+ {
+ public static async IAsyncEnumerable TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger = null)
+ {
+ foreach (var position in positions)
+ {
+ bool created = false;
+ try
+ {
+ var stream = position.Key;
+ var streamExist = await database.KeyTypeAsync(stream);
+ if (streamExist == RedisType.None)
+ {
+ if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages, true))
+ {
+ logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]");
+ created = true;
+ }
+ }
+ else
+ {
+ var groupInfo = await database.StreamGroupInfoAsync(stream);
+
+ if (groupInfo.All(g => g.Name != consumerGroup))
+ {
+ if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages))
+ {
+ logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]");
+ created = true;
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ logger?.LogError(ex, $"Redis error while creating consumer group [{consumerGroup}] of stream [{position.Key}]");
+ }
+
+ if (created)
+ yield return position;
+ }
+ }
+ }
+}
diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs
index b742edf..03c2a31 100644
--- a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs
+++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs
@@ -1,8 +1,8 @@
-using DotNetCore.CAP.Transport;
+using System;
+using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
@@ -10,7 +10,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
interface IRedisStreamManager
{
diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs
index cea32bf..a894397 100644
--- a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs
@@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
class RedisTransport : ITransport
{
diff --git a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs
deleted file mode 100644
index 349abf7..0000000
--- a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-using DotNetCore.CAP.Internal;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace DotNetCore.CAP.Redis
-{
- static class MethodMatcherCacheExtensions
- {
- public static string GetGroupByTopic(this MethodMatcherCache source, string topicName)
- {
- var groupsMap = source.GetCandidatesMethodsOfGroupNameGrouped();
-
- return (from groupMap in groupsMap
- from topic in groupMap.Value
- where topic.TopicName == topicName
- select topic.Attribute.Group).FirstOrDefault();
- }
- }
-}
diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs
index f12ff58..33a8b0c 100644
--- a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs
+++ b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs
@@ -1,6 +1,6 @@
-using DotNetCore.CAP.Messages;
+using System;
+using DotNetCore.CAP.Messages;
using StackExchange.Redis;
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
@@ -8,7 +8,7 @@ using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
-namespace DotNetCore.CAP.Redis
+namespace DotNetCore.CAP.RedisStreams
{
static class RedisMessage
{