Browse Source

used concurrentdictionary since PublishedMessages and ReceivedMessages are public and accessed from various places (#1104)

if one thread enumerates through dictionary and the other tries to update it, we get a NullReferenceException

Co-authored-by: Rafal Kozlowski <rafal.kozlowski@external.iherb.com>
master
Rafal Kozlowski 2 years ago
committed by GitHub
parent
commit
f9bd32d023
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 58 additions and 82 deletions
  1. +58
    -82
      src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs

+ 58
- 82
src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs View File

@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
@@ -26,9 +27,9 @@ namespace DotNetCore.CAP.InMemoryStorage
_serializer = serializer; _serializer = serializer;
} }


public static Dictionary<string, MemoryMessage> PublishedMessages { get; } = new();
public static ConcurrentDictionary<string, MemoryMessage> PublishedMessages { get; } = new();


public static Dictionary<string, MemoryMessage> ReceivedMessages { get; } = new();
public static ConcurrentDictionary<string, MemoryMessage> ReceivedMessages { get; } = new();


public Task ChangePublishStateAsync(MediumMessage message, StatusName state) public Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{ {
@@ -58,19 +59,16 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0 Retries = 0
}; };


lock (PublishedMessages)
PublishedMessages[message.DbId] = new MemoryMessage
{ {
PublishedMessages[message.DbId] = new MemoryMessage
{
DbId = message.DbId,
Name = name,
Content = message.Content,
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};
}
DbId = message.DbId,
Name = name,
Content = message.Content,
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};


return message; return message;
} }
@@ -79,21 +77,18 @@ namespace DotNetCore.CAP.InMemoryStorage
{ {
var id = SnowflakeId.Default().NextId().ToString(); var id = SnowflakeId.Default().NextId().ToString();


lock (ReceivedMessages)
ReceivedMessages[id] = new MemoryMessage
{ {
ReceivedMessages[id] = new MemoryMessage
{
DbId = id,
Group = group,
Origin = null!,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
};
}
DbId = id,
Group = group,
Origin = null!,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
};
} }


public MediumMessage StoreReceivedMessage(string name, string @group, Message message) public MediumMessage StoreReceivedMessage(string name, string @group, Message message)
@@ -107,21 +102,19 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0 Retries = 0
}; };


lock (ReceivedMessages)
ReceivedMessages[mdMessage.DbId] = new MemoryMessage
{ {
ReceivedMessages[mdMessage.DbId] = new MemoryMessage
{
DbId = mdMessage.DbId,
Origin = mdMessage.Origin,
Group = group,
Name = name,
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Scheduled
};
}
DbId = mdMessage.DbId,
Origin = mdMessage.Origin,
Group = group,
Name = name,
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Scheduled
};

return mdMessage; return mdMessage;
} }


@@ -131,38 +124,31 @@ namespace DotNetCore.CAP.InMemoryStorage
var removed = 0; var removed = 0;
if (table == nameof(PublishedMessages)) if (table == nameof(PublishedMessages))
{ {
lock (PublishedMessages)
{
var ids = PublishedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);
var ids = PublishedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);


foreach (var id in ids)
foreach (var id in ids)
{
if (PublishedMessages.TryRemove(id, out _))
{ {
if (PublishedMessages.Remove(id))
{
removed++;
}
removed++;
} }
} }

} }
else else
{ {
lock (ReceivedMessages)
{
var ids = ReceivedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);
var ids = ReceivedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);


foreach (var id in ids)
foreach (var id in ids)
{
if (ReceivedMessages.TryRemove(id, out _))
{ {
if (ReceivedMessages.Remove(id))
{
removed++;
}
removed++;
} }
} }
} }
@@ -172,17 +158,12 @@ namespace DotNetCore.CAP.InMemoryStorage


public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{ {
IEnumerable<MediumMessage> result;

lock (PublishedMessages)
{
result = PublishedMessages.Values
IEnumerable<MediumMessage> result = PublishedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount .Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10) && x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200) .Take(200)
.Select(x => (MediumMessage)x).ToList(); .Select(x => (MediumMessage)x).ToList();
}


foreach (var message in result) foreach (var message in result)
{ {
@@ -194,17 +175,12 @@ namespace DotNetCore.CAP.InMemoryStorage


public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{ {
IEnumerable<MediumMessage> result;

lock (ReceivedMessages)
{
result = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
}
IEnumerable<MediumMessage> result = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();


foreach (var message in result) foreach (var message in result)
{ {


Loading…
Cancel
Save