Преглед изворни кода

Merge pull request #817 from MahmoudSamir101/master

Adding Redis to Cap
master
xiangxiren пре 3 година
committed by GitHub
родитељ
комит
ef1d1fa795
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
25 измењених фајлова са 1132 додато и 1 уклоњено
  1. +15
    -1
      CAP.sln
  2. +14
    -0
      samples/Samples.Redis.SqlServer/AppDbContext.cs
  3. +39
    -0
      samples/Samples.Redis.SqlServer/Controllers/HomeController.cs
  4. +26
    -0
      samples/Samples.Redis.SqlServer/Program.cs
  5. +24
    -0
      samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj
  6. +76
    -0
      samples/Samples.Redis.SqlServer/Startup.cs
  7. +9
    -0
      samples/Samples.Redis.SqlServer/appsettings.Development.json
  8. +10
    -0
      samples/Samples.Redis.SqlServer/appsettings.json
  9. +32
    -0
      src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs
  10. +37
    -0
      src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.PostConfigure.cs
  11. +30
    -0
      src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs
  12. +22
    -0
      src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj
  13. +41
    -0
      src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs
  14. +110
    -0
      src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs
  15. +67
    -0
      src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs
  16. +13
    -0
      src/DotNetCore.CAP.RedisStreams/IConnectionPool.cs
  17. +135
    -0
      src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs
  18. +33
    -0
      src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs
  19. +27
    -0
      src/DotNetCore.CAP.RedisStreams/IRedis.Events.Logger.cs
  20. +57
    -0
      src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs
  21. +133
    -0
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs
  22. +56
    -0
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs
  23. +23
    -0
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs
  24. +51
    -0
      src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs
  25. +52
    -0
      src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs

+ 15
- 1
CAP.sln Прегледај датотеку

@@ -69,7 +69,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams", "src\DotNetCore.CAP.RedisStreams\DotNetCore.CAP.RedisStreams.csproj", "{54458B54-49CC-454C-82B2-4AED681D9D07}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -165,6 +169,14 @@ Global
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.Build.0 = Release|Any CPU
{54458B54-49CC-454C-82B2-4AED681D9D07}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{54458B54-49CC-454C-82B2-4AED681D9D07}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54458B54-49CC-454C-82B2-4AED681D9D07}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54458B54-49CC-454C-82B2-4AED681D9D07}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -192,6 +204,8 @@ Global
{B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{54458B54-49CC-454C-82B2-4AED681D9D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 14
- 0
samples/Samples.Redis.SqlServer/AppDbContext.cs Прегледај датотеку

@@ -0,0 +1,14 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;

namespace Sample.Redis.SqlServer
{
public class AppDbContext : DbContext
{
public AppDbContext(DbContextOptions<AppDbContext> optionsBuilder) : base(optionsBuilder)
{

}

}
}

+ 39
- 0
samples/Samples.Redis.SqlServer/Controllers/HomeController.cs Прегледај датотеку

@@ -0,0 +1,39 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Samples.Redis.SqlServer.Controllers
{
[ApiController]
[Route("[controller]/[action]")]
public class HomeController : ControllerBase
{
private readonly ILogger<HomeController> _logger;
private readonly ICapPublisher publisher;

public HomeController(ILogger<HomeController> logger, ICapPublisher publisher)
{
_logger = logger;
this.publisher = publisher;
}

[HttpGet]
public async Task Publish()
{
await publisher.PublishAsync("test-message", DateTime.UtcNow);
}

[CapSubscribe("test-message")]
[NonAction]
public void Subscribe(DateTime date, [FromCap] IDictionary<string, string> headers)
{
var str = string.Join(",", headers.Select(kv => $"({kv.Key}:{kv.Value})"));
_logger.LogInformation($"test-message subscribed with value {date}, headers : {str}");
}
}
}

+ 26
- 0
samples/Samples.Redis.SqlServer/Program.cs Прегледај датотеку

@@ -0,0 +1,26 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Samples.Redis.SqlServer
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}

+ 24
- 0
samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj Прегледај датотеку

@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<UserSecretsId>78587bd3-9076-4357-869f-4f4652d35322</UserSecretsId>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.4" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.14" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RedisStreams\DotNetCore.CAP.RedisStreams.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
</ItemGroup>

<Target Name="PostBuild" AfterTargets="PostBuildEvent">
<Exec Command="docker run -dt -p 6379:6379 --name redis redis" ContinueOnError="true" />
</Target>

</Project>

+ 76
- 0
samples/Samples.Redis.SqlServer/Startup.cs Прегледај датотеку

@@ -0,0 +1,76 @@
using DotNetCore.CAP.RedisStreams;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.OpenApi.Models;
using Sample.Redis.SqlServer;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Samples.Redis.SqlServer
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}

