diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs
index 938cb50..668f090 100644
--- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs
+++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs
@@ -1,6 +1,7 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.EntityFrameworkCore;
+using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
namespace Microsoft.Extensions.DependencyInjection
@@ -13,26 +14,17 @@ namespace Microsoft.Extensions.DependencyInjection
///
/// Adds an Entity Framework implementation of message stores.
///
- /// The Entity Framework database context to use.
- /// The instance this method extends.
- public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder)
- where TContext : DbContext
- {
- //builder.Services.AddScoped>();
- builder.Services.AddScoped();
- builder.Services.AddScoped();
-
- return builder;
- }
-
-
public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder, Action actionOptions)
where TContext : DbContext
{
//builder.Services.AddScoped>();
+
builder.Services.AddSingleton();
- builder.Services.AddScoped();
+ builder.Services.AddScoped();
+
+ builder.Services.AddTransient();
+
builder.Services.Configure(actionOptions);
var sqlServerOptions = new SqlServerOptions();
diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs
index 83faf1e..ad9d5be 100644
--- a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs
+++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs
@@ -7,6 +7,7 @@ using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using DotNetCore.CAP.Infrastructure;
+using DotNetCore.CAP.Processor;
namespace DotNetCore.CAP
{
@@ -25,9 +26,9 @@ namespace DotNetCore.CAP
StatusName = StatusName.Enqueued
};
- var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
+ var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, transaction);
- WaitHandleEx.QueuePulseEvent.Set();
+ PublishQueuer.PulseEvent.Set();
}
@@ -40,9 +41,9 @@ namespace DotNetCore.CAP
StatusName = StatusName.Enqueued
};
- var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)";
+ var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await connection.ExecuteAsync(sql, transaction);
- WaitHandleEx.QueuePulseEvent.Set();
+ PublishQueuer.PulseEvent.Set();
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs
index c9305c1..f996028 100644
--- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs
+++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs
@@ -58,22 +58,15 @@ SELECT TOP (1) *
FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast)
WHERE StatusName = '{StatusName.Scheduled}'";
- try
- {
- var connection = _context.GetDbConnection();
- var message = (await connection.QueryAsync(sql)).FirstOrDefault();
-
- if (message != null)
- {
- _context.Attach(message);
- }
+ var connection = _context.GetDbConnection();
+ var message = (await connection.QueryAsync(sql)).FirstOrDefault();
- return message;
- }
- catch (Exception ex)
+ if (message != null)
{
- throw;
+ _context.Attach(message);
}
+
+ return message;
}
// CapReceviedMessage
diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs
new file mode 100644
index 0000000..4014e4d
--- /dev/null
+++ b/src/DotNetCore.CAP.EntityFrameworkCore/IAdditionalProcessor.Default.cs
@@ -0,0 +1,70 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Dapper;
+using DotNetCore.CAP.Processor;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace DotNetCore.CAP.EntityFrameworkCore
+{
+ public class DefaultAdditionalProcessor : IAdditionalProcessor
+ {
+ private readonly IServiceProvider _provider;
+ private readonly ILogger _logger;
+ private readonly SqlServerOptions _options;
+
+ private const int MaxBatch = 1000;
+ private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
+ private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
+
+ private static readonly string[] Tables =
+ {
+ nameof(CapDbContext.CapSentMessages),
+ nameof(CapDbContext.CapReceivedMessages),
+ };
+
+ public DefaultAdditionalProcessor(
+ IServiceProvider provider,
+ ILogger logger,
+ SqlServerOptions sqlServerOptions)
+ {
+ _logger = logger;
+ _provider = provider;
+ _options = sqlServerOptions;
+ }
+
+ public async Task ProcessAsync(ProcessingContext context)
+ {
+ _logger.LogDebug("Collecting expired entities.");
+
+ foreach (var table in Tables)
+ {
+ var removedCount = 0;
+ do
+ {
+ using (var scope = _provider.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ var jobsDbContext = provider.GetService();
+ var connection = jobsDbContext.GetDbConnection();
+
+ removedCount = await connection.ExecuteAsync($@"
+DELETE TOP (@count)
+FROM [{_options.Schema}].[{table}] WITH (readpast)
+WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch });
+ }
+
+ if (removedCount != 0)
+ {
+ await context.WaitAsync(_delay);
+ context.ThrowIfStopping();
+ }
+ } while (removedCount != 0);
+ }
+
+ await context.WaitAsync(_waitingInterval);
+ }
+ }
+}
diff --git a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs
index 4c3375e..4f064b4 100644
--- a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs
+++ b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs
@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP;
-using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;
namespace Microsoft.Extensions.DependencyInjection
@@ -24,7 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection
builder.Services.AddSingleton();
- builder.Services.AddTransient();
+ builder.Services.AddTransient();
return builder;
}
diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs
new file mode 100644
index 0000000..e54975b
--- /dev/null
+++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs
@@ -0,0 +1,54 @@
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Confluent.Kafka.Serialization;
+using DotNetCore.CAP.Processor.States;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace DotNetCore.CAP.Kafka
+{
+ public class PublishQueueExecutor : BasePublishQueueExecutor
+ {
+ private readonly ILogger _logger;
+ private readonly KafkaOptions _kafkaOptions;
+
+ public PublishQueueExecutor(IStateChanger stateChanger,
+ IOptions options,
+ ILogger logger)
+ : base(stateChanger, logger)
+ {
+ _logger = logger;
+ _kafkaOptions = options.Value;
+ }
+
+ public override Task PublishAsync(string keyName, string content)
+ {
+ try
+ {
+ var config = _kafkaOptions.AsRdkafkaConfig();
+ using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8)))
+ {
+ producer.ProduceAsync(keyName, null, content);
+ producer.Flush();
+ }
+
+ _logger.LogDebug($"kafka topic message [{keyName}] has been published.");
+
+ return Task.FromResult(OperateResult.Success);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
+
+ return Task.FromResult(OperateResult.Failed(ex,
+ new OperateError()
+ {
+ Code = ex.HResult.ToString(),
+ Description = ex.Message
+ }));
+ }
+ }
+ }
+}
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs
index 2afb79e..2a92956 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.BuilderExtensions.cs
@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP;
-using DotNetCore.CAP.Job;
using DotNetCore.CAP.RabbitMQ;
namespace Microsoft.Extensions.DependencyInjection
@@ -15,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection
builder.Services.AddSingleton();
- builder.Services.AddTransient();
+ builder.Services.AddTransient();
return builder;
}
diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
new file mode 100644
index 0000000..df08dd2
--- /dev/null
+++ b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
@@ -0,0 +1,70 @@
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using DotNetCore.CAP.Processor.States;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using RabbitMQ.Client;
+
+namespace DotNetCore.CAP.RabbitMQ
+{
+ public class PublishQueueExecutor : BasePublishQueueExecutor
+ {
+ private readonly ILogger _logger;
+ private readonly RabbitMQOptions _rabbitMqOptions;
+
+ public PublishQueueExecutor(IStateChanger stateChanger,
+ IOptions options,
+ ILogger logger)
+ : base(stateChanger, logger)
+ {
+ _logger = logger;
+ _rabbitMqOptions = options.Value;
+ }
+
+ public override Task PublishAsync(string keyName, string content)
+ {
+ var factory = new ConnectionFactory()
+ {
+ HostName = _rabbitMqOptions.HostName,
+ UserName = _rabbitMqOptions.UserName,
+ Port = _rabbitMqOptions.Port,
+ Password = _rabbitMqOptions.Password,
+ VirtualHost = _rabbitMqOptions.VirtualHost,
+ RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
+ SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
+ SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
+ };
+
+ try
+ {
+ using (var connection = factory.CreateConnection())
+ using (var channel = connection.CreateModel())
+ {
+ var body = Encoding.UTF8.GetBytes(content);
+
+ channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
+ channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
+ routingKey: keyName,
+ basicProperties: null,
+ body: body);
+
+ _logger.LogDebug($"rabbitmq topic message [{keyName}] has been published.");
+ }
+ return Task.FromResult(OperateResult.Success);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError($"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
+
+ return Task.FromResult(OperateResult.Failed(ex,
+ new OperateError()
+ {
+ Code = ex.HResult.ToString(),
+ Description = ex.Message
+ }));
+ }
+
+ }
+ }
+}
diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs
index 582678a..6214c12 100644
--- a/src/DotNetCore.CAP/CAP.Builder.cs
+++ b/src/DotNetCore.CAP/CAP.Builder.cs
@@ -1,5 +1,4 @@
using System;
-using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
index 208b635..e38bd2b 100644
--- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
+++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
@@ -6,8 +6,8 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
-using DotNetCore.CAP.Job;
-using DotNetCore.CAP.Job.States;
+using DotNetCore.CAP.Processor;
+using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Microsoft.Extensions.DependencyInjection
@@ -48,12 +48,16 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton();
services.AddSingleton();
- services.AddSingleton();
+ services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+
//Processors
- services.AddTransient();
-
+ services.AddTransient();
+ services.AddTransient();
+ services.AddTransient();
+
+ //Executors
services.AddSingleton();
services.AddSingleton();
diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
index ffbf70e..a59f6cd 100644
--- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj
+++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
@@ -15,7 +15,7 @@
-
+
diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs
index 7f25780..58bfe58 100644
--- a/src/DotNetCore.CAP/IBootstrapper.Default.cs
+++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs
@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using DotNetCore.CAP.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
index 1322b17..d84a669 100644
--- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
@@ -6,6 +6,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
+using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -106,7 +107,6 @@ namespace DotNetCore.CAP
{
var receviedMessage = StoreMessage(scope, message);
client.Commit();
- // ProcessMessage(scope, receviedMessage);
}
};
}
@@ -125,40 +125,7 @@ namespace DotNetCore.CAP
public void Pulse()
{
- WaitHandleEx.ReceviedPulseEvent.Set();
+ SubscribeQueuer.PulseEvent.Set();
}
-
- //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
- //{
- // var provider = serviceScope.ServiceProvider;
- // var messageStore = provider.GetRequiredService();
- // try
- // {
- // var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
-
- // if (executeDescriptorGroup.ContainsKey(receivedMessage.Group))
- // {
- // messageStore.FetchNextReceivedMessageAsync
-
-
-
- // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait();
-
- // // If there are multiple consumers in the same group, we will take the first
- // var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
- // var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
-
- // _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
-
- // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait();
- // }
- // }
- // catch (Exception ex)
- // {
- // _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
- // }
- //}
-
-
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs
index 521b9f8..09f7d40 100644
--- a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs
+++ b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs
@@ -3,8 +3,8 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
-using DotNetCore.CAP.Job;
-using DotNetCore.CAP.Job.States;
+using DotNetCore.CAP.Processor;
+using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
- message.LastRun = due;
+ message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs
index 31cc863..d69cdf4 100644
--- a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs
+++ b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs
@@ -7,8 +7,8 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
-using DotNetCore.CAP.Job;
-using DotNetCore.CAP.Job.States;
+using DotNetCore.CAP.Processor;
+using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
@@ -22,7 +22,7 @@ namespace DotNetCore.CAP
private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
- private readonly CapOptions _options;
+ //private readonly CapOptions _options;
public SubscibeQueueExecutor(
IStateChanger stateChanger,
@@ -132,7 +132,7 @@ namespace DotNetCore.CAP
{
var retryBehavior = RetryBehavior.DefaultRetry;
- var now = DateTime.UtcNow;
+ var now = DateTime.Now;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
@@ -140,7 +140,7 @@ namespace DotNetCore.CAP
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
- message.LastRun = due;
+ message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
diff --git a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs
index 24cfbdf..5a277d2 100644
--- a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs
+++ b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs
@@ -6,11 +6,6 @@ namespace DotNetCore.CAP.Infrastructure
{
public static class WaitHandleEx
{
- public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
- public static readonly AutoResetEvent QueuePulseEvent = new AutoResetEvent(true);
- public static readonly AutoResetEvent SentPulseEvent = new AutoResetEvent(true);
- public static readonly AutoResetEvent ReceviedPulseEvent = new AutoResetEvent(true);
-
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout)
{
var t1 = handle1.WaitOneAsync(timeout);
diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs
index 0d1894a..5bb3ee5 100644
--- a/src/DotNetCore.CAP/LoggerExtensions.cs
+++ b/src/DotNetCore.CAP/LoggerExtensions.cs
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using DotNetCore.CAP.Job;
+using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
index ce1df51..dfa5624 100644
--- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
+++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Models
public DateTime Added { get; set; }
- public DateTime? LastRun { get; set; }
+ public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
diff --git a/src/DotNetCore.CAP/Models/CapSentMessage.cs b/src/DotNetCore.CAP/Models/CapSentMessage.cs
index f615fa7..6752536 100644
--- a/src/DotNetCore.CAP/Models/CapSentMessage.cs
+++ b/src/DotNetCore.CAP/Models/CapSentMessage.cs
@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Models
public DateTime Added { get; set; }
- public DateTime? LastRun { get; set; }
+ public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs
index 7d48704..6289ee2 100644
--- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs
+++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
-using DotNetCore.CAP.Job;
+using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
diff --git a/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs b/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs
index 731851b..a69e0fd 100644
--- a/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs
+++ b/test/DotNetCore.CAP.Test/Job/ComputedJobTest.cs
@@ -1,7 +1,7 @@
//using System;
//using System.Collections.Generic;
//using System.Text;
-//using DotNetCore.CAP.Job;
+//using DotNetCore.CAP.Processor;
//using Xunit;
//namespace DotNetCore.CAP.Test.Job
diff --git a/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs b/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs
index 105dd13..cfc498d 100644
--- a/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs
+++ b/test/DotNetCore.CAP.Test/Job/JobProcessingServerTest.cs
@@ -4,7 +4,7 @@
//using System.Threading;
//using System.Threading.Tasks;
//using DotNetCore.CAP.Infrastructure;
-//using DotNetCore.CAP.Job;
+//using DotNetCore.CAP.Processor;
//using Microsoft.Extensions.DependencyInjection;
//using Moq;
//using Xunit;