Browse Source

refactor publisher and subscribe

master
Savorboard 7 years ago
parent
commit
ceee751e4a
22 changed files with 239 additions and 97 deletions
  1. +6
    -14
      src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs
  2. +5
    -4
      src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs
  3. +6
    -13
      src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs
  4. +70
    -0
      src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs
  5. +1
    -2
      src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs
  6. +54
    -0
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs
  7. +1
    -2
      src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs
  8. +70
    -0
      src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
  9. +0
    -1
      src/DotNetCore.CAP/CAP.Builder.cs
  10. +9
    -5
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  11. +1
    -1
      src/DotNetCore.CAP/DotNetCore.CAP.csproj
  12. +0
    -1
      src/DotNetCore.CAP/IBootstrapper.Default.cs
  13. +2
    -35
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  14. +3
    -3
      src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs
  15. +5
    -5
      src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs
  16. +0
    -5
      src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs
  17. +1
    -1
      src/DotNetCore.CAP/LoggerExtensions.cs
  18. +1
    -1
      src/DotNetCore.CAP/Models/CapReceivedMessage.cs
  19. +1
    -1
      src/DotNetCore.CAP/Models/CapSentMessage.cs
  20. +1
    -1
      test/DotNetCore.CAP.Test/CAP.BuilderTest.cs
  21. +1
    -1
      test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs
  22. +1
    -1
      test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs

+ 6
- 14
src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs View File

@@ -1,6 +1,7 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.EntityFrameworkCore;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;

namespace Microsoft.Extensions.DependencyInjection
@@ -13,26 +14,17 @@ namespace Microsoft.Extensions.DependencyInjection
/// <summary>
/// Adds an Entity Framework implementation of message stores.
/// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <returns>The <see cref="CapBuilder"/> instance this method extends.</returns>
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext
{
//builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
builder.Services.AddScoped<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();

return builder;
}

public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder, Action<SqlServerOptions> actionOptions)
where TContext : DbContext
{
//builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();

builder.Services.AddSingleton<IStorage, EFStorage>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();
builder.Services.AddScoped<IStorageConnection, EFStorageConnection>();

builder.Services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();

builder.Services.Configure(actionOptions);
var sqlServerOptions = new SqlServerOptions();


+ 5
- 4
src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs View File

@@ -7,6 +7,7 @@ using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Processor;

namespace DotNetCore.CAP
{
@@ -25,9 +26,9 @@ namespace DotNetCore.CAP
StatusName = StatusName.Enqueued
};

var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, transaction);
WaitHandleEx.QueuePulseEvent.Set();
PublishQueuer.PulseEvent.Set();

}

@@ -40,9 +41,9 @@ namespace DotNetCore.CAP
StatusName = StatusName.Enqueued
};

var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, transaction);
WaitHandleEx.QueuePulseEvent.Set();
PublishQueuer.PulseEvent.Set();
}
}
}

+ 6
- 13
src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs View File

@@ -58,22 +58,15 @@ SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
WHERE StatusName = '{StatusName.Scheduled}'";

try
{
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault();

if (message != null)
{
_context.Attach(message);
}
var connection = _context.GetDbConnection();
var message = (await connection.QueryAsync<CapSentMessage>(sql)).FirstOrDefault();

return message;
}
catch (Exception ex)
if (message != null)
{
throw;
_context.Attach(message);
}

return message;
}

// CapReceviedMessage


+ 70
- 0
src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs View File

@@ -0,0 +1,70 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.EntityFrameworkCore
{
public class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;

private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);

private static readonly string[] Tables =
{
nameof(CapDbContext.CapSentMessages),
nameof(CapDbContext.CapReceivedMessages),
};

public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
SqlServerOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");

foreach (var table in Tables)
{
var removedCount = 0;
do
{
using (var scope = _provider.CreateScope())
{
var provider = scope.ServiceProvider;
var jobsDbContext = provider.GetService<CapDbContext>();
var connection = jobsDbContext.GetDbConnection();

removedCount = await connection.ExecuteAsync($@"
DELETE TOP (@count)
FROM [{_options.Schema}].[{table}] WITH (readpast)
WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch });
}

if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}

await context.WaitAsync(_waitingInterval);
}
}
}

+ 1
- 2
src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs View File

@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;

namespace Microsoft.Extensions.DependencyInjection
@@ -24,7 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection

builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();

builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();
builder.Services.AddTransient<IQueueExecutor, PublishQueueExecutor>();

return builder;
}


+ 54
- 0
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs View File

@@ -0,0 +1,54 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
public class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;

public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<KafkaOptions> options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_kafkaOptions = options.Value;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
producer.ProduceAsync(keyName, null, content);
producer.Flush();
}

_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");

return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
}
}
}
}

+ 1
- 2
src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs View File

@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.RabbitMQ;

namespace Microsoft.Extensions.DependencyInjection
@@ -15,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection

builder.Services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();

builder.Services.AddTransient<IMessageJobProcessor, RabbitJobProcessor>();
builder.Services.AddTransient<IQueueExecutor, PublishQueueExecutor>();

return builder;
}


+ 70
- 0
src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs View File

@@ -0,0 +1,70 @@
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace DotNetCore.CAP.RabbitMQ
{
public class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMqOptions;

public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<RabbitMQOptions> options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_rabbitMqOptions = options.Value;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMqOptions.HostName,
UserName = _rabbitMqOptions.UserName,
Port = _rabbitMqOptions.Port,
Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMqOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
};

