From ceee751e4a476e867c2228ff0a8caba337e7b782 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 14 Jul 2017 01:09:44 +0800 Subject: [PATCH] refactor publisher and subscribe --- .../CAP.BuilderExtensions.cs | 20 ++---- .../CapPublisherExtensions.cs | 9 +-- .../EFStorageConnection.cs | 19 ++--- .../IAdditionalProcessor.Default.cs | 70 +++++++++++++++++++ .../CAP.BuilderExtensions.cs | 3 +- .../PublishQueueExecutor.cs | 54 ++++++++++++++ .../CAP.BuilderExtensions.cs | 3 +- .../PublishQueueExecutor.cs | 70 +++++++++++++++++++ src/DotNetCore.CAP/CAP.Builder.cs | 1 - .../CAP.ServiceCollectionExtensions.cs | 14 ++-- src/DotNetCore.CAP/DotNetCore.CAP.csproj | 2 +- src/DotNetCore.CAP/IBootstrapper.Default.cs | 1 - .../IConsumerHandler.Default.cs | 37 +--------- .../IQueueExecutor.Publish.Base.cs | 6 +- src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs | 10 +-- .../Infrastructure/WaitHandleEx.cs | 5 -- src/DotNetCore.CAP/LoggerExtensions.cs | 2 +- .../Models/CapReceivedMessage.cs | 2 +- src/DotNetCore.CAP/Models/CapSentMessage.cs | 2 +- test/DotNetCore.CAP.Test/CAP.BuilderTest.cs | 2 +- .../Job/ComputedJobTest.cs | 2 +- .../Job/JobProcessingServerTest.cs | 2 +- 22 files changed, 239 insertions(+), 97 deletions(-) create mode 100644 src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs create mode 100644 src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs create mode 100644 src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs index 938cb50..668f090 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs @@ -1,6 +1,7 @@ using System; using DotNetCore.CAP; using DotNetCore.CAP.EntityFrameworkCore; +using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; namespace Microsoft.Extensions.DependencyInjection @@ -13,26 +14,17 @@ namespace Microsoft.Extensions.DependencyInjection /// /// Adds an Entity Framework implementation of message stores. /// - /// The Entity Framework database context to use. - /// The instance this method extends. - public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder) - where TContext : DbContext - { - //builder.Services.AddScoped>(); - builder.Services.AddScoped(); - builder.Services.AddScoped(); - - return builder; - } - - public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder, Action actionOptions) where TContext : DbContext { //builder.Services.AddScoped>(); + builder.Services.AddSingleton(); - builder.Services.AddScoped(); + builder.Services.AddScoped(); + + builder.Services.AddTransient(); + builder.Services.Configure(actionOptions); var sqlServerOptions = new SqlServerOptions(); diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs index 83faf1e..ad9d5be 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs @@ -7,6 +7,7 @@ using Dapper; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Processor; namespace DotNetCore.CAP { @@ -25,9 +26,9 @@ namespace DotNetCore.CAP StatusName = StatusName.Enqueued }; - var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; + var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)"; await connection.ExecuteAsync(sql, transaction); - WaitHandleEx.QueuePulseEvent.Set(); + PublishQueuer.PulseEvent.Set(); } @@ -40,9 +41,9 @@ namespace DotNetCore.CAP StatusName = StatusName.Enqueued }; - var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; + var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)"; await connection.ExecuteAsync(sql, transaction); - WaitHandleEx.QueuePulseEvent.Set(); + PublishQueuer.PulseEvent.Set(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs index c9305c1..f996028 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs @@ -58,22 +58,15 @@ SELECT TOP (1) * FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'"; - try - { - var connection = _context.GetDbConnection(); - var message = (await connection.QueryAsync(sql)).FirstOrDefault(); - - if (message != null) - { - _context.Attach(message); - } + var connection = _context.GetDbConnection(); + var message = (await connection.QueryAsync(sql)).FirstOrDefault(); - return message; - } - catch (Exception ex) + if (message != null) { - throw; + _context.Attach(message); } + + return message; } // CapReceviedMessage diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs new file mode 100644 index 0000000..4014e4d --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs @@ -0,0 +1,70 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.EntityFrameworkCore +{ + public class DefaultAdditionalProcessor : IAdditionalProcessor + { + private readonly IServiceProvider _provider; + private readonly ILogger _logger; + private readonly SqlServerOptions _options; + + private const int MaxBatch = 1000; + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2); + + private static readonly string[] Tables = + { + nameof(CapDbContext.CapSentMessages), + nameof(CapDbContext.CapReceivedMessages), + }; + + public DefaultAdditionalProcessor( + IServiceProvider provider, + ILogger logger, + SqlServerOptions sqlServerOptions) + { + _logger = logger; + _provider = provider; + _options = sqlServerOptions; + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug("Collecting expired entities."); + + foreach (var table in Tables) + { + var removedCount = 0; + do + { + using (var scope = _provider.CreateScope()) + { + var provider = scope.ServiceProvider; + var jobsDbContext = provider.GetService(); + var connection = jobsDbContext.GetDbConnection(); + + removedCount = await connection.ExecuteAsync($@" +DELETE TOP (@count) +FROM [{_options.Schema}].[{table}] WITH (readpast) +WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch }); + } + + if (removedCount != 0) + { + await context.WaitAsync(_delay); + context.ThrowIfStopping(); + } + } while (removedCount != 0); + } + + await context.WaitAsync(_waitingInterval); + } + } +} diff --git a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs index 4c3375e..4f064b4 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs @@ -1,6 +1,5 @@ using System; using DotNetCore.CAP; -using DotNetCore.CAP.Job; using DotNetCore.CAP.Kafka; namespace Microsoft.Extensions.DependencyInjection @@ -24,7 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection builder.Services.AddSingleton(); - builder.Services.AddTransient(); + builder.Services.AddTransient(); return builder; } diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs new file mode 100644 index 0000000..e54975b --- /dev/null +++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs @@ -0,0 +1,54 @@ +using System; +using System.Text; +using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Serialization; +using DotNetCore.CAP.Processor.States; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Kafka +{ + public class PublishQueueExecutor : BasePublishQueueExecutor + { + private readonly ILogger _logger; + private readonly KafkaOptions _kafkaOptions; + + public PublishQueueExecutor(IStateChanger stateChanger, + IOptions options, + ILogger logger) + : base(stateChanger, logger) + { + _logger = logger; + _kafkaOptions = options.Value; + } + + public override Task PublishAsync(string keyName, string content) + { + try + { + var config = _kafkaOptions.AsRdkafkaConfig(); + using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) + { + producer.ProduceAsync(keyName, null, content); + producer.Flush(); + } + + _logger.LogDebug($"kafka topic message [{keyName}] has been published."); + + return Task.FromResult(OperateResult.Success); + } + catch (Exception ex) + { + _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); + + return Task.FromResult(OperateResult.Failed(ex, + new OperateError() + { + Code = ex.HResult.ToString(), + Description = ex.Message + })); + } + } + } +} diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs index 2afb79e..2a92956 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs @@ -1,6 +1,5 @@ using System; using DotNetCore.CAP; -using DotNetCore.CAP.Job; using DotNetCore.CAP.RabbitMQ; namespace Microsoft.Extensions.DependencyInjection @@ -15,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection builder.Services.AddSingleton(); - builder.Services.AddTransient(); + builder.Services.AddTransient(); return builder; } diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs new file mode 100644 index 0000000..df08dd2 --- /dev/null +++ b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs @@ -0,0 +1,70 @@ +using System; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Processor.States; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; + +namespace DotNetCore.CAP.RabbitMQ +{ + public class PublishQueueExecutor : BasePublishQueueExecutor + { + private readonly ILogger _logger; + private readonly RabbitMQOptions _rabbitMqOptions; + + public PublishQueueExecutor(IStateChanger stateChanger, + IOptions options, + ILogger logger) + : base(stateChanger, logger) + { + _logger = logger; + _rabbitMqOptions = options.Value; + } + + public override Task PublishAsync(string keyName, string content) + { + var factory = new ConnectionFactory() + { + HostName = _rabbitMqOptions.HostName, + UserName = _rabbitMqOptions.UserName, + Port = _rabbitMqOptions.Port, + Password = _rabbitMqOptions.Password, + VirtualHost = _rabbitMqOptions.VirtualHost, + RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout, + SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout, + SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout + }; + + try + { + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + var body = Encoding.UTF8.GetBytes(content); + + channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE); + channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName, + routingKey: keyName, + basicProperties: null, + body: body); + + _logger.LogDebug($"rabbitmq topic message [{keyName}] has been published."); + } + return Task.FromResult(OperateResult.Success); + } + catch (Exception ex) + { + _logger.LogError($"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); + + return Task.FromResult(OperateResult.Failed(ex, + new OperateError() + { + Code = ex.HResult.ToString(), + Description = ex.Message + })); + } + + } + } +} diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs index 582678a..6214c12 100644 --- a/src/DotNetCore.CAP/CAP.Builder.cs +++ b/src/DotNetCore.CAP/CAP.Builder.cs @@ -1,5 +1,4 @@ using System; -using DotNetCore.CAP.Job; using Microsoft.Extensions.DependencyInjection; namespace DotNetCore.CAP diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 208b635..e38bd2b 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -6,8 +6,8 @@ using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Job; -using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.DependencyInjection.Extensions; namespace Microsoft.Extensions.DependencyInjection @@ -48,12 +48,16 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + //Processors - services.AddTransient(); - + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + + //Executors services.AddSingleton(); services.AddSingleton(); diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index ffbf70e..a59f6cd 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs index 7f25780..58bfe58 100644 --- a/src/DotNetCore.CAP/IBootstrapper.Default.cs +++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 1322b17..d84a669 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -6,6 +6,7 @@ using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -106,7 +107,6 @@ namespace DotNetCore.CAP { var receviedMessage = StoreMessage(scope, message); client.Commit(); - // ProcessMessage(scope, receviedMessage); } }; } @@ -125,40 +125,7 @@ namespace DotNetCore.CAP public void Pulse() { - WaitHandleEx.ReceviedPulseEvent.Set(); + SubscribeQueuer.PulseEvent.Set(); } - - //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) - //{ - // var provider = serviceScope.ServiceProvider; - // var messageStore = provider.GetRequiredService(); - // try - // { - // var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); - - // if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) - // { - // messageStore.FetchNextReceivedMessageAsync - - - - // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); - - // // If there are multiple consumers in the same group, we will take the first - // var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; - // var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); - - // _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); - - // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); - // } - // } - // catch (Exception ex) - // { - // _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); - // } - //} - - } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs index 521b9f8..09f7d40 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs @@ -3,8 +3,8 @@ using System.Collections.Generic; using System.Diagnostics; using System.Text; using System.Threading.Tasks; -using DotNetCore.CAP.Job; -using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; @@ -93,7 +93,7 @@ namespace DotNetCore.CAP } var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); - message.LastRun = due; + message.ExpiresAt = due; using (var transaction = connection.CreateTransaction()) { transaction.UpdateMessage(message); diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs index 31cc863..d69cdf4 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs @@ -7,8 +7,8 @@ using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Job; -using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; @@ -22,7 +22,7 @@ namespace DotNetCore.CAP private readonly ILogger _logger; private readonly MethodMatcherCache _selector; - private readonly CapOptions _options; + //private readonly CapOptions _options; public SubscibeQueueExecutor( IStateChanger stateChanger, @@ -132,7 +132,7 @@ namespace DotNetCore.CAP { var retryBehavior = RetryBehavior.DefaultRetry; - var now = DateTime.UtcNow; + var now = DateTime.Now; var retries = ++message.Retries; if (retries >= retryBehavior.RetryCount) { @@ -140,7 +140,7 @@ namespace DotNetCore.CAP } var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); - message.LastRun = due; + message.ExpiresAt = due; using (var transaction = connection.CreateTransaction()) { transaction.UpdateMessage(message); diff --git a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs index 24cfbdf..5a277d2 100644 --- a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs +++ b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs @@ -6,11 +6,6 @@ namespace DotNetCore.CAP.Infrastructure { public static class WaitHandleEx { - public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); - public static readonly AutoResetEvent QueuePulseEvent = new AutoResetEvent(true); - public static readonly AutoResetEvent SentPulseEvent = new AutoResetEvent(true); - public static readonly AutoResetEvent ReceviedPulseEvent = new AutoResetEvent(true); - public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) { var t1 = handle1.WaitOneAsync(timeout); diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index 0d1894a..5bb3ee5 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; -using DotNetCore.CAP.Job; +using DotNetCore.CAP.Processor; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs index ce1df51..dfa5624 100644 --- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs @@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Models public DateTime Added { get; set; } - public DateTime? LastRun { get; set; } + public DateTime? ExpiresAt { get; set; } public int Retries { get; set; } diff --git a/src/DotNetCore.CAP/Models/CapSentMessage.cs b/src/DotNetCore.CAP/Models/CapSentMessage.cs index f615fa7..6752536 100644 --- a/src/DotNetCore.CAP/Models/CapSentMessage.cs +++ b/src/DotNetCore.CAP/Models/CapSentMessage.cs @@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Models public DateTime Added { get; set; } - public DateTime? LastRun { get; set; } + public DateTime? ExpiresAt { get; set; } public int Retries { get; set; } diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs index 7d48704..6289ee2 100644 --- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs +++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs @@ -1,7 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Job; +using DotNetCore.CAP.Processor; using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Xunit; diff --git a/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs b/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs index 731851b..a69e0fd 100644 --- a/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs +++ b/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs @@ -1,7 +1,7 @@ //using System; //using System.Collections.Generic; //using System.Text; -//using DotNetCore.CAP.Job; +//using DotNetCore.CAP.Processor; //using Xunit; //namespace DotNetCore.CAP.Test.Job diff --git a/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs b/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs index 105dd13..cfc498d 100644 --- a/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs +++ b/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs @@ -4,7 +4,7 @@ //using System.Threading; //using System.Threading.Tasks; //using DotNetCore.CAP.Infrastructure; -//using DotNetCore.CAP.Job; +//using DotNetCore.CAP.Processor; //using Microsoft.Extensions.DependencyInjection; //using Moq; //using Xunit;