diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs index d7bc4d3..c6c0831 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs @@ -19,7 +19,6 @@ namespace Microsoft.Extensions.DependencyInjection where TContext : DbContext { builder.Services.AddScoped>(); - builder.Services.AddScoped(); builder.Services.AddScoped(); @@ -27,15 +26,27 @@ namespace Microsoft.Extensions.DependencyInjection } - public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder, Action options) + public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder, Action actionOptions) where TContext : DbContext { builder.Services.AddScoped>(); - - builder.Services.AddScoped(); + builder.Services.AddSingleton(); builder.Services.AddScoped(); - builder.Services.Configure(options); + builder.Services.Configure(actionOptions); + + var efOptions = new EFOptions(); + actionOptions(efOptions); + + builder.Services.AddDbContext(options => + { + options.UseSqlServer(efOptions.ConnectionString, sqlOpts => + { + sqlOpts.MigrationsHistoryTable( + efOptions.MigrationsHistoryTableName, + efOptions.MigrationsHistoryTableSchema ?? efOptions.Schema); + }); + }); return builder; } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs index cdb877b..b019803 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs @@ -7,11 +7,29 @@ namespace DotNetCore.CAP.EntityFrameworkCore public class EFOptions { public const string DefaultSchema = "cap"; + public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory"; + + /// + /// Gets or sets the database's connection string that will be used to store database entities. + /// + public string ConnectionString { get; set; } /// /// Gets or sets the schema to use when creating database objects. /// Default is . /// public string Schema { get; set; } = DefaultSchema; + + /// + /// Gets or sets the migrations history table's schema. + /// If this is null, will be used. + /// + public string MigrationsHistoryTableSchema { get; set; } + + /// + /// Gets or sets the migrations history table's name. + /// Default is . + /// + public string MigrationsHistoryTableName { get; set; } = DefaultMigrationsHistoryTableName; } } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs index 24bf796..ee078d5 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; using Microsoft.EntityFrameworkCore; @@ -25,8 +24,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore private DbSet SentMessages => Context.Set(); - private DbSet ReceivedMessages => Context.Set(); - /// /// Creates the specified in the cap message store. /// @@ -39,158 +36,5 @@ namespace DotNetCore.CAP.EntityFrameworkCore await Context.SaveChangesAsync(); return OperateResult.Success; } - - public async Task ChangeSentMessageStateAsync(CapSentMessage message, string status, - bool autoSaveChanges = true) - { - Context.Attach(message); - message.LastRun = DateTime.Now; - message.StatusName = status; - try - { - if (autoSaveChanges) - { - await Context.SaveChangesAsync(); - } - } - catch (DbUpdateConcurrencyException ex) - { - return OperateResult.Failed( - new OperateError() - { - Code = "DbUpdateConcurrencyException", - Description = ex.Message - }); - } - return OperateResult.Success; - } - - /// - /// First Enqueued Message. - /// - public async Task GetNextSentMessageToBeEnqueuedAsync() - { - return await SentMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued); - } - - /// - /// Updates a message in a store as an asynchronous operation. - /// - /// The message to update in the store. - public async Task UpdateSentMessageAsync(CapSentMessage message) - { - if (message == null) throw new ArgumentNullException(nameof(message)); - - Context.Attach(message); - message.LastRun = DateTime.Now; - Context.Update(message); - - try - { - await Context.SaveChangesAsync(); - return OperateResult.Success; - } - catch (DbUpdateConcurrencyException ex) - { - return OperateResult.Failed(new OperateError() - { - Code = "DbUpdateConcurrencyException", - Description = ex.Message - }); - } - } - - /// - /// Deletes the specified from the consistency message store. - /// - /// The message to delete. - public async Task RemoveSentMessageAsync(CapSentMessage message) - { - if (message == null) throw new ArgumentNullException(nameof(message)); - - Context.Remove(message); - try - { - await Context.SaveChangesAsync(); - return OperateResult.Success; - } - catch (DbUpdateConcurrencyException ex) - { - return OperateResult.Failed(new OperateError() - { - Code = "DbUpdateConcurrencyException", - Description = ex.Message - }); - } - } - - /// - /// Creates the specified in the consistency message store. - /// - /// The message to create. - public async Task StoreReceivedMessageAsync(CapReceivedMessage message) - { - if (message == null) throw new ArgumentNullException(nameof(message)); - - Context.Add(message); - await Context.SaveChangesAsync(); - return OperateResult.Success; - } - - public async Task ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status, - bool autoSaveChanges = true) - { - Context.Attach(message); - message.LastRun = DateTime.Now; - message.StatusName = status; - try - { - if (autoSaveChanges) - { - await Context.SaveChangesAsync(); - } - } - catch (DbUpdateConcurrencyException ex) - { - return OperateResult.Failed(new OperateError() - { - Code = "DbUpdateConcurrencyException", - Description = ex.Message - }); - } - return OperateResult.Success; - } - - public async Task GetNextReceivedMessageToBeExcuted() - { - return await ReceivedMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued); - } - - /// - /// Updates the specified in the message store. - /// - /// The message to update. - public async Task UpdateReceivedMessageAsync(CapReceivedMessage message) - { - if (message == null) throw new ArgumentNullException(nameof(message)); - - Context.Attach(message); - message.LastRun = DateTime.Now; - Context.Update(message); - - try - { - await Context.SaveChangesAsync(); - return OperateResult.Success; - } - catch (DbUpdateConcurrencyException ex) - { - return OperateResult.Failed(new OperateError() - { - Code = "DbUpdateConcurrencyException", - Description = ex.Message - }); - } - } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj b/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj index a1f9510..6a6422c 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj +++ b/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj @@ -16,7 +16,11 @@ + + + + diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs index 3026424..d0ed080 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs @@ -29,17 +29,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore public IStorageTransaction CreateTransaction() { return new EFStorageTransaction(this); - } - - public Task StoreSentMessageAsync(CapSentMessage message) - { - if (message == null) throw new ArgumentNullException(nameof(message)); - - message.LastRun = NormalizeDateTime(message.LastRun); - - _context.Add(message); - return _context.SaveChangesAsync(); - } + } public Task GetSentMessageAsync(string id) { diff --git a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs index 8534f76..f50a0a5 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs @@ -5,6 +5,8 @@ using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Job; +using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,23 +18,26 @@ namespace DotNetCore.CAP.RabbitMQ { private readonly RabbitMQOptions _rabbitMqOptions; private readonly CancellationTokenSource _cts; - + private readonly IStateChanger _stateChanger; private readonly IServiceProvider _provider; private readonly ILogger _logger; private readonly TimeSpan _pollingDelay; + internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); public RabbitJobProcessor( IOptions capOptions, IOptions rabbitMQOptions, ILogger logger, + IStateChanger stateChanger, IServiceProvider provider) { _logger = logger; _rabbitMqOptions = rabbitMQOptions.Value; _provider = provider; + _stateChanger = stateChanger; _cts = new CancellationTokenSource(); - + var capOptions1 = capOptions.Value; _pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay); } @@ -62,7 +67,7 @@ namespace DotNetCore.CAP.RabbitMQ var token = GetTokenToWaitOn(context); } - await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, + await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); } finally @@ -78,39 +83,93 @@ namespace DotNetCore.CAP.RabbitMQ private async Task Step(ProcessingContext context) { + var fetched = default(IFetchedMessage); using (var scopedContext = context.CreateScope()) { var provider = scopedContext.Provider; var messageStore = provider.GetRequiredService(); - var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); - try + var connection = provider.GetRequiredService(); + + if ((fetched = await connection.FetchNextSentMessageAsync()) != null) { - if (message != null) + using (fetched) { - var sp = Stopwatch.StartNew(); - message.StatusName = StatusName.Processing; - await messageStore.UpdateSentMessageAsync(message); + var message = await connection.GetSentMessageAsync(fetched.MessageId); + try + { + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + + if (message.Retries > 0) + { + _logger.JobRetrying(message.Retries); + } + var result = ExecuteJob(message.KeyName, message.Content); + sp.Stop(); + + var newState = default(IState); + if (!result.Succeeded) + { + var shouldRetry = await UpdateJobForRetryAsync(message, connection); + if (shouldRetry) + { + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); + } + else + { + newState = new FailedState(); + _logger.JobFailed(result.Exception); + } + } + else + { + newState = new SucceededState(); + } + await _stateChanger.ChangeStateAsync(message, newState, connection); + + fetched.RemoveFromQueue(); + + if (result.Succeeded) + { + _logger.JobExecuted(sp.Elapsed.TotalSeconds); + } + } + + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex); + return false; + } - ExecuteJob(message.KeyName, message.Content); + } + } + } + return fetched != null; + } - sp.Stop(); + private async Task UpdateJobForRetryAsync(CapSentMessage message, IStorageConnection connection) + { + var retryBehavior = RetryBehavior.DefaultRetry; - message.StatusName = StatusName.Succeeded; - await messageStore.UpdateSentMessageAsync(message); + var now = DateTime.UtcNow; + var retries = ++message.Retries; + if (retries >= retryBehavior.RetryCount) + { + return false; + } - _logger.JobExecuted(sp.Elapsed.TotalSeconds); - } - } - catch (Exception ex) - { - _logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex); - return false; - } + var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + message.LastRun = due; + using (var transaction = connection.CreateTransaction()) + { + transaction.UpdateMessage(message); + await transaction.CommitAsync(); } return true; } - private void ExecuteJob(string topic, string content) + private OperateResult ExecuteJob(string topic, string content) { var factory = new ConnectionFactory() { @@ -124,17 +183,26 @@ namespace DotNetCore.CAP.RabbitMQ SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout }; - using (var connection = factory.CreateConnection()) - using (var channel = connection.CreateModel()) + try { - var body = Encoding.UTF8.GetBytes(content); + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + var body = Encoding.UTF8.GetBytes(content); - channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE); - channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName, - routingKey: topic, - basicProperties: null, - body: body); + channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE); + channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName, + routingKey: topic, + basicProperties: null, + body: body); + } + return OperateResult.Success; + } + catch (Exception ex) + { + return OperateResult.Failed(ex, new OperateError() { Code = ex.HResult.ToString(), Description = ex.Message }); } + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICapMessageStore.cs b/src/DotNetCore.CAP/ICapMessageStore.cs index 958a749..798a203 100644 --- a/src/DotNetCore.CAP/ICapMessageStore.cs +++ b/src/DotNetCore.CAP/ICapMessageStore.cs @@ -14,62 +14,5 @@ namespace DotNetCore.CAP /// /// The message to create in the store. Task StoreSentMessageAsync(CapSentMessage message); - - /// - /// Change model status name. - /// - /// The type of . - /// The status name. - /// auto save dbcontext changes. - /// - Task ChangeSentMessageStateAsync(CapSentMessage message, string statusName, - bool autoSaveChanges = true); - - /// - /// Fetches the next message to be executed. - /// - /// - Task GetNextSentMessageToBeEnqueuedAsync(); - - /// - /// Updates a message in a store as an asynchronous operation. - /// - /// The message to update in the store. - Task UpdateSentMessageAsync(CapSentMessage message); - - /// - /// Deletes a message from the store as an asynchronous operation. - /// - /// The message to delete in the store. - Task RemoveSentMessageAsync(CapSentMessage message); - - - /// - /// Creates a new message in a store as an asynchronous operation. - /// - /// - /// - Task StoreReceivedMessageAsync(CapReceivedMessage message); - - /// - /// Change model status name. - /// - /// The type of . - /// The status name. - /// auto save dbcontext changes. - /// - Task ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, - bool autoSaveChanges = true); - - /// - /// Fetches the next message to be executed. - /// - Task GetNextReceivedMessageToBeExcuted(); - - /// - /// Updates a message in a store as an asynchronous operation. - /// - /// The message to update in the store. - Task UpdateReceivedMessageAsync(CapReceivedMessage message); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index cf2936a..6eb85ea 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -114,7 +114,7 @@ namespace DotNetCore.CAP private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) { var provider = serviceScope.ServiceProvider; - var messageStore = provider.GetRequiredService(); + var messageStore = provider.GetRequiredService(); var receivedMessage = new CapReceivedMessage(messageContext) { StatusName = StatusName.Enqueued, @@ -126,13 +126,17 @@ namespace DotNetCore.CAP private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) { var provider = serviceScope.ServiceProvider; - var messageStore = provider.GetRequiredService(); + var messageStore = provider.GetRequiredService(); try { var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) { + messageStore.FetchNextReceivedMessageAsync + + + messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); // If there are multiple consumers in the same group, we will take the first diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index da81af6..99a48ff 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -10,13 +10,7 @@ namespace DotNetCore.CAP public interface IStorageConnection : IDisposable { - //Sent messages - - /// - /// Stores the message. - /// - /// The message to store. - Task StoreSentMessageAsync(CapSentMessage message); + //Sent messages /// /// Returns the message with the given id. @@ -56,7 +50,7 @@ namespace DotNetCore.CAP /// /// Returns the next message to be enqueued. /// - Task GetNextReceviedMessageToBeEnqueuedAsync(); + Task GetNextReceviedMessageToBeEnqueuedAsync(); //----------------------------------------- diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs new file mode 100644 index 0000000..d297d31 --- /dev/null +++ b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.Job +{ + public class JobQueuer : IJobProcessor + { + private ILogger _logger; + private JobsOptions _options; + private IStateChanger _stateChanger; + private IServiceProvider _provider; + + internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); + private TimeSpan _pollingDelay; + + public JobQueuer( + ILogger logger, + JobsOptions options, + IStateChanger stateChanger, + IServiceProvider provider) + { + _logger = logger; + _options = options; + _stateChanger = stateChanger; + _provider = provider; + + _pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay); + } + + public async Task ProcessAsync(ProcessingContext context) + { + using (var scope = _provider.CreateScope()) + { + CapSentMessage sentMessage; + CapReceivedMessage receivedMessage; + var provider = scope.ServiceProvider; + var connection = provider.GetRequiredService(); + + while ( + !context.IsStopping && + (sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null) + + { + var state = new EnqueuedState(); + + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(sentMessage, state, transaction); + await transaction.CommitAsync(); + } + } + } + + context.ThrowIfStopping(); + + DelayedJobProcessor.PulseEvent.Set(); + await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); + } + } +} diff --git a/src/DotNetCore.CAP/Job/States/IState.Enqueued.cs b/src/DotNetCore.CAP/Job/States/IState.Enqueued.cs new file mode 100644 index 0000000..6588177 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.Enqueued.cs @@ -0,0 +1,24 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class EnqueuedState : IState + { + public const string StateName = "Enqueued"; + + public TimeSpan? ExpiresAfter => null; + + public string Name => StateName; + + public void Apply(CapSentMessage message, IStorageTransaction transaction) + { + transaction.EnqueueMessage(message); + } + + public void Apply(CapReceivedMessage message, IStorageTransaction transaction) + { + transaction.EnqueueMessage(message); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IState.Failed.cs b/src/DotNetCore.CAP/Job/States/IState.Failed.cs new file mode 100644 index 0000000..c779856 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.Failed.cs @@ -0,0 +1,22 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class FailedState : IState + { + public const string StateName = "Failed"; + + public TimeSpan? ExpiresAfter => TimeSpan.FromDays(15); + + public string Name => StateName; + + public void Apply(CapSentMessage message, IStorageTransaction transaction) + { + } + + public void Apply(CapReceivedMessage message, IStorageTransaction transaction) + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IState.Processing.cs b/src/DotNetCore.CAP/Job/States/IState.Processing.cs new file mode 100644 index 0000000..a5564a8 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.Processing.cs @@ -0,0 +1,22 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class ProcessingState : IState + { + public const string StateName = "Processing"; + + public TimeSpan? ExpiresAfter => null; + + public string Name => StateName; + + public void Apply(CapSentMessage message, IStorageTransaction transaction) + { + } + + public void Apply(CapReceivedMessage message, IStorageTransaction transaction) + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IState.Scheduled.cs b/src/DotNetCore.CAP/Job/States/IState.Scheduled.cs new file mode 100644 index 0000000..6a38697 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.Scheduled.cs @@ -0,0 +1,22 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class ScheduledState : IState + { + public const string StateName = "Scheduled"; + + public TimeSpan? ExpiresAfter => null; + + public string Name => StateName; + + public void Apply(CapSentMessage message, IStorageTransaction transaction) + { + } + + public void Apply(CapReceivedMessage message, IStorageTransaction transaction) + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IState.Succeeded.cs b/src/DotNetCore.CAP/Job/States/IState.Succeeded.cs new file mode 100644 index 0000000..384de56 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.Succeeded.cs @@ -0,0 +1,22 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class SucceededState : IState + { + public const string StateName = "Succeeded"; + + public TimeSpan? ExpiresAfter => TimeSpan.FromHours(1); + + public string Name => StateName; + + public void Apply(CapSentMessage message, IStorageTransaction transaction) + { + } + + public void Apply(CapReceivedMessage message, IStorageTransaction transaction) + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IState.cs b/src/DotNetCore.CAP/Job/States/IState.cs new file mode 100644 index 0000000..2219dcb --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IState.cs @@ -0,0 +1,16 @@ +using System; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public interface IState + { + TimeSpan? ExpiresAfter { get; } + + string Name { get; } + + void Apply(CapSentMessage message, IStorageTransaction transaction); + + void Apply(CapReceivedMessage message, IStorageTransaction transaction); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs b/src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs new file mode 100644 index 0000000..b3f2d2a --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs @@ -0,0 +1,41 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public class StateChanger : IStateChanger + { + public void ChangeState(CapSentMessage message, IState state, IStorageTransaction transaction) + { + //var now = DateTime.UtcNow; + //if (state.ExpiresAfter != null) + //{ + // message.ExpiresAt = now.Add(state.ExpiresAfter.Value); + //} + //else + //{ + // message.ExpiresAt = null; + //} + + message.StatusName = state.Name; + state.Apply(message, transaction); + transaction.UpdateMessage(message); + } + + public void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction) + { + //var now = DateTime.UtcNow; + //if (state.ExpiresAfter != null) + //{ + // job.ExpiresAt = now.Add(state.ExpiresAfter.Value); + //} + //else + //{ + // job.ExpiresAt = null; + //} + + message.StatusName = state.Name; + state.Apply(message, transaction); + transaction.UpdateMessage(message); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs b/src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs new file mode 100644 index 0000000..27ec4e9 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public static class StateChangerExtensions + { + public static async Task ChangeStateAsync( + this IStateChanger @this, CapSentMessage message, IState state, IStorageConnection connection) + { + using (var transaction = connection.CreateTransaction()) + { + @this.ChangeState(message, state, transaction); + await transaction.CommitAsync(); + } + } + + public static async Task ChangeStateAsync( + this IStateChanger @this, CapReceivedMessage message, IState state, IStorageConnection connection) + { + using (var transaction = connection.CreateTransaction()) + { + @this.ChangeState(message, state, transaction); + await transaction.CommitAsync(); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.cs b/src/DotNetCore.CAP/Job/States/IStateChanger.cs new file mode 100644 index 0000000..662e4e1 --- /dev/null +++ b/src/DotNetCore.CAP/Job/States/IStateChanger.cs @@ -0,0 +1,11 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Job.States +{ + public interface IStateChanger + { + void ChangeState(CapSentMessage message, IState state, IStorageTransaction transaction); + + void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/OperateResult.cs b/src/DotNetCore.CAP/OperateResult.cs index c52929e..41fcc33 100644 --- a/src/DotNetCore.CAP/OperateResult.cs +++ b/src/DotNetCore.CAP/OperateResult.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; namespace DotNetCore.CAP @@ -18,6 +19,8 @@ namespace DotNetCore.CAP /// public bool Succeeded { get; set; } + public Exception Exception { get; set; } + /// /// An of s containing an errors /// that occurred during the operation. @@ -46,6 +49,17 @@ namespace DotNetCore.CAP return result; } + public static OperateResult Failed(Exception ex, params OperateError[] errors) + { + var result = new OperateResult { Succeeded = false }; + result.Exception = ex; + if (errors != null) + { + result._errors.AddRange(errors); + } + return result; + } + /// /// Converts the value of the current object to its equivalent string representation. ///