public IConfiguration Configuration { get; }

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{

services.AddControllers();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "Samples.Redis.SqlServer", Version = "v1" });
});

services.AddDbContext<AppDbContext>(options =>
{
options.UseSqlServer("data source=.;initial catalog=cap;integrated security=true");
});

services.AddCap(options =>
{
options.UseRedis();
options.UseEntityFramework<AppDbContext>();

options.DefaultGroupName = "Samples.Redis.SqlServer";
});
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Samples.Redis.SqlServer v1"));
}

app.UseHttpsRedirection();

app.UseRouting();

app.UseAuthorization();

app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
}

+ 9
- 0
samples/Samples.Redis.SqlServer/appsettings.Development.json Прегледај датотеку

@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}

+ 10
- 0
samples/Samples.Redis.SqlServer/appsettings.json Прегледај датотеку

@@ -0,0 +1,10 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
}

+ 32
- 0
src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs Прегледај датотеку

@@ -0,0 +1,32 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reflection;
using DotNetCore.CAP.RedisStreams;

namespace Microsoft.Extensions.DependencyInjection
{
public static class CapRedisOptionsExtensions
{
public static CapOptions UseRedis(this CapOptions options) =>
options.UseRedis(_ => { });

public static CapOptions UseRedis(this CapOptions options, string connection) =>
options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection));


public static CapOptions UseRedis(this CapOptions options, Action<CapRedisOptions> configure)
{
if (configure is null) throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new RedisOptionsExtension(configure));

return options;
}
}
}

+ 37
- 0
src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.PostConfigure.cs Прегледај датотеку

@@ -0,0 +1,37 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class CapRedisOptionsPostConfigure : IPostConfigureOptions<CapRedisOptions>
{
public CapRedisOptionsPostConfigure()
{
}

public void PostConfigure(string name, CapRedisOptions options)
{
options.Configuration ??= new ConfigurationOptions();

if (options.StreamEntriesCount == default)
options.StreamEntriesCount = 10;

if (options.ConnectionPoolSize == default)
options.ConnectionPoolSize= 10;

if (!options.Configuration.EndPoints.Any())
{
options.Configuration.EndPoints.Add(IPAddress.Loopback, 0);
options.Configuration.SetDefaultPorts();
}
}
}
}

+ 30
- 0
src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs Прегледај датотеку

@@ -0,0 +1,30 @@
using System;
using StackExchange.Redis;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP
{
public class CapRedisOptions
{
/// <summary>
/// Gets or sets the options of redis connections
/// </summary>
public ConfigurationOptions Configuration { get; set; }

internal string Endpoint { get; set; }

/// <summary>
/// Gets or sets the count of entries consumed from stream
/// </summary>
public uint StreamEntriesCount { get; set; }
/// <summary>
/// Gets or sets the number of connections that can be used with redis server
/// </summary>
public uint ConnectionPoolSize { get; set; }
}
}

+ 22
- 0
src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj Прегледај датотеку

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.RedisStreams</AssemblyName>
<PackageTags>$(PackageTags);RedisStreams</PackageTags>
</PropertyGroup>

<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.RedisStreams.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="2.2.4" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 41
- 0
src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs Прегледај датотеку

