From 2e1cb03519a96fcf32d57ea774682b2613d1b6c2 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 13 Dec 2019 18:32:35 +0800 Subject: [PATCH] Fix inmemeory storage bug --- .../IDataStorage.InMemory.cs | 72 ++++++++++++++----- .../IMonitoringApi.InMemory.cs | 28 ++++---- .../Serialization/ISerializer.JsonUtf8.cs | 5 ++ 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 3047938..7560633 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -24,19 +25,19 @@ namespace DotNetCore.CAP.InMemoryStorage _capOptions = capOptions; } - public static IList PublishedMessages { get; } = new List(); + public static ConcurrentDictionary PublishedMessages { get; } = new ConcurrentDictionary(); - public static IList ReceivedMessages { get; } = new List(); + public static ConcurrentDictionary ReceivedMessages { get; } = new ConcurrentDictionary(); public Task ChangePublishStateAsync(MediumMessage message, StatusName state) { - PublishedMessages.First(x => x.DbId == message.DbId).StatusName = state; + PublishedMessages[message.DbId].StatusName = state; return Task.CompletedTask; } public Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) { - ReceivedMessages.First(x => x.DbId == message.DbId).StatusName = state; + ReceivedMessages[message.DbId].StatusName = state; return Task.CompletedTask; } @@ -52,7 +53,7 @@ namespace DotNetCore.CAP.InMemoryStorage Retries = 0 }; - PublishedMessages.Add(new MemoryMessage() + PublishedMessages[message.DbId] = new MemoryMessage() { DbId = message.DbId, Name = name, @@ -61,24 +62,27 @@ namespace DotNetCore.CAP.InMemoryStorage Added = message.Added, ExpiresAt = message.ExpiresAt, StatusName = StatusName.Scheduled - }); + }; return message; } public void StoreReceivedExceptionMessage(string name, string group, string content) { - ReceivedMessages.Add(new MemoryMessage + var id = SnowflakeId.Default().NextId().ToString(); + + ReceivedMessages[id] = new MemoryMessage { - DbId = SnowflakeId.Default().NextId().ToString(), + DbId = id, Group = group, + Origin = null, Name = name, Content = content, Retries = _capOptions.Value.FailedRetryCount, Added = DateTime.Now, ExpiresAt = DateTime.Now.AddDays(15), StatusName = StatusName.Failed - }); + }; } public MediumMessage StoreReceivedMessage(string name, string @group, Message message) @@ -92,9 +96,10 @@ namespace DotNetCore.CAP.InMemoryStorage Retries = 0 }; - ReceivedMessages.Add(new MemoryMessage + ReceivedMessages[mdMessage.DbId] = new MemoryMessage { DbId = mdMessage.DbId, + Origin = mdMessage.Origin, Group = group, Name = name, Content = StringSerializer.Serialize(mdMessage.Origin), @@ -102,38 +107,69 @@ namespace DotNetCore.CAP.InMemoryStorage Added = mdMessage.Added, ExpiresAt = mdMessage.ExpiresAt, StatusName = StatusName.Failed - }); - + }; return mdMessage; } public Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { - var ret = table == nameof(PublishedMessages) - ? ((List)PublishedMessages).RemoveAll(x => x.ExpiresAt < timeout) - : ((List)ReceivedMessages).RemoveAll(x => x.ExpiresAt < timeout); - return Task.FromResult(ret); + var removed = 0; + if (table == nameof(PublishedMessages)) + { + var ids = PublishedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList(); + foreach (var id in ids) + { + if (PublishedMessages.TryRemove(id, out _)) + { + removed++; + } + } + } + else + { + var ids = ReceivedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList(); + foreach (var id in ids) + { + if (PublishedMessages.TryRemove(id, out _)) + { + removed++; + } + } + } + return Task.FromResult(removed); } public Task> GetPublishedMessagesOfNeedRetry() { - var ret = PublishedMessages + var ret = PublishedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount && x.Added < DateTime.Now.AddSeconds(-10) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x); + + foreach (var message in ret) + { + message.Origin = StringSerializer.DeSerialize(message.Content); + } + return Task.FromResult(ret); } public Task> GetReceivedMessagesOfNeedRetry() { - var ret = ReceivedMessages + var ret = ReceivedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount && x.Added < DateTime.Now.AddSeconds(-10) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x); + + foreach (var message in ret) + { + message.Origin = StringSerializer.DeSerialize(message.Content); + } + return Task.FromResult(ret); } diff --git a/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs index 55b5773..7ff722f 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs @@ -17,22 +17,22 @@ namespace DotNetCore.CAP.InMemoryStorage { public Task GetPublishedMessageAsync(long id) { - return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); + return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.Values.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); } public Task GetReceivedMessageAsync(long id) { - return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); + return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.Values.First(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); } public StatisticsDto GetStatistics() { var stats = new StatisticsDto { - PublishedSucceeded = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded), - ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded), - PublishedFailed = InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed), - ReceivedFailed = InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed) + PublishedSucceeded = InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded), + ReceivedSucceeded = InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded), + PublishedFailed = InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Failed), + ReceivedFailed = InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Failed) }; return stats; } @@ -51,7 +51,7 @@ namespace DotNetCore.CAP.InMemoryStorage { if (queryDto.MessageType == MessageType.Publish) { - var expression = InMemoryStorage.PublishedMessages.Where(x => true); + var expression = InMemoryStorage.PublishedMessages.Values.Where(x => true); if (!string.IsNullOrEmpty(queryDto.StatusName)) { @@ -85,7 +85,7 @@ namespace DotNetCore.CAP.InMemoryStorage } else { - var expression = InMemoryStorage.ReceivedMessages.Where(x => true); + var expression = InMemoryStorage.ReceivedMessages.Values.Where(x => true); if (!string.IsNullOrEmpty(queryDto.StatusName)) { @@ -127,22 +127,22 @@ namespace DotNetCore.CAP.InMemoryStorage public int PublishedFailedCount() { - return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Failed); + return InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Failed); } public int PublishedSucceededCount() { - return InMemoryStorage.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded); + return InMemoryStorage.PublishedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded); } public int ReceivedFailedCount() { - return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed); + return InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Failed); } public int ReceivedSucceededCount() { - return InMemoryStorage.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded); + return InMemoryStorage.ReceivedMessages.Values.Count(x => x.StatusName == StatusName.Succeeded); } private Dictionary GetHourlyTimelineStats(MessageType type, string statusName) @@ -161,14 +161,14 @@ namespace DotNetCore.CAP.InMemoryStorage Dictionary valuesMap; if (type == MessageType.Publish) { - valuesMap = InMemoryStorage.PublishedMessages + valuesMap = InMemoryStorage.PublishedMessages.Values .Where(x => x.StatusName.ToString() == statusName) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .ToDictionary(x => x.Key, x => x.Count()); } else { - valuesMap = InMemoryStorage.ReceivedMessages + valuesMap = InMemoryStorage.ReceivedMessages.Values .Where(x => x.StatusName.ToString() == statusName) .GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH")) .ToDictionary(x => x.Key, x => x.Count()); diff --git a/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs index b3fdbd9..a2e534a 100644 --- a/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs +++ b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs @@ -13,6 +13,11 @@ namespace DotNetCore.CAP.Serialization { public Task SerializeAsync(Message message) { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + if (message.Value == null) { return Task.FromResult(new TransportMessage(message.Headers, null));