diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 975bc38..0e7f829 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -26,9 +27,9 @@ namespace DotNetCore.CAP.InMemoryStorage _serializer = serializer; } - public static Dictionary PublishedMessages { get; } = new(); + public static ConcurrentDictionary PublishedMessages { get; } = new(); - public static Dictionary ReceivedMessages { get; } = new(); + public static ConcurrentDictionary ReceivedMessages { get; } = new(); public Task ChangePublishStateAsync(MediumMessage message, StatusName state) { @@ -58,19 +59,16 @@ namespace DotNetCore.CAP.InMemoryStorage Retries = 0 }; - lock (PublishedMessages) + PublishedMessages[message.DbId] = new MemoryMessage { - PublishedMessages[message.DbId] = new MemoryMessage - { - DbId = message.DbId, - Name = name, - Content = message.Content, - Retries = message.Retries, - Added = message.Added, - ExpiresAt = message.ExpiresAt, - StatusName = StatusName.Scheduled - }; - } + DbId = message.DbId, + Name = name, + Content = message.Content, + Retries = message.Retries, + Added = message.Added, + ExpiresAt = message.ExpiresAt, + StatusName = StatusName.Scheduled + }; return message; } @@ -79,21 +77,18 @@ namespace DotNetCore.CAP.InMemoryStorage { var id = SnowflakeId.Default().NextId().ToString(); - lock (ReceivedMessages) + ReceivedMessages[id] = new MemoryMessage { - ReceivedMessages[id] = new MemoryMessage - { - DbId = id, - Group = group, - Origin = null!, - Name = name, - Content = content, - Retries = _capOptions.Value.FailedRetryCount, - Added = DateTime.Now, - ExpiresAt = DateTime.Now.AddDays(15), - StatusName = StatusName.Failed - }; - } + DbId = id, + Group = group, + Origin = null!, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = StatusName.Failed + }; } public MediumMessage StoreReceivedMessage(string name, string @group, Message message) @@ -107,21 +102,19 @@ namespace DotNetCore.CAP.InMemoryStorage Retries = 0 }; - lock (ReceivedMessages) + ReceivedMessages[mdMessage.DbId] = new MemoryMessage { - ReceivedMessages[mdMessage.DbId] = new MemoryMessage - { - DbId = mdMessage.DbId, - Origin = mdMessage.Origin, - Group = group, - Name = name, - Content = _serializer.Serialize(mdMessage.Origin), - Retries = mdMessage.Retries, - Added = mdMessage.Added, - ExpiresAt = mdMessage.ExpiresAt, - StatusName = StatusName.Scheduled - }; - } + DbId = mdMessage.DbId, + Origin = mdMessage.Origin, + Group = group, + Name = name, + Content = _serializer.Serialize(mdMessage.Origin), + Retries = mdMessage.Retries, + Added = mdMessage.Added, + ExpiresAt = mdMessage.ExpiresAt, + StatusName = StatusName.Scheduled + }; + return mdMessage; } @@ -131,38 +124,31 @@ namespace DotNetCore.CAP.InMemoryStorage var removed = 0; if (table == nameof(PublishedMessages)) { - lock (PublishedMessages) - { - var ids = PublishedMessages.Values - .Where(x => x.ExpiresAt < timeout) - .Select(x => x.DbId) - .Take(batchCount); + var ids = PublishedMessages.Values + .Where(x => x.ExpiresAt < timeout) + .Select(x => x.DbId) + .Take(batchCount); - foreach (var id in ids) + foreach (var id in ids) + { + if (PublishedMessages.TryRemove(id, out _)) { - if (PublishedMessages.Remove(id)) - { - removed++; - } + removed++; } } - } else { - lock (ReceivedMessages) - { - var ids = ReceivedMessages.Values - .Where(x => x.ExpiresAt < timeout) - .Select(x => x.DbId) - .Take(batchCount); + var ids = ReceivedMessages.Values + .Where(x => x.ExpiresAt < timeout) + .Select(x => x.DbId) + .Take(batchCount); - foreach (var id in ids) + foreach (var id in ids) + { + if (ReceivedMessages.TryRemove(id, out _)) { - if (ReceivedMessages.Remove(id)) - { - removed++; - } + removed++; } } } @@ -172,17 +158,12 @@ namespace DotNetCore.CAP.InMemoryStorage public Task> GetPublishedMessagesOfNeedRetry() { - IEnumerable result; - - lock (PublishedMessages) - { - result = PublishedMessages.Values + IEnumerable result = PublishedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount && x.Added < DateTime.Now.AddSeconds(-10) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x).ToList(); - } foreach (var message in result) { @@ -194,17 +175,12 @@ namespace DotNetCore.CAP.InMemoryStorage public Task> GetReceivedMessagesOfNeedRetry() { - IEnumerable result; - - lock (ReceivedMessages) - { - result = ReceivedMessages.Values - .Where(x => x.Retries < _capOptions.Value.FailedRetryCount - && x.Added < DateTime.Now.AddSeconds(-10) - && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) - .Take(200) - .Select(x => (MediumMessage)x).ToList(); - } + IEnumerable result = ReceivedMessages.Values + .Where(x => x.Retries < _capOptions.Value.FailedRetryCount + && x.Added < DateTime.Now.AddSeconds(-10) + && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) + .Take(200) + .Select(x => (MediumMessage)x).ToList(); foreach (var message in result) {