@@ -0,0 +1,41 @@
using System;
using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP
{
class RedisOptionsExtension : ICapOptionsExtension
{
private readonly Action<CapRedisOptions> configure;
public RedisOptionsExtension(Action<CapRedisOptions> configure)
{
if (configure is null)
{
throw new ArgumentNullException(nameof(configure));
}

this.configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
services.AddSingleton<IRedisStreamManager, RedisStreamManager>();
services.AddSingleton<IConsumerClientFactory, RedisConsumerClientFactory>();
services.AddSingleton<ITransport, RedisTransport>();
services.AddSingleton<IRedisConnectionPool, RedisConnectionPool>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IPostConfigureOptions<CapRedisOptions>, CapRedisOptionsPostConfigure>());
services.AddOptions<CapRedisOptions>().Configure(configure);
}
}
}

+ 110
- 0
src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs Прегледај датотеку

@@ -0,0 +1,110 @@
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class RedisConnectionPool : IRedisConnectionPool,IDisposable
{
private readonly ConcurrentBag<AsyncLazyRedisConnection> connections = new ConcurrentBag<AsyncLazyRedisConnection>();
private readonly SemaphoreSlim poolLock = new SemaphoreSlim(1);
private readonly CapRedisOptions redisOptions;
private readonly ILoggerFactory loggerFactory;
private bool poolAlreadyConfigured = false;
private bool isDisposed;

private AsyncLazyRedisConnection QuietConnection
{
get
{
if (poolAlreadyConfigured)
return connections.OrderBy(async c => (await c).ConnectionCapacity).First();
else
return null;
}
}

public RedisConnectionPool(IOptions<CapRedisOptions> options, ILoggerFactory loggerFactory)
{
redisOptions = options.Value;
this.loggerFactory = loggerFactory;
Init().GetAwaiter().GetResult();
}

public async Task<IConnectionMultiplexer> ConnectAsync()
{
if (QuietConnection == null)
{
poolAlreadyConfigured = connections.Count(c => c.IsValueCreated) == redisOptions.ConnectionPoolSize;
if (QuietConnection != null)
return (await QuietConnection).Connection;
}

foreach (var lazy in connections)
{
if (!lazy.IsValueCreated)
return (await lazy).Connection;

var connection = await lazy;
if (connection.ConnectionCapacity == default)
return connection.Connection;
}

return (await connections.OrderBy(async c => (await c).ConnectionCapacity).First()).Connection;
}

private async Task Init()
{
try
{
await poolLock.WaitAsync();

if (connections.Any())
return;

for (int i = 0; i < redisOptions.ConnectionPoolSize; i++)
{
var connection = new AsyncLazyRedisConnection(redisOptions, loggerFactory.CreateLogger<AsyncLazyRedisConnection>());

connections.Add(connection);
}
}
finally
{
poolLock.Release();
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

void Dispose(bool disposing)
{
if (isDisposed)
return;

if (disposing)
{
foreach (var connection in this.connections)
{
if (!connection.IsValueCreated)
continue;

connection.GetAwaiter().GetResult().Dispose();
}
}

isDisposed = true;
}
}
}

+ 67
- 0
src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs Прегледај датотеку

@@ -0,0 +1,67 @@
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
public class AsyncLazyRedisConnection : Lazy<Task<RedisConnection>>
{
public AsyncLazyRedisConnection(CapRedisOptions redisOptions, ILogger<AsyncLazyRedisConnection> logger)
: base(() => ConnectAsync(redisOptions, logger))
{
}

public TaskAwaiter<RedisConnection> GetAwaiter() { return Value.GetAwaiter(); }

static async Task<RedisConnection> ConnectAsync(CapRedisOptions redisOptions, ILogger<AsyncLazyRedisConnection> logger)
{
var redisLogger = new RedisLogger(logger);

var connection = await ConnectionMultiplexer.ConnectAsync(redisOptions.Configuration, redisLogger).ConfigureAwait(false);

connection.LogEvents(logger);

return new RedisConnection(connection);
}
}

public class RedisConnection:IDisposable
{
private bool isDisposed;

public RedisConnection(IConnectionMultiplexer connection)
{
Connection = connection ?? throw new ArgumentNullException(nameof(connection));
}

public IConnectionMultiplexer Connection { get; }
public long ConnectionCapacity => Connection.GetCounters().TotalOutstanding;

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

void Dispose(bool disposing)
{
if (isDisposed)
return;

if (disposing)
{
Connection.Dispose();
}

isDisposed = true;
}
}
}

