diff --git a/CAP.sln b/CAP.sln index c9c05b5..e566422 100644 --- a/CAP.sln +++ b/CAP.sln @@ -66,6 +66,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureService EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.MySql", "samples\Sample.AzureServiceBus.MySql\Sample.AzureServiceBus.MySql.csproj", "{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.InMemoryStorage", "src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj", "{58B6E829-C6C8-457C-9DD0-C600650254DF}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -136,6 +138,10 @@ Global {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.Build.0 = Debug|Any CPU {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.ActiveCfg = Release|Any CPU {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.Build.0 = Release|Any CPU + {58B6E829-C6C8-457C-9DD0-C600650254DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {58B6E829-C6C8-457C-9DD0-C600650254DF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {58B6E829-C6C8-457C-9DD0-C600650254DF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {58B6E829-C6C8-457C-9DD0-C600650254DF}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -158,6 +164,7 @@ Global {11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} {63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {58B6E829-C6C8-457C-9DD0-C600650254DF} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs b/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs index bd3f6d7..6f1e2cd 100644 --- a/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs @@ -1,10 +1,7 @@ using System; -using System.Data; using System.Threading.Tasks; -using Dapper; using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; -using MySql.Data.MySqlClient; namespace Sample.AzureServiceBus.MySql.Controllers { @@ -21,50 +18,15 @@ namespace Sample.AzureServiceBus.MySql.Controllers [Route("~/without/transaction")] public async Task WithoutTransaction() { - await _capBus.PublishAsync("sample.azure.mysql", DateTime.Now); + await _capBus.PublishAsync("sample.azure.mysql2", DateTime.Now); return Ok(); } - [Route("~/adonet/transaction")] - public IActionResult AdonetWithTransaction() - { - using (var connection = new MySqlConnection("")) - { - using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) - { - //your business code - connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); - - for (int i = 0; i < 5; i++) - { - _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); - } - - transaction.Commit(); - } - } - - return Ok(); - } - - - [CapSubscribe("sample.azure.mysql")] - public void Test2(DateTime value) - { - Console.WriteLine("Subscriber output message: " + value); - } - - [CapSubscribe("sample.azure.mysql")] + [CapSubscribe("sample.azure.mysql2")] public void Test2T2(DateTime value) { - Console.WriteLine("Test2T2-->Subscriber output message: " + value); - } - - [CapSubscribe("sample.azure.mysql",Group = "groupd")] - public void Test2Group(DateTime value) - { - Console.WriteLine("Group--> Subscriber output message: " + value); + Console.WriteLine("Subscriber output message: " + value); } } } \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj b/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj index 60cc292..9cd6894 100644 --- a/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj +++ b/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj @@ -12,7 +12,7 @@ - + diff --git a/samples/Sample.AzureServiceBus.MySql/Startup.cs b/samples/Sample.AzureServiceBus.MySql/Startup.cs index d2bbc35..f6c1db1 100644 --- a/samples/Sample.AzureServiceBus.MySql/Startup.cs +++ b/samples/Sample.AzureServiceBus.MySql/Startup.cs @@ -9,8 +9,8 @@ namespace Sample.AzureServiceBus.MySql { services.AddCap(x => { - x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;"); - x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="); + x.UseInMemoryStorage(); + x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="); x.UseDashboard(); }); diff --git a/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs new file mode 100644 index 0000000..150a570 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs @@ -0,0 +1,26 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.InMemoryStorage; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal class InMemoryCapOptionsExtension : ICapOptionsExtension + { + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(); + + services.AddTransient(); + services.AddTransient(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.InMemoryStorage/CAP.Options.Extensions.cs new file mode 100644 index 0000000..e697db4 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/CAP.Options.Extensions.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UseInMemoryStorage(this CapOptions options) + { + options.RegisterExtension(new InMemoryCapOptionsExtension()); + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj b/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj new file mode 100644 index 0000000..9375b62 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + DotNetCore.CAP.InMemoryStorage + $(PackageTags);InMemory + + + + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MySql.xml + 1701;1702;1705;CS1591 + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs new file mode 100644 index 0000000..12e9f8c --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs @@ -0,0 +1,34 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP.InMemoryStorage +{ + public class InMemoryPublisher : CapPublisherBase, ICallbackPublisher + { + public InMemoryPublisher(IServiceProvider provider) : base(provider) + { + } + + public async Task PublishCallbackAsync(CapPublishedMessage message) + { + await PublishAsyncInternal(message); + } + + protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, + CancellationToken cancel = default(CancellationToken)) + { + var connection = (InMemoryStorageConnection)ServiceProvider.GetService(); + + connection.PublishedMessages.Add(message); + + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.MySql.cs b/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.MySql.cs new file mode 100644 index 0000000..26c84b6 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.MySql.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class InMemoryCapTransaction : CapTransactionBase + { + public InMemoryCapTransaction(IDispatcher dispatcher) : base(dispatcher) + { + } + + public override void Commit() + { + Flush(); + } + + public override void Rollback() + { + //Ignore + } + + public override void Dispose() + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs new file mode 100644 index 0000000..16c0267 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs @@ -0,0 +1,34 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP.InMemoryStorage +{ + internal class InMemoryCollectProcessor : ICollectProcessor + { + private readonly ILogger _logger; + private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); + + public InMemoryCollectProcessor(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug($"Collecting expired data from memory list."); + + var connection = (InMemoryStorageConnection)context.Provider.GetService(); + + connection.PublishedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now); + connection.ReceivedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now); + + await context.WaitAsync(_waitingInterval); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs new file mode 100644 index 0000000..bb73d58 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs @@ -0,0 +1,196 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.Dashboard.Monitoring; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.InMemoryStorage +{ + internal class InMemoryMonitoringApi : IMonitoringApi + { + private readonly InMemoryStorage _storage; + + public InMemoryMonitoringApi(IStorage storage) + { + _storage = storage as InMemoryStorage ?? throw new ArgumentNullException(nameof(storage)); + } + + public StatisticsDto GetStatistics() + { + var connection = (InMemoryStorageConnection)_storage.GetConnection(); + var stats = new StatisticsDto + { + PublishedSucceeded = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded), + ReceivedSucceeded = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded), + PublishedFailed = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Failed), + ReceivedFailed = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed) + }; + return stats; + } + + public IDictionary HourlyFailedJobs(MessageType type) + { + return GetHourlyTimelineStats(type, StatusName.Failed); + } + + public IDictionary HourlySucceededJobs(MessageType type) + { + return GetHourlyTimelineStats(type, StatusName.Succeeded); + } + + public IList Messages(MessageQueryDto queryDto) + { + var connection = GetConnection(); + if (queryDto.MessageType == MessageType.Publish) + { + var expression = connection.PublishedMessages.Where(x => true); + + if (!string.IsNullOrEmpty(queryDto.StatusName)) + { + expression = expression.Where(x => x.StatusName == queryDto.StatusName); + } + + if (!string.IsNullOrEmpty(queryDto.Name)) + { + expression = expression.Where(x => x.Name == queryDto.Name); + } + + if (!string.IsNullOrEmpty(queryDto.Content)) + { + //TODO: StartsWith will replace with regex + expression = expression.Where(x => x.Content.StartsWith(queryDto.Content)); + } + + var offset = queryDto.CurrentPage * queryDto.PageSize; + var size = queryDto.PageSize; + + return expression.Skip(offset).Take(size).Select(x => new MessageDto() + { + Added = x.Added, + Content = x.Content, + ExpiresAt = x.ExpiresAt, + Id = x.Id, + Name = x.Name, + Retries = x.Retries, + StatusName = x.StatusName + }).ToList(); + } + else + { + var expression = connection.ReceivedMessages.Where(x => true); + + if (!string.IsNullOrEmpty(queryDto.StatusName)) + { + expression = expression.Where(x => x.StatusName == queryDto.StatusName); + } + + if (!string.IsNullOrEmpty(queryDto.Name)) + { + expression = expression.Where(x => x.Name == queryDto.Name); + } + + if (!string.IsNullOrEmpty(queryDto.Group)) + { + expression = expression.Where(x => x.Group == queryDto.Name); + } + + if (!string.IsNullOrEmpty(queryDto.Content)) + { + //TODO: StartsWith will replace with regex + expression = expression.Where(x => x.Content.StartsWith(queryDto.Content)); + } + + var offset = queryDto.CurrentPage * queryDto.PageSize; + var size = queryDto.PageSize; + + return expression.Skip(offset).Take(size).Select(x => new MessageDto() + { + Added = x.Added, + Content = x.Content, + ExpiresAt = x.ExpiresAt, + Id = x.Id, + Name = x.Name, + Retries = x.Retries, + StatusName = x.StatusName + }).ToList(); + } + } + + public int PublishedFailedCount() + { + return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Failed); + } + + public int PublishedSucceededCount() + { + return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded); + } + + public int ReceivedFailedCount() + { + return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Failed); + } + + public int ReceivedSucceededCount() + { + return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded); + } + + private InMemoryStorageConnection GetConnection() + { + return (InMemoryStorageConnection)_storage.GetConnection(); + } + + private Dictionary GetHourlyTimelineStats(MessageType type, string statusName) + { + var endDate = DateTime.Now; + var dates = new List(); + for (var i = 0; i < 24; i++) + { + dates.Add(endDate); + endDate = endDate.AddHours(-1); + } + + var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x); + + var connection = GetConnection(); + + Dictionary valuesMap; + if (type == MessageType.Publish) + { + valuesMap = connection.PublishedMessages + .Where(x => x.StatusName == statusName) + .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) + .ToDictionary(x => x.Key, x => x.Count()); + } + else + { + valuesMap = connection.ReceivedMessages + .Where(x => x.StatusName == statusName) + .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) + .ToDictionary(x => x.Key, x => x.Count()); + } + + foreach (var key in keyMaps.Keys) + { + if (!valuesMap.ContainsKey(key)) + { + valuesMap.Add(key, 0); + } + } + + var result = new Dictionary(); + for (var i = 0; i < keyMaps.Count; i++) + { + var value = valuesMap[keyMaps.ElementAt(i).Key]; + result.Add(keyMaps.ElementAt(i).Value, value); + } + return result; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs new file mode 100644 index 0000000..55cf139 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs @@ -0,0 +1,34 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Dashboard; + +namespace DotNetCore.CAP.InMemoryStorage +{ + public class InMemoryStorage : IStorage + { + private readonly IStorageConnection _connection; + + public InMemoryStorage(IStorageConnection connection) + { + _connection = connection; + } + + public IStorageConnection GetConnection() + { + return _connection; + } + + public IMonitoringApi GetMonitoringApi() + { + return new InMemoryMonitoringApi(this); + } + + public Task InitializeAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs new file mode 100644 index 0000000..61bdae6 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs @@ -0,0 +1,88 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Async; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.InMemoryStorage +{ + public class InMemoryStorageConnection : IStorageConnection + { + private readonly CapOptions _capOptions; + + public InMemoryStorageConnection(CapOptions capOptions) + { + _capOptions = capOptions; + + PublishedMessages = new List(); + ReceivedMessages = new List(); + } + + internal List PublishedMessages { get; } + + internal List ReceivedMessages { get; } + + public IStorageTransaction CreateTransaction() + { + return new InMemoryStorageTransaction(this); + } + + public Task GetPublishedMessageAsync(long id) + { + return PublishedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id); + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + return await PublishedMessages.ToAsyncEnumerable() + .Where(x => x.Retries < _capOptions.FailedRetryCount + && x.Added < DateTime.Now.AddSeconds(-10) + && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) + .Take(200) + .ToListAsync(); + } + + public void StoreReceivedMessage(CapReceivedMessage message) + { + ReceivedMessages.Add(message); + } + + public Task GetReceivedMessageAsync(long id) + { + return ReceivedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id); + } + + public async Task> GetReceivedMessagesOfNeedRetry() + { + return await ReceivedMessages.ToAsyncEnumerable() + .Where(x => x.Retries < _capOptions.FailedRetryCount + && x.Added < DateTime.Now.AddSeconds(-10) + && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) + .Take(200) + .ToListAsync(); + } + + public bool ChangePublishedState(long messageId, string state) + { + var msg = PublishedMessages.First(x => x.Id == messageId); + msg.Retries++; + msg.ExpiresAt = null; + msg.StatusName = state; + return true; + } + + public bool ChangeReceivedState(long messageId, string state) + { + var msg = ReceivedMessages.First(x => x.Id == messageId); + msg.Retries++; + msg.ExpiresAt = null; + msg.StatusName = state; + return true; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs new file mode 100644 index 0000000..40bf28f --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs @@ -0,0 +1,58 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.InMemoryStorage +{ + public class InMemoryStorageTransaction : IStorageTransaction + { + private readonly InMemoryStorageConnection _connection; + + public InMemoryStorageTransaction(InMemoryStorageConnection connection) + { + _connection = connection; + } + + public void UpdateMessage(CapPublishedMessage message) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var msg = _connection.PublishedMessages.FirstOrDefault(x => message.Id == x.Id); + if (msg == null) return; + msg.Retries = message.Retries; + msg.Content = message.Content; + msg.ExpiresAt = message.ExpiresAt; + msg.StatusName = message.StatusName; + } + + public void UpdateMessage(CapReceivedMessage message) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + var msg = _connection.ReceivedMessages.FirstOrDefault(x => message.Id == x.Id); + if (msg == null) return; + msg.Retries = message.Retries; + msg.Content = message.Content; + msg.ExpiresAt = message.ExpiresAt; + msg.StatusName = message.StatusName; + } + + public Task CommitAsync() + { + return Task.CompletedTask; + } + + public void Dispose() + { + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICallbackPublisher.cs b/src/DotNetCore.CAP/ICallbackPublisher.cs index 0046e94..e92e6c9 100644 --- a/src/DotNetCore.CAP/ICallbackPublisher.cs +++ b/src/DotNetCore.CAP/ICallbackPublisher.cs @@ -7,7 +7,7 @@ using DotNetCore.CAP.Models; namespace DotNetCore.CAP { /// - /// A callback that is sent to Productor after a successful consumer execution + /// A callback that is sent to Producer after a successful consumer execution /// public interface ICallbackPublisher {