try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);

channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
routingKey: keyName,
basicProperties: null,
body: body);

_logger.LogDebug($"rabbitmq topic message [{keyName}] has been published.");
}
return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError($"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");

return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
}

}
}
}

+ 0
- 1
src/DotNetCore.CAP/CAP.Builder.cs View File

@@ -1,5 +1,4 @@
using System;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP


+ 9
- 5
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs View File

@@ -6,8 +6,8 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Microsoft.Extensions.DependencyInjection
@@ -48,12 +48,16 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<MethodMatcherCache>();

services.AddSingleton<IProcessingServer, ConsumerHandler>();
services.AddSingleton<IProcessingServer, JobProcessingServer>();
services.AddSingleton<IProcessingServer, CapProcessingServer>();
services.AddSingleton<IBootstrapper, DefaultBootstrapper>();
services.AddSingleton<IStateChanger, StateChanger>();

//Processors
services.AddTransient<JobQueuer>();
services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>();
services.AddTransient<IMessageProcessor, DefaultMessageProcessor>();

//Executors
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>();



+ 1
- 1
src/DotNetCore.CAP/DotNetCore.CAP.csproj View File

@@ -15,7 +15,7 @@

<ItemGroup>
<None Include="IQueueExecutor.Subscibe.cs" />
<None Include="Job\IJobProcessor.MessageJob.Default.cs" />
<None Include="Processor\IProcessor.Message.Default.cs" />
</ItemGroup>

<ItemGroup>


+ 0
- 1
src/DotNetCore.CAP/IBootstrapper.Default.cs View File

@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;


+ 2
- 35
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -6,6 +6,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -106,7 +107,6 @@ namespace DotNetCore.CAP
{
var receviedMessage = StoreMessage(scope, message);
client.Commit();
// ProcessMessage(scope, receviedMessage);
}
};
}
@@ -125,40 +125,7 @@ namespace DotNetCore.CAP

public void Pulse()
{
WaitHandleEx.ReceviedPulseEvent.Set();
SubscribeQueuer.PulseEvent.Set();
}

//private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
//{
// var provider = serviceScope.ServiceProvider;
// var messageStore = provider.GetRequiredService<IStorageConnection>();
// try
// {
// var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);

// if (executeDescriptorGroup.ContainsKey(receivedMessage.Group))
// {
// messageStore.FetchNextReceivedMessageAsync



// messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait();

// // If there are multiple consumers in the same group, we will take the first
// var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
// var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());

// _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();

// messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait();
// }
// }
// catch (Exception ex)
// {
// _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
// }
//}


}
}

+ 3
- 3
src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs View File

@@ -3,8 +3,8 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;

@@ -93,7 +93,7 @@ namespace DotNetCore.CAP
}

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.LastRun = due;
message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);


+ 5
- 5
src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs View File

@@ -7,8 +7,8 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;

@@ -22,7 +22,7 @@ namespace DotNetCore.CAP
private readonly ILogger _logger;

private readonly MethodMatcherCache _selector;
private readonly CapOptions _options;
//private readonly CapOptions _options;

public SubscibeQueueExecutor(
IStateChanger stateChanger,
@@ -132,7 +132,7 @@ namespace DotNetCore.CAP
{
var retryBehavior = RetryBehavior.DefaultRetry;

var now = DateTime.UtcNow;
var now = DateTime.Now;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
@@ -140,7 +140,7 @@ namespace DotNetCore.CAP
}

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.LastRun = due;
message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);


+ 0
- 5
src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs View File

@@ -6,11 +6,6 @@ namespace DotNetCore.CAP.Infrastructure
{
public static class WaitHandleEx
{
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public static readonly AutoResetEvent QueuePulseEvent = new AutoResetEvent(true);
public static readonly AutoResetEvent SentPulseEvent = new AutoResetEvent(true);
public static readonly AutoResetEvent ReceviedPulseEvent = new AutoResetEvent(true);

public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout)
{
var t1 = handle1.WaitOneAsync(timeout);


+ 1
- 1
src/DotNetCore.CAP/LoggerExtensions.cs View File

@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP


+ 1
- 1
src/DotNetCore.CAP/Models/CapReceivedMessage.cs View File

@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Models

public DateTime Added { get; set; }

public DateTime? LastRun { get; set; }
public DateTime? ExpiresAt { get; set; }

public int Retries { get; set; }



+ 1
- 1
src/DotNetCore.CAP/Models/CapSentMessage.cs View File

@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Models

public DateTime Added { get; set; }

public DateTime? LastRun { get; set; }
public DateTime? ExpiresAt { get; set; }

public int Retries { get; set; }



+ 1
- 1
test/DotNetCore.CAP.Test/CAP.BuilderTest.cs View File

@@ -1,7 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Xunit;


+ 1
- 1
test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs View File

@@ -1,7 +1,7 @@
//using System;
//using System.Collections.Generic;
//using System.Text;
//using DotNetCore.CAP.Job;
//using DotNetCore.CAP.Processor;
//using Xunit;

//namespace DotNetCore.CAP.Test.Job


+ 1
- 1
test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs View File

@@ -4,7 +4,7 @@
//using System.Threading;
//using System.Threading.Tasks;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Job;
//using DotNetCore.CAP.Processor;
//using Microsoft.Extensions.DependencyInjection;
//using Moq;
//using Xunit;


Loading…
Cancel
Save