+ 13
- 0
src/DotNetCore.CAP.RedisStreams/IConnectionPool.cs Прегледај датотеку

@@ -0,0 +1,13 @@
using System;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
interface IRedisConnectionPool
{
Task<IConnectionMultiplexer> ConnectAsync();
}
}

+ 135
- 0
src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs Прегледај датотеку

@@ -0,0 +1,135 @@
using System;
using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace DotNetCore.CAP.RedisStreams
{
class RedisConsumerClient : IConsumerClient
{
private readonly ILogger<RedisConsumerClient> logger;
private readonly IRedisStreamManager redis;
private readonly CapRedisOptions options;
private readonly string groupId;
private string[] topics;

public RedisConsumerClient(
string groubId,
IRedisStreamManager redis,
CapRedisOptions options,
ILogger<RedisConsumerClient> logger
)
{
this.groupId = groubId;
this.redis = redis;
this.options = options;
this.logger = logger;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));

foreach (var topic in topics)
{
redis.CreateStreamWithConsumerGroupAsync(topic, groupId).GetAwaiter().GetResult();
}

this.topics = topics.ToArray();
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
_ = ListeningForMessagesAsync(timeout, cancellationToken);

while (true)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
}

private async Task ListeningForMessagesAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
//first time, we want to read our pending messages, in case we crashed and are recovering.
var pendingMsgs = redis.PollStreamsPendingMessagesAsync(topics, groupId, timeout, cancellationToken);

await ConsumeMessages(pendingMsgs, StreamPosition.Beginning);

//Once we consumed our history, we can start getting new messages.
var newMsgs = redis.PollStreamsLatestMessagesAsync(topics, groupId, timeout, cancellationToken);

_ = ConsumeMessages(newMsgs, StreamPosition.NewMessages);
}

private async Task ConsumeMessages(IAsyncEnumerable<RedisStream[]> streamsSet, RedisValue position)
{
await foreach (var set in streamsSet)
{
foreach (var stream in set)
{
foreach (var entry in stream.Entries)
{
if (entry.IsNull)
return;
try
{
var message = RedisMessage.Create(entry, groupId);
OnMessageReceived?.Invoke((stream.Key.ToString(), groupId, entry.Id.ToString()), message);
}
catch (Exception ex)
{
logger.LogError(ex.Message, ex);
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ConsumeError,
Reason = ex.ToString()
};
OnLog?.Invoke(entry, logArgs);
}
finally
{
string positionName = position == StreamPosition.Beginning ? nameof(StreamPosition.Beginning) : nameof(StreamPosition.NewMessages);
logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered.");
}
}
}
}
}

public void Commit(object sender)
{
var (stream, group, id) = ((string stream, string group, string id))sender;

redis.Ack(stream, group, id).GetAwaiter().GetResult();
}

public void Reject(object sender)
{
// ignore
}

public void Dispose()
{
//ignore
}

}
}

+ 33
- 0
src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs Прегледај датотеку

@@ -0,0 +1,33 @@
using System;
using DotNetCore.CAP.RedisStreams;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class RedisConsumerClientFactory : IConsumerClientFactory
{
private readonly CapRedisOptions redisOptions;
private readonly IRedisStreamManager redis;
private readonly ILogger<RedisConsumerClient> logger;

public RedisConsumerClientFactory(IOptions<CapRedisOptions> redisOptions, IRedisStreamManager redis, ILogger<RedisConsumerClient> logger)
{
this.redisOptions = redisOptions.Value;
this.redis = redis;
this.logger = logger;
}

public IConsumerClient Create(string groupId)
{
return new RedisConsumerClient(groupId, redis, redisOptions, logger);
}
}
}

