Browse Source

Apply redis keys hashslots for cluster support (#949)

* apply redis keys hashslots for cluster support

* rename var
master
Mahmoud Samir 3 years ago
committed by GitHub
parent
commit
8289e5066a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 24 deletions
  1. +25
    -0
      .dockerignore
  2. +13
    -5
      samples/Samples.Redis.SqlServer/Controllers/HomeController.cs
  3. +25
    -0
      samples/Samples.Redis.SqlServer/Dockerfile
  4. +2
    -1
      samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj
  5. +12
    -2
      samples/Samples.Redis.SqlServer/Startup.cs
  6. +90
    -0
      samples/Samples.Redis.SqlServer/docker-compose.yml
  7. +20
    -2
      src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs
  8. +1
    -1
      src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs
  9. +15
    -11
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs
  10. +2
    -2
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs

+ 25
- 0
.dockerignore View File

@@ -0,0 +1,25 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md

+ 13
- 5
samples/Samples.Redis.SqlServer/Controllers/HomeController.cs View File

@@ -1,6 +1,8 @@
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Threading.Tasks; using System.Threading.Tasks;


namespace Samples.Redis.SqlServer.Controllers namespace Samples.Redis.SqlServer.Controllers
@@ -11,25 +13,31 @@ namespace Samples.Redis.SqlServer.Controllers
{ {
private readonly ILogger<HomeController> _logger; private readonly ILogger<HomeController> _logger;
private readonly ICapPublisher _publisher; private readonly ICapPublisher _publisher;
private readonly IOptions<CapOptions> _options;


public HomeController(ILogger<HomeController> logger, ICapPublisher publisher)
public HomeController(ILogger<HomeController> logger, ICapPublisher publisher, IOptions<CapOptions> options)
{ {
_logger = logger; _logger = logger;
_publisher = publisher; _publisher = publisher;
this._options = options;
} }


[HttpGet] [HttpGet]
public async Task Publish()
public async Task Publish([FromQuery] string message = "test-message")
{ {
await _publisher.PublishAsync("test-message", new Person() { Age = 11, Name = "James" });
await _publisher.PublishAsync(message, new Person() { Age = 11, Name = "James" });
} }


[CapSubscribe("test-message")] [CapSubscribe("test-message")]
[CapSubscribe("test-message-1")]
[CapSubscribe("test-message-2")]
[CapSubscribe("test-message-3")]
[NonAction] [NonAction]
public void Subscribe(Person p)
public void Subscribe(Person p, [FromCap] CapHeader header)
{ {
_logger.LogInformation($"test-message subscribed with value --> " + p);
_logger.LogInformation($"{header[Headers.MessageName]} subscribed with value --> " + p);
} }

} }


public class Person public class Person


+ 25
- 0
samples/Samples.Redis.SqlServer/Dockerfile View File

@@ -0,0 +1,25 @@
#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging.

FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build
WORKDIR /src
COPY ["samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj", "samples/Samples.Redis.SqlServer/"]
COPY ["src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj", "src/DotNetCore.CAP.RedisStreams/"]
COPY ["src/DotNetCore.CAP/DotNetCore.CAP.csproj", "src/DotNetCore.CAP/"]
COPY ["src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj", "src/DotNetCore.CAP.SqlServer/"]
RUN dotnet restore "samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj"
COPY . .
WORKDIR "/src/samples/Samples.Redis.SqlServer"
RUN dotnet build "Samples.Redis.SqlServer.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "Samples.Redis.SqlServer.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Samples.Redis.SqlServer.dll"]

+ 2
- 1
samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj View File

@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">


<PropertyGroup> <PropertyGroup>
<TargetFramework>net5.0</TargetFramework> <TargetFramework>net5.0</TargetFramework>
@@ -8,6 +8,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RedisStreams\DotNetCore.CAP.RedisStreams.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.RedisStreams\DotNetCore.CAP.RedisStreams.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" />
</ItemGroup> </ItemGroup>


</Project> </Project>

+ 12
- 2
samples/Samples.Redis.SqlServer/Startup.cs View File

@@ -1,6 +1,7 @@
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.OpenApi.Models;


namespace Samples.Redis.SqlServer namespace Samples.Redis.SqlServer
{ {
@@ -17,16 +18,25 @@ namespace Samples.Redis.SqlServer
{ {
services.AddControllers(); services.AddControllers();


services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "Samples.Redis.SqlServer", Version = "v1" });
});


services.AddCap(options => services.AddCap(options =>
{ {
options.UseRedis("");
options.UseRedis("redis-node-0:6379,password=cap");


options.UseSqlServer("");
options.UseSqlServer("Server=db;Database=master;User=sa;Password=P@ssw0rd;");
}); });
} }


