Browse Source

Fixes inmemory storage IndexOutOfRangeException. (#977)

master
Savorboard 3 years ago
parent
commit
46d1b7d4e3
1 changed files with 88 additions and 61 deletions
  1. +88
    -61
      src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs

+ 88
- 61
src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs View File

@@ -58,16 +58,19 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0
};

PublishedMessages[message.DbId] = new MemoryMessage()
lock (PublishedMessages)
{
DbId = message.DbId,
Name = name,
Content = message.Content,
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};
PublishedMessages[message.DbId] = new MemoryMessage
{
DbId = message.DbId,
Name = name,
Content = message.Content,
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};
}

return message;
}
@@ -76,18 +79,21 @@ namespace DotNetCore.CAP.InMemoryStorage
{
var id = SnowflakeId.Default().NextId().ToString();

ReceivedMessages[id] = new MemoryMessage
lock (ReceivedMessages)
{
DbId = id,
Group = group,
Origin = null,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
};
ReceivedMessages[id] = new MemoryMessage
{
DbId = id,
Group = group,
Origin = null,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = StatusName.Failed
};
}
}

public MediumMessage StoreReceivedMessage(string name, string @group, Message message)
@@ -101,90 +107,111 @@ namespace DotNetCore.CAP.InMemoryStorage
Retries = 0
};

ReceivedMessages[mdMessage.DbId] = new MemoryMessage
lock (ReceivedMessages)
{
DbId = mdMessage.DbId,
Origin = mdMessage.Origin,
Group = group,
Name = name,
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Scheduled
};
ReceivedMessages[mdMessage.DbId] = new MemoryMessage
{
DbId = mdMessage.DbId,
Origin = mdMessage.Origin,
Group = group,
Name = name,
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
StatusName = StatusName.Scheduled
};
}
return mdMessage;
}

public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{

var removed = 0;
if (table == nameof(PublishedMessages))
{
var ids = PublishedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);
foreach (var id in ids)
lock (PublishedMessages)
{
if (PublishedMessages.Remove(id))
var ids = PublishedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);

foreach (var id in ids)
{
removed++;
if (PublishedMessages.Remove(id))
{
removed++;
}
}
}

}
else
{
var ids = ReceivedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);

foreach (var id in ids)
lock (ReceivedMessages)
{
if (ReceivedMessages.Remove(id))
var ids = ReceivedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);

foreach (var id in ids)
{
removed++;
if (ReceivedMessages.Remove(id))
{
removed++;
}
}
}
}
}

return Task.FromResult(removed);
}

public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var ret = PublishedMessages.Values
IEnumerable<MediumMessage> result;

lock (PublishedMessages)
{
result = PublishedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x);
.Select(x => (MediumMessage)x).ToList();
}

foreach (var message in ret)
foreach (var message in result)
{
message.Origin = _serializer.Deserialize(message.Content);
}

return Task.FromResult(ret);
return Task.FromResult(result);
}

public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var ret = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x);

foreach (var message in ret)
IEnumerable<MediumMessage> result;

lock (ReceivedMessages)
{
result = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
}

foreach (var message in result)
{
message.Origin = _serializer.Deserialize(message.Content);
}

return Task.FromResult(ret);
return Task.FromResult(result);
}

public IMonitoringApi GetMonitoringApi()


Loading…
Cancel
Save