+ 27
- 0
src/DotNetCore.CAP.RedisStreams/IRedis.Events.Logger.cs Прегледај датотеку

@@ -0,0 +1,27 @@
using System;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class RedisLogger : TextWriter
{
private readonly ILogger logger;

public RedisLogger(ILogger logger)
{
this.logger = logger;
}

public override void WriteLine(string value)
{
logger.LogInformation(value);
}
public override Encoding Encoding => Encoding.UTF8;
}
}

+ 57
- 0
src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs Прегледај датотеку

@@ -0,0 +1,57 @@
using System;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class RedisEvents
{
private readonly ILogger logger;

public RedisEvents(IConnectionMultiplexer connection, ILogger logger)
{
this.logger = logger;
connection.ErrorMessage += Connection_ErrorMessage;
connection.ConnectionRestored += Connection_ConnectionRestored;
connection.ConnectionFailed += Connection_ConnectionFailed;
}

private void Connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
logger.LogError(e.Exception, $"Connection failed!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}");
}

private void Connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
logger.LogWarning($"Connection restored back!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}");
}

private void Connection_ErrorMessage(object sender, RedisErrorEventArgs e)
{
logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}");
}
}

static class RedisConnectionExtensions
{
public static void LogEvents(this IConnectionMultiplexer connection, ILogger logger)
{
if (connection is null)
{
throw new ArgumentNullException(nameof(connection));
}

if (logger is null)
{
throw new ArgumentNullException(nameof(logger));
}

_ = new RedisEvents(connection, logger);
}
}
}

+ 133
- 0
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs Прегледај датотеку

@@ -0,0 +1,133 @@
using System;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
class RedisStreamManager : IRedisStreamManager
{
private readonly CapRedisOptions options;
private readonly IRedisConnectionPool connectionsPool;
private readonly ILogger<RedisStreamManager> logger;
private IConnectionMultiplexer redis;

public RedisStreamManager(IRedisConnectionPool connectionsPool, IOptions<CapRedisOptions> options, ILogger<RedisStreamManager> logger)
{
this.options = options.Value;
this.connectionsPool = connectionsPool;
this.logger = logger;
}

public async Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup)
{
await ConnectAsync();

//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored
var database = redis.GetDatabase();
var streamExist = await database.KeyTypeAsync(stream);
if (streamExist == RedisType.None)
{
await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages, true);
}
else
{
var groupInfo = await database.StreamGroupInfoAsync(stream);
if (groupInfo.Any(g => g.Name == consumerGroup))
return;
await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages);
}
}

public async Task PublishAsync(string stream, NameValueEntry[] message)
{
await ConnectAsync();

//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored
await redis.GetDatabase().StreamAddAsync(stream, message);
}

public async IAsyncEnumerable<RedisStream[]> PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token)
{
var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.NewMessages));

while (true)
{
var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token).ConfigureAwait(false);

yield return result.streams;

token.WaitHandle.WaitOne(pollDelay);
}
}

public async IAsyncEnumerable<RedisStream[]> PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token)
{
var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.Beginning));

while (true)
{
token.ThrowIfCancellationRequested();

var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token).ConfigureAwait(false);

yield return result.streams;

//Once we consumed our history of pending messages, we can break the loop.
if (result.canRead && result.streams.All(s => s.Entries.Length < options.StreamEntriesCount))
break;

token.WaitHandle.WaitOne(pollDelay);
}
}

private async Task<(bool canRead, RedisStream[] streams)> TryReadConsumerGroup(string consumerGroup, StreamPosition[] positions, CancellationToken token)
{
try
{
token.ThrowIfCancellationRequested();

var createdPositions = new List<StreamPosition>();

await ConnectAsync();

var database = redis.GetDatabase();

await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, logger))
{
createdPositions.Add(position);
}

if (!createdPositions.Any()) return (false, Array.Empty<RedisStream>());

var readSet = database.StreamReadGroupAsync(createdPositions.ToArray(), consumerGroup, consumerGroup, (int)options.StreamEntriesCount);