public void Configure(IApplicationBuilder app) public void Configure(IApplicationBuilder app)
{ {
app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Samples.Redis.SqlServer v1"));

app.UseRouting(); app.UseRouting();
app.UseEndpoints(endpoints => app.UseEndpoints(endpoints =>
{ {


+ 90
- 0
samples/Samples.Redis.SqlServer/docker-compose.yml View File

@@ -0,0 +1,90 @@
version: '2'
services:
redis-node-0:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-0:/bitnami/redis/data
environment:
- 'REDIS_PASSWORD=cap'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'

redis-node-1:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-1:/bitnami/redis/data
environment:
- 'REDIS_PASSWORD=cap'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'

redis-node-2:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-2:/bitnami/redis/data
environment:
- 'REDIS_PASSWORD=cap'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'

redis-node-3:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-3:/bitnami/redis/data
environment:
- 'REDIS_PASSWORD=cap'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'

redis-node-4:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-4:/bitnami/redis/data
environment:
- 'REDIS_PASSWORD=cap'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'

redis-node-5:
image: docker.io/bitnami/redis-cluster:6.2
volumes:
- redis-cluster_data-5:/bitnami/redis/data
depends_on:
- redis-node-0
- redis-node-1
- redis-node-2
- redis-node-3
- redis-node-4
environment:
- 'REDIS_PASSWORD=cap'
- 'REDISCLI_AUTH=cap'
- 'REDIS_CLUSTER_REPLICAS=1'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
- 'REDIS_CLUSTER_CREATOR=yes'

db:
image: "mcr.microsoft.com/mssql/server"
ports:
- 1433:1433
environment:
SA_PASSWORD: "P@ssw0rd"
ACCEPT_EULA: "Y"

redis-sample:
build:
context: ../..
dockerfile: samples/Samples.Redis.SqlServer/Dockerfile
ports:
- 5000:80
depends_on:
- db
- redis-node-5
volumes:
redis-cluster_data-0:
driver: local
redis-cluster_data-1:
driver: local
redis-cluster_data-2:
driver: local
redis-cluster_data-3:
driver: local
redis-cluster_data-4:
driver: local
redis-cluster_data-5:
driver: local

+ 20
- 2
src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs View File

@@ -24,12 +24,30 @@ namespace DotNetCore.CAP.RedisStreams
private static async Task<RedisConnection> ConnectAsync(CapRedisOptions redisOptions, private static async Task<RedisConnection> ConnectAsync(CapRedisOptions redisOptions,
ILogger<AsyncLazyRedisConnection> logger) ILogger<AsyncLazyRedisConnection> logger)
{ {
int attemp = 1;

var redisLogger = new RedisLogger(logger); var redisLogger = new RedisLogger(logger);


var connection = await ConnectionMultiplexer.ConnectAsync(redisOptions.Configuration, redisLogger)
ConnectionMultiplexer connection = null;

while (attemp <= 5)
{
connection = await ConnectionMultiplexer.ConnectAsync(redisOptions.Configuration, redisLogger)
.ConfigureAwait(false); .ConfigureAwait(false);


connection.LogEvents(logger);
connection.LogEvents(logger);

if (!connection.IsConnected)
{
logger.LogWarning($"Can't establish redis connection,trying to establish connection [attemp {attemp}].");
await Task.Delay(TimeSpan.FromSeconds(2));
++attemp;
}
else
attemp = 6;
}
if (connection == null)
throw new Exception($"Can't establish redis connection,after [{attemp}] attemps.");


return new RedisConnection(connection); return new RedisConnection(connection);
} }


+ 1
- 1
src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs View File

@@ -91,7 +91,7 @@ namespace DotNetCore.CAP.RedisStreams
_ = ConsumeMessages(newMsgs, StreamPosition.NewMessages); _ = ConsumeMessages(newMsgs, StreamPosition.NewMessages);
} }


private async Task ConsumeMessages(IAsyncEnumerable<RedisStream[]> streamsSet, RedisValue position)
private async Task ConsumeMessages(IAsyncEnumerable<IEnumerable<RedisStream>> streamsSet, RedisValue position)
{ {
await foreach (var set in streamsSet) await foreach (var set in streamsSet)
foreach (var stream in set) foreach (var stream in set)


+ 15
- 11
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs View File

@@ -56,7 +56,7 @@ namespace DotNetCore.CAP.RedisStreams
await _redis.GetDatabase().StreamAddAsync(stream, message); await _redis.GetDatabase().StreamAddAsync(stream, message);
} }


public async IAsyncEnumerable<RedisStream[]> PollStreamsLatestMessagesAsync(string[] streams,
public async IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsLatestMessagesAsync(string[] streams,
string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token) string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token)
{ {
var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.NewMessages)); var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.NewMessages));
@@ -66,13 +66,13 @@ namespace DotNetCore.CAP.RedisStreams
var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token) var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token)
.ConfigureAwait(false); .ConfigureAwait(false);


