From 3f50de25e8f41f1d9019044a29619e457cfe0198 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 15 Nov 2019 17:54:52 +0800 Subject: [PATCH] Refactoring inmemory storage implementation for version 3.0 --- .../CAP.InMemoryCapOptionsExtension.cs | 10 +- .../DotNetCore.CAP.InMemoryStorage.csproj | 4 - .../ICapPublisher.InMemory.cs | 34 ---- .../ICapTransaction.InMemory.cs | 18 ++- .../ICollectProcessor.InMemory.cs | 34 ---- .../IDataStorage.InMemory.cs | 148 ++++++++++++++++++ .../IMonitoringApi.InMemory.cs | 69 ++++---- .../IStorage.InMemory.cs | 34 ---- .../IStorageConnection.InMemory.cs | 89 ----------- .../IStorageInitializer.InMemory.cs | 27 ++++ .../IStorageTransaction.InMemory.cs | 58 ------- .../MemoryMessage.cs | 17 ++ 12 files changed, 244 insertions(+), 298 deletions(-) delete mode 100644 src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs delete mode 100644 src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs create mode 100644 src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs delete mode 100644 src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs delete mode 100644 src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs create mode 100644 src/DotNetCore.CAP.InMemoryStorage/IStorageInitializer.InMemory.cs delete mode 100644 src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs create mode 100644 src/DotNetCore.CAP.InMemoryStorage/MemoryMessage.cs diff --git a/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs index df03218..a1ede44 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs @@ -2,7 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using DotNetCore.CAP.InMemoryStorage; -using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Persistence; using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace @@ -13,14 +13,10 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - - services.AddSingleton(); - services.AddSingleton(x => (InMemoryPublisher)x.GetService()); - services.AddSingleton(); services.AddTransient(); + services.AddSingleton(); + services.AddSingleton(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj b/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj index 9375b62..f450ce2 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj +++ b/src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj @@ -11,10 +11,6 @@ 1701;1702;1705;CS1591 - - - - diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs deleted file mode 100644 index 6cb759c..0000000 --- a/src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.DependencyInjection; - -namespace DotNetCore.CAP.InMemoryStorage -{ - public class InMemoryPublisher : CapPublisherBase, ICallbackPublisher - { - public InMemoryPublisher(IServiceProvider provider) : base(provider) - { - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - await PublishAsyncInternal(message); - } - - protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, - CancellationToken cancel = default(CancellationToken)) - { - var connection = (InMemoryStorageConnection)ServiceProvider.GetService(); - - connection.PublishedMessages.Add(message); - - return Task.CompletedTask; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.InMemory.cs index 26c84b6..e4813be 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.InMemory.cs @@ -2,9 +2,13 @@ // Licensed under the MIT License. See License.txt in the project root for license information. // ReSharper disable once CheckNamespace -namespace DotNetCore.CAP + +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.InMemoryStorage { - public class InMemoryCapTransaction : CapTransactionBase + internal class InMemoryCapTransaction : CapTransactionBase { public InMemoryCapTransaction(IDispatcher dispatcher) : base(dispatcher) { @@ -15,11 +19,21 @@ namespace DotNetCore.CAP Flush(); } + public override Task CommitAsync(CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + public override void Rollback() { //Ignore } + public override Task RollbackAsync(CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + public override void Dispose() { } diff --git a/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs deleted file mode 100644 index 16c0267..0000000 --- a/src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using DotNetCore.CAP.Processor; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.DependencyInjection; - -namespace DotNetCore.CAP.InMemoryStorage -{ - internal class InMemoryCollectProcessor : ICollectProcessor - { - private readonly ILogger _logger; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - - public InMemoryCollectProcessor(ILogger logger) - { - _logger = logger; - } - - public async Task ProcessAsync(ProcessingContext context) - { - _logger.LogDebug($"Collecting expired data from memory list."); - - var connection = (InMemoryStorageConnection)context.Provider.GetService(); - - connection.PublishedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now); - connection.ReceivedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now); - - await context.WaitAsync(_waitingInterval); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs new file mode 100644 index 0000000..0ced410 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -0,0 +1,148 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Serialization; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.InMemoryStorage +{ + internal class InMemoryStorage : IDataStorage + { + private readonly IOptions _capOptions; + + public InMemoryStorage(IOptions capOptions) + { + _capOptions = capOptions; + } + + public static IList PublishedMessages { get; } = new List(); + + public static IList ReceivedMessages { get; } = new List(); + + public Task ChangePublishStateAsync(MediumMessage message, StatusName state) + { + PublishedMessages.First(x => x.DbId == message.DbId).StatusName = state; + return Task.CompletedTask; + } + + public Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) + { + ReceivedMessages.First(x => x.DbId == message.DbId).StatusName = state; + return Task.CompletedTask; + } + + public Task StoreMessageAsync(string name, Message content, object dbTransaction = null, + CancellationToken cancellationToken = default) + { + var message = new MediumMessage + { + DbId = content.GetId(), + Origin = content, + Content = StringSerializer.Serialize(content), + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + PublishedMessages.Add(new MemoryMessage() + { + DbId = message.DbId, + Name = name, + Content = message.Content, + Retries = message.Retries, + Added = message.Added, + ExpiresAt = message.ExpiresAt, + StatusName = StatusName.Scheduled + }); + + return Task.FromResult(message); + } + + public Task StoreReceivedExceptionMessageAsync(string name, string group, string content) + { + ReceivedMessages.Add(new MemoryMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Group = group, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = StatusName.Failed + }); + + return Task.CompletedTask; + } + + public Task StoreReceivedMessageAsync(string name, string @group, Message message) + { + var mdMessage = new MediumMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Origin = message, + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + ReceivedMessages.Add(new MemoryMessage + { + DbId = mdMessage.DbId, + Group = group, + Name = name, + Content = StringSerializer.Serialize(mdMessage.Origin), + Retries = mdMessage.Retries, + Added = mdMessage.Added, + ExpiresAt = mdMessage.ExpiresAt, + StatusName = StatusName.Failed + }); + + return Task.FromResult(mdMessage); + } + + public Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) + { + var ret = table == nameof(PublishedMessages) + ? ((List)PublishedMessages).RemoveAll(x => x.ExpiresAt < timeout) + : ((List)ReceivedMessages).RemoveAll(x => x.ExpiresAt < timeout); + return Task.FromResult(ret); + } + + public Task> GetPublishedMessagesOfNeedRetry() + { + var ret = PublishedMessages + .Where(x => x.Retries < _capOptions.Value.FailedRetryCount + && x.Added < DateTime.Now.AddSeconds(-10) + && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) + .Take(200) + .Select(x => (MediumMessage)x); + return Task.FromResult(ret); + } + + public Task> GetReceivedMessagesOfNeedRetry() + { + var ret = ReceivedMessages + .Where(x => x.Retries < _capOptions.Value.FailedRetryCount + && x.Added < DateTime.Now.AddSeconds(-10) + && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) + .Take(200) + .Select(x => (MediumMessage)x); + return Task.FromResult(ret); + } + + public IMonitoringApi GetMonitoringApi() + { + return new InMemoryMonitoringApi(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs index 7454435..1db101b 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs @@ -3,56 +3,59 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Linq; -using DotNetCore.CAP.Dashboard; -using DotNetCore.CAP.Dashboard.Monitoring; -using DotNetCore.CAP.Infrastructure; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; namespace DotNetCore.CAP.InMemoryStorage { internal class InMemoryMonitoringApi : IMonitoringApi { - private readonly IStorage _storage; + public Task GetPublishedMessageAsync(long id) + { + return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); + } - public InMemoryMonitoringApi(IStorage storage) + public Task GetReceivedMessageAsync(long id) { - _storage = storage; + return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); } public StatisticsDto GetStatistics() { - var connection = GetConnection(); var stats = new StatisticsDto { - PublishedSucceeded = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded), - ReceivedSucceeded = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded), - PublishedFailed = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Failed), - ReceivedFailed = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed) + PublishedSucceeded = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded), + ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded), + PublishedFailed = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed), + ReceivedFailed = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed) }; return stats; } public IDictionary HourlyFailedJobs(MessageType type) { - return GetHourlyTimelineStats(type, StatusName.Failed); + return GetHourlyTimelineStats(type, nameof(StatusName.Failed)); } public IDictionary HourlySucceededJobs(MessageType type) { - return GetHourlyTimelineStats(type, StatusName.Succeeded); + return GetHourlyTimelineStats(type, nameof(StatusName.Succeeded)); } public IList Messages(MessageQueryDto queryDto) { - var connection = GetConnection(); if (queryDto.MessageType == MessageType.Publish) { - var expression = connection.PublishedMessages.Where(x => true); + var expression = InMemoryStorage.PublishedMessages.Where(x => true); if (!string.IsNullOrEmpty(queryDto.StatusName)) { - expression = expression.Where(x => x.StatusName.ToLower() == queryDto.StatusName); + expression = expression.Where(x => x.StatusName.ToString() == queryDto.StatusName); } if (!string.IsNullOrEmpty(queryDto.Name)) @@ -73,19 +76,19 @@ namespace DotNetCore.CAP.InMemoryStorage Added = x.Added, Content = x.Content, ExpiresAt = x.ExpiresAt, - Id = x.Id, + Id = long.Parse(x.DbId), Name = x.Name, Retries = x.Retries, - StatusName = x.StatusName + StatusName = x.StatusName.ToString() }).ToList(); } else { - var expression = connection.ReceivedMessages.Where(x => true); + var expression = InMemoryStorage.ReceivedMessages.Where(x => true); if (!string.IsNullOrEmpty(queryDto.StatusName)) { - expression = expression.Where(x => x.StatusName.ToLower() == queryDto.StatusName); + expression = expression.Where(x => x.StatusName.ToString() == queryDto.StatusName); } if (!string.IsNullOrEmpty(queryDto.Name)) @@ -113,37 +116,32 @@ namespace DotNetCore.CAP.InMemoryStorage Version = "N/A", Content = x.Content, ExpiresAt = x.ExpiresAt, - Id = x.Id, + Id = long.Parse(x.DbId), Name = x.Name, Retries = x.Retries, - StatusName = x.StatusName + StatusName = x.StatusName.ToString() }).ToList(); } } public int PublishedFailedCount() { - return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Failed); + return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed); } public int PublishedSucceededCount() { - return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded); + return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded); } public int ReceivedFailedCount() { - return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Failed); + return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed); } public int ReceivedSucceededCount() { - return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded); - } - - private InMemoryStorageConnection GetConnection() - { - return (InMemoryStorageConnection)_storage.GetConnection(); + return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded); } private Dictionary GetHourlyTimelineStats(MessageType type, string statusName) @@ -158,20 +156,19 @@ namespace DotNetCore.CAP.InMemoryStorage var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x); - var connection = GetConnection(); Dictionary valuesMap; if (type == MessageType.Publish) { - valuesMap = connection.PublishedMessages - .Where(x => x.StatusName == statusName) + valuesMap = InMemoryStorage.PublishedMessages + .Where(x => x.StatusName.ToString() == statusName) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .ToDictionary(x => x.Key, x => x.Count()); } else { - valuesMap = connection.ReceivedMessages - .Where(x => x.StatusName == statusName) + valuesMap = InMemoryStorage.ReceivedMessages + .Where(x => x.StatusName.ToString() == statusName) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .ToDictionary(x => x.Key, x => x.Count()); } diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs deleted file mode 100644 index 55cf139..0000000 --- a/src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System.Threading; -using System.Threading.Tasks; -using DotNetCore.CAP.Dashboard; - -namespace DotNetCore.CAP.InMemoryStorage -{ - public class InMemoryStorage : IStorage - { - private readonly IStorageConnection _connection; - - public InMemoryStorage(IStorageConnection connection) - { - _connection = connection; - } - - public IStorageConnection GetConnection() - { - return _connection; - } - - public IMonitoringApi GetMonitoringApi() - { - return new InMemoryMonitoringApi(this); - } - - public Task InitializeAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs deleted file mode 100644 index 22364b0..0000000 --- a/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Async; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.InMemoryStorage -{ - public class InMemoryStorageConnection : IStorageConnection - { - private readonly CapOptions _capOptions; - - public InMemoryStorageConnection(IOptions capOptions) - { - _capOptions = capOptions.Value; - - PublishedMessages = new List(); - ReceivedMessages = new List(); - } - - internal List PublishedMessages { get; } - - internal List ReceivedMessages { get; } - - public IStorageTransaction CreateTransaction() - { - return new InMemoryStorageTransaction(this); - } - - public Task GetPublishedMessageAsync(long id) - { - return PublishedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id); - } - - public async Task> GetPublishedMessagesOfNeedRetry() - { - return await PublishedMessages.ToAsyncEnumerable() - .Where(x => x.Retries < _capOptions.FailedRetryCount - && x.Added < DateTime.Now.AddSeconds(-10) - && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) - .Take(200) - .ToListAsync(); - } - - public void StoreReceivedMessage(CapReceivedMessage message) - { - ReceivedMessages.Add(message); - } - - public Task GetReceivedMessageAsync(long id) - { - return ReceivedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id); - } - - public async Task> GetReceivedMessagesOfNeedRetry() - { - return await ReceivedMessages.ToAsyncEnumerable() - .Where(x => x.Retries < _capOptions.FailedRetryCount - && x.Added < DateTime.Now.AddSeconds(-10) - && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) - .Take(200) - .ToListAsync(); - } - - public bool ChangePublishedState(long messageId, string state) - { - var msg = PublishedMessages.First(x => x.Id == messageId); - msg.Retries++; - msg.ExpiresAt = null; - msg.StatusName = state; - return true; - } - - public bool ChangeReceivedState(long messageId, string state) - { - var msg = ReceivedMessages.First(x => x.Id == messageId); - msg.Retries++; - msg.ExpiresAt = null; - msg.StatusName = state; - return true; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageInitializer.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageInitializer.InMemory.cs new file mode 100644 index 0000000..544bcf7 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/IStorageInitializer.InMemory.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Persistence; + +namespace DotNetCore.CAP.InMemoryStorage +{ + internal class InMemoryStorageInitializer : IStorageInitializer + { + public string GetPublishedTableName() + { + return nameof(InMemoryStorage.PublishedMessages); + } + + public string GetReceivedTableName() + { + return nameof(InMemoryStorage.ReceivedMessages); + } + + public Task InitializeAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs deleted file mode 100644 index a63d8a5..0000000 --- a/src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Linq; -using System.Threading.Tasks; -using DotNetCore.CAP.Messages; - -namespace DotNetCore.CAP.InMemoryStorage -{ - public class InMemoryStorageTransaction : IStorageTransaction - { - private readonly InMemoryStorageConnection _connection; - - public InMemoryStorageTransaction(InMemoryStorageConnection connection) - { - _connection = connection; - } - - public void UpdateMessage(CapPublishedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var msg = _connection.PublishedMessages.FirstOrDefault(x => message.Id == x.Id); - if (msg == null) return; - msg.Retries = message.Retries; - msg.Content = message.Content; - msg.ExpiresAt = message.ExpiresAt; - msg.StatusName = message.StatusName; - } - - public void UpdateMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - var msg = _connection.ReceivedMessages.FirstOrDefault(x => message.Id == x.Id); - if (msg == null) return; - msg.Retries = message.Retries; - msg.Content = message.Content; - msg.ExpiresAt = message.ExpiresAt; - msg.StatusName = message.StatusName; - } - - public Task CommitAsync() - { - return Task.CompletedTask; - } - - public void Dispose() - { - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/MemoryMessage.cs b/src/DotNetCore.CAP.InMemoryStorage/MemoryMessage.cs new file mode 100644 index 0000000..53bb070 --- /dev/null +++ b/src/DotNetCore.CAP.InMemoryStorage/MemoryMessage.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Persistence; + +namespace DotNetCore.CAP.InMemoryStorage +{ + internal class MemoryMessage : MediumMessage + { + public string Name { get; set; } + + public StatusName StatusName { get; set; } + + public string Group { get; set; } + } +}