return (true, await readSet.ConfigureAwait(false));
}
catch (Exception ex)
{
logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}");
return (false, Array.Empty<RedisStream>());
}
}

public async Task Ack(string stream, string consumerGroup, string messageId)
{
await ConnectAsync();

await redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false);
}

private async Task ConnectAsync()
{
redis = await connectionsPool.ConnectAsync();
}
}
}

+ 56
- 0
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs Прегледај датотеку

@@ -0,0 +1,56 @@
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace StackExchange.Redis
{
static class RedisStreamManagerExtensions
{
public static async IAsyncEnumerable<StreamPosition> TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger = null)
{
foreach (var position in positions)
{
bool created = false;
try
{
var stream = position.Key;
var streamExist = await database.KeyTypeAsync(stream);
if (streamExist == RedisType.None)
{
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages, true))
{
logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]");
created = true;
}
}
else
{
var groupInfo = await database.StreamGroupInfoAsync(stream);

if (groupInfo.All(g => g.Name != consumerGroup))
{
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages))
{
logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]");
created = true;
}
}
else
created = true;
}
}
catch (Exception ex)
{
logger?.LogError(ex, $"Redis error while creating consumer group [{consumerGroup}] of stream [{position.Key}]");
}

if (created)
yield return position;
}
}
}
}

+ 23
- 0
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs Прегледај датотеку

@@ -0,0 +1,23 @@
using System;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
interface IRedisStreamManager
{
Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup);
Task PublishAsync(string stream, NameValueEntry[] message);
IAsyncEnumerable<RedisStream[]> PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, CancellationToken token);
IAsyncEnumerable<RedisStream[]> PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, CancellationToken token);
Task Ack(string stream, string consumerGroup, string messageId);
}
}

+ 51
- 0
src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs Прегледај датотеку

@@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;

namespace DotNetCore.CAP.RedisStreams
{
class RedisTransport : ITransport
{
private readonly IRedisStreamManager redis;
private readonly ILogger<RedisTransport> logger;
private readonly MethodMatcherCache selector;
private readonly CapRedisOptions options;

public RedisTransport(IRedisStreamManager redis, MethodMatcherCache selector, IOptions<CapRedisOptions> options, ILogger<RedisTransport> logger)
{
this.redis = redis;
this.selector = selector;
this.options = options.Value;
this.logger = logger;
}

public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
try
{
await redis.PublishAsync(message.GetName(), message.AsStreamEntries());

logger.LogDebug($"Redis message [{message.GetName()}] has been published.");

return OperateResult.Success;
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);

return OperateResult.Failed(wrapperEx);
}
}
}
}

+ 52
- 0
src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs Прегледај датотеку

@@ -0,0 +1,52 @@
using System;
using DotNetCore.CAP.Messages;
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace DotNetCore.CAP.RedisStreams
{
static class RedisMessage
{
const string HEADERS = "headers";
const string BODY = "body";

public static NameValueEntry[] AsStreamEntries(this TransportMessage message)
{
return new[]{
new NameValueEntry(HEADERS,ToJson(message.Headers)),
new NameValueEntry(BODY,ToJson(message.Body))
};
}

public static TransportMessage Create(StreamEntry streamEntry, string groupId = null)
{
if (streamEntry.IsNull)
return null;

var headersRaw = streamEntry[HEADERS];
if (headersRaw.IsNullOrEmpty)
throw new ArgumentException($"Redis stream entry with id {streamEntry.Id} missing cap headers");

var headers = JsonSerializer.Deserialize<IDictionary<string, string>>(headersRaw);
var bodyRaw = streamEntry[BODY];
var body = !bodyRaw.IsNullOrEmpty ? JsonSerializer.Deserialize<byte[]>(bodyRaw) : null;

headers.TryAdd(Headers.Group, groupId);

return new TransportMessage(headers, body);
}

private static string ToJson(object obj)
{
return JsonSerializer.Serialize(obj, new JsonSerializerOptions(JsonSerializerDefaults.Web));
}

}
}

Loading…
Откажи
Сачувај