yield return result.streams;
yield return result;


token.WaitHandle.WaitOne(pollDelay); token.WaitHandle.WaitOne(pollDelay);
} }
} }


public async IAsyncEnumerable<RedisStream[]> PollStreamsPendingMessagesAsync(string[] streams,
public async IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsPendingMessagesAsync(string[] streams,
string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token) string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token)
{ {
var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.Beginning)); var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.Beginning));
@@ -84,10 +84,10 @@ namespace DotNetCore.CAP.RedisStreams
var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token) var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token)
.ConfigureAwait(false); .ConfigureAwait(false);


yield return result.streams;
yield return result;


//Once we consumed our history of pending messages, we can break the loop. //Once we consumed our history of pending messages, we can break the loop.
if (result.canRead && result.streams.All(s => s.Entries.Length < _options.StreamEntriesCount))
if (result.All(s => s.Entries.Length < _options.StreamEntriesCount))
break; break;


token.WaitHandle.WaitOne(pollDelay); token.WaitHandle.WaitOne(pollDelay);
@@ -101,7 +101,7 @@ namespace DotNetCore.CAP.RedisStreams
await _redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); await _redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false);
} }


private async Task<(bool canRead, RedisStream[] streams)> TryReadConsumerGroup(string consumerGroup,
private async Task<IEnumerable<RedisStream>> TryReadConsumerGroup(string consumerGroup,
StreamPosition[] positions, CancellationToken token) StreamPosition[] positions, CancellationToken token)
{ {
try try
@@ -118,12 +118,16 @@ namespace DotNetCore.CAP.RedisStreams
.WithCancellation(token)) .WithCancellation(token))
createdPositions.Add(position); createdPositions.Add(position);


if (!createdPositions.Any()) return (false, Array.Empty<RedisStream>());
if (!createdPositions.Any()) return Array.Empty<RedisStream>();


var readSet = database.StreamReadGroupAsync(createdPositions.ToArray(), consumerGroup, consumerGroup,
(int) _options.StreamEntriesCount);
//calculate keys HashSlots to start reading per HashSlot
var groupedPositions = createdPositions.GroupBy(s => _redis.GetHashSlot(s.Key))
.Select(group => database.StreamReadGroupAsync(group.ToArray(), consumerGroup, consumerGroup, (int)_options.StreamEntriesCount));


return (true, await readSet.ConfigureAwait(false));
var readSet = await Task.WhenAll(groupedPositions)
.ConfigureAwait(false);

return readSet.SelectMany(set => set);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@@ -134,7 +138,7 @@ namespace DotNetCore.CAP.RedisStreams
_logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}"); _logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}");
} }


return (false, Array.Empty<RedisStream>());
return Array.Empty<RedisStream>();
} }


private async Task ConnectAsync() private async Task ConnectAsync()


+ 2
- 2
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs View File

@@ -14,10 +14,10 @@ namespace DotNetCore.CAP.RedisStreams
Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup); Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup);
Task PublishAsync(string stream, NameValueEntry[] message); Task PublishAsync(string stream, NameValueEntry[] message);


IAsyncEnumerable<RedisStream[]> PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup,
IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup,
TimeSpan pollDelay, CancellationToken token); TimeSpan pollDelay, CancellationToken token);


IAsyncEnumerable<RedisStream[]> PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup,
IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup,
TimeSpan pollDelay, CancellationToken token); TimeSpan pollDelay, CancellationToken token);


Task Ack(string stream, string consumerGroup, string messageId); Task Ack(string stream, string consumerGroup, string messageId);


Loading…
Cancel
Save