Browse Source

Add In-Memory storage support. (#296)

master
Savorboard 5 years ago
parent
commit
daee2db80a
15 changed files with 550 additions and 45 deletions
  1. +7
    -0
      CAP.sln
  2. +3
    -41
      samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs
  3. +1
    -1
      samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj
  4. +2
    -2
      samples/Sample.AzureServiceBus.MySql/Startup.cs
  5. +26
    -0
      src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs
  6. +17
    -0
      src/DotNetCore.CAP.InMemoryStorage/CAP.Options.Extensions.cs
  7. +22
    -0
      src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj
  8. +34
    -0
      src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs
  9. +27
    -0
      src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.MySql.cs
  10. +34
    -0
      src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs
  11. +196
    -0
      src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs
  12. +34
    -0
      src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs
  13. +88
    -0
      src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs
  14. +58
    -0
      src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs
  15. +1
    -1
      src/DotNetCore.CAP/ICallbackPublisher.cs

+ 7
- 0
CAP.sln View File

@@ -66,6 +66,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureService
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.MySql", "samples\Sample.AzureServiceBus.MySql\Sample.AzureServiceBus.MySql.csproj", "{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.MySql", "samples\Sample.AzureServiceBus.MySql\Sample.AzureServiceBus.MySql.csproj", "{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.InMemoryStorage", "src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj", "{58B6E829-C6C8-457C-9DD0-C600650254DF}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@@ -136,6 +138,10 @@ Global
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.Build.0 = Debug|Any CPU {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.ActiveCfg = Release|Any CPU {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.Build.0 = Release|Any CPU {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.Build.0 = Release|Any CPU
{58B6E829-C6C8-457C-9DD0-C600650254DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{58B6E829-C6C8-457C-9DD0-C600650254DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{58B6E829-C6C8-457C-9DD0-C600650254DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{58B6E829-C6C8-457C-9DD0-C600650254DF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@@ -158,6 +164,7 @@ Global
{11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} {11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B} = {3A6B6931-A123-477A-9469-8B468B5385AF} {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{58B6E829-C6C8-457C-9DD0-C600650254DF} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 3
- 41
samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs View File

@@ -1,10 +1,7 @@
using System; using System;
using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP; using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;


namespace Sample.AzureServiceBus.MySql.Controllers namespace Sample.AzureServiceBus.MySql.Controllers
{ {
@@ -21,50 +18,15 @@ namespace Sample.AzureServiceBus.MySql.Controllers
[Route("~/without/transaction")] [Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction() public async Task<IActionResult> WithoutTransaction()
{ {
await _capBus.PublishAsync("sample.azure.mysql", DateTime.Now);
await _capBus.PublishAsync("sample.azure.mysql2", DateTime.Now);


return Ok(); return Ok();
} }


[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(""))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
{
//your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);

for (int i = 0; i < 5; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}

transaction.Commit();
}
}

return Ok();
}


[CapSubscribe("sample.azure.mysql")]
public void Test2(DateTime value)
{
Console.WriteLine("Subscriber output message: " + value);
}

[CapSubscribe("sample.azure.mysql")]
[CapSubscribe("sample.azure.mysql2")]
public void Test2T2(DateTime value) public void Test2T2(DateTime value)
{ {
Console.WriteLine("Test2T2-->Subscriber output message: " + value);
}

[CapSubscribe("sample.azure.mysql",Group = "groupd")]
public void Test2Group(DateTime value)
{
Console.WriteLine("Group--> Subscriber output message: " + value);
Console.WriteLine("Subscriber output message: " + value);
} }
} }
} }

+ 1
- 1
samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj View File

@@ -12,7 +12,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>




+ 2
- 2
samples/Sample.AzureServiceBus.MySql/Startup.cs View File

@@ -9,8 +9,8 @@ namespace Sample.AzureServiceBus.MySql
{ {
services.AddCap(x => services.AddCap(x =>
{ {
x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<yourkey>");
x.UseInMemoryStorage();
x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>");
x.UseDashboard(); x.UseDashboard();
}); });




+ 26
- 0
src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs View File

@@ -0,0 +1,26 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.InMemoryStorage;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal class InMemoryCapOptionsExtension : ICapOptionsExtension
{
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, InMemoryStorage.InMemoryStorage>();
services.AddSingleton<IStorageConnection, InMemoryStorageConnection>();

services.AddSingleton<ICapPublisher, InMemoryPublisher>();
services.AddSingleton<ICallbackPublisher, InMemoryPublisher>();

services.AddTransient<ICollectProcessor, InMemoryCollectProcessor>();
services.AddTransient<CapTransactionBase, InMemoryCapTransaction>();
}
}
}

+ 17
- 0
src/DotNetCore.CAP.InMemoryStorage/CAP.Options.Extensions.cs View File

@@ -0,0 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseInMemoryStorage(this CapOptions options)
{
options.RegisterExtension(new InMemoryCapOptionsExtension());
return options;
}
}
}

+ 22
- 0
src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj View File

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.InMemoryStorage</AssemblyName>
<PackageTags>$(PackageTags);InMemory</PackageTags>
</PropertyGroup>

<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MySql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AsyncEnumerator" Version="3.0.0-beta1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 34
- 0
src/DotNetCore.CAP.InMemoryStorage/ICapPublisher.InMemory.cs View File

@@ -0,0 +1,34 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryPublisher : CapPublisherBase, ICallbackPublisher
{
public InMemoryPublisher(IServiceProvider provider) : base(provider)
{
}

public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}

protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
var connection = (InMemoryStorageConnection)ServiceProvider.GetService<IStorageConnection>();

connection.PublishedMessages.Add(message);

return Task.CompletedTask;
}
}
}

+ 27
- 0
src/DotNetCore.CAP.InMemoryStorage/ICapTransaction.MySql.cs View File

@@ -0,0 +1,27 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class InMemoryCapTransaction : CapTransactionBase
{
public InMemoryCapTransaction(IDispatcher dispatcher) : base(dispatcher)
{
}

public override void Commit()
{
Flush();
}

public override void Rollback()
{
//Ignore
}

public override void Dispose()
{
}
}
}

+ 34
- 0
src/DotNetCore.CAP.InMemoryStorage/ICollectProcessor.InMemory.cs View File

@@ -0,0 +1,34 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.InMemoryStorage
{
internal class InMemoryCollectProcessor : ICollectProcessor
{
private readonly ILogger _logger;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);

public InMemoryCollectProcessor(ILogger<InMemoryCollectProcessor> logger)
{
_logger = logger;
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug($"Collecting expired data from memory list.");

var connection = (InMemoryStorageConnection)context.Provider.GetService<IStorageConnection>();

connection.PublishedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now);
connection.ReceivedMessages.RemoveAll(x => x.ExpiresAt < DateTime.Now);

await context.WaitAsync(_waitingInterval);
}
}
}

+ 196
- 0
src/DotNetCore.CAP.InMemoryStorage/IMonitoringApi.InMemory.cs View File

@@ -0,0 +1,196 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.InMemoryStorage
{
internal class InMemoryMonitoringApi : IMonitoringApi
{
private readonly InMemoryStorage _storage;

public InMemoryMonitoringApi(IStorage storage)
{
_storage = storage as InMemoryStorage ?? throw new ArgumentNullException(nameof(storage));
}

public StatisticsDto GetStatistics()
{
var connection = (InMemoryStorageConnection)_storage.GetConnection();
var stats = new StatisticsDto
{
PublishedSucceeded = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded),
ReceivedSucceeded = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded),
PublishedFailed = connection.PublishedMessages.Count(x => x.StatusName == StatusName.Failed),
ReceivedFailed = connection.ReceivedMessages.Count(x => x.StatusName == StatusName.Failed)
};
return stats;
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Failed);
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Succeeded);
}

public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var connection = GetConnection();
if (queryDto.MessageType == MessageType.Publish)
{
var expression = connection.PublishedMessages.Where(x => true);

if (!string.IsNullOrEmpty(queryDto.StatusName))
{
expression = expression.Where(x => x.StatusName == queryDto.StatusName);
}

if (!string.IsNullOrEmpty(queryDto.Name))
{
expression = expression.Where(x => x.Name == queryDto.Name);
}

if (!string.IsNullOrEmpty(queryDto.Content))
{
//TODO: StartsWith will replace with regex
expression = expression.Where(x => x.Content.StartsWith(queryDto.Content));
}

var offset = queryDto.CurrentPage * queryDto.PageSize;
var size = queryDto.PageSize;

return expression.Skip(offset).Take(size).Select(x => new MessageDto()
{
Added = x.Added,
Content = x.Content,
ExpiresAt = x.ExpiresAt,
Id = x.Id,
Name = x.Name,
Retries = x.Retries,
StatusName = x.StatusName
}).ToList();
}
else
{
var expression = connection.ReceivedMessages.Where(x => true);

if (!string.IsNullOrEmpty(queryDto.StatusName))
{
expression = expression.Where(x => x.StatusName == queryDto.StatusName);
}

if (!string.IsNullOrEmpty(queryDto.Name))
{
expression = expression.Where(x => x.Name == queryDto.Name);
}

if (!string.IsNullOrEmpty(queryDto.Group))
{
expression = expression.Where(x => x.Group == queryDto.Name);
}

if (!string.IsNullOrEmpty(queryDto.Content))
{
//TODO: StartsWith will replace with regex
expression = expression.Where(x => x.Content.StartsWith(queryDto.Content));
}

var offset = queryDto.CurrentPage * queryDto.PageSize;
var size = queryDto.PageSize;

return expression.Skip(offset).Take(size).Select(x => new MessageDto()
{
Added = x.Added,
Content = x.Content,
ExpiresAt = x.ExpiresAt,
Id = x.Id,
Name = x.Name,
Retries = x.Retries,
StatusName = x.StatusName
}).ToList();
}
}

public int PublishedFailedCount()
{
return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Failed);
}

public int PublishedSucceededCount()
{
return GetConnection().PublishedMessages.Count(x => x.StatusName == StatusName.Succeeded);
}

public int ReceivedFailedCount()
{
return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Failed);
}

public int ReceivedSucceededCount()
{
return GetConnection().ReceivedMessages.Count(x => x.StatusName == StatusName.Succeeded);
}

private InMemoryStorageConnection GetConnection()
{
return (InMemoryStorageConnection)_storage.GetConnection();
}

private Dictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
for (var i = 0; i < 24; i++)
{
dates.Add(endDate);
endDate = endDate.AddHours(-1);
}

var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);

var connection = GetConnection();

Dictionary<string, int> valuesMap;
if (type == MessageType.Publish)
{
valuesMap = connection.PublishedMessages
.Where(x => x.StatusName == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count());
}
else
{
valuesMap = connection.ReceivedMessages
.Where(x => x.StatusName == statusName)
.GroupBy(x => x.Added.ToString("yyyy-MM-dd-HH"))
.ToDictionary(x => x.Key, x => x.Count());
}

foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}

var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
{
var value = valuesMap[keyMaps.ElementAt(i).Key];
result.Add(keyMaps.ElementAt(i).Value, value);
}
return result;
}
}
}

+ 34
- 0
src/DotNetCore.CAP.InMemoryStorage/IStorage.InMemory.cs View File

@@ -0,0 +1,34 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;

namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorage : IStorage
{
private readonly IStorageConnection _connection;

public InMemoryStorage(IStorageConnection connection)
{
_connection = connection;
}

public IStorageConnection GetConnection()
{
return _connection;
}

public IMonitoringApi GetMonitoringApi()
{
return new InMemoryMonitoringApi(this);
}

public Task InitializeAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 88
- 0
src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs View File

@@ -0,0 +1,88 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Async;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;

public InMemoryStorageConnection(CapOptions capOptions)
{
_capOptions = capOptions;

PublishedMessages = new List<CapPublishedMessage>();
ReceivedMessages = new List<CapReceivedMessage>();
}

internal List<CapPublishedMessage> PublishedMessages { get; }

internal List<CapReceivedMessage> ReceivedMessages { get; }

public IStorageTransaction CreateTransaction()
{
return new InMemoryStorageTransaction(this);
}

public Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
return PublishedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id);
}

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
return await PublishedMessages.ToAsyncEnumerable()
.Where(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.ToListAsync();
}

public void StoreReceivedMessage(CapReceivedMessage message)
{
ReceivedMessages.Add(message);
}

public Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
return ReceivedMessages.ToAsyncEnumerable().FirstOrDefaultAsync(x => x.Id == id);
}

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
return await ReceivedMessages.ToAsyncEnumerable()
.Where(x => x.Retries < _capOptions.FailedRetryCount
&& x.Added < DateTime.Now.AddSeconds(-10)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.ToListAsync();
}

public bool ChangePublishedState(long messageId, string state)
{
var msg = PublishedMessages.First(x => x.Id == messageId);
msg.Retries++;
msg.ExpiresAt = null;
msg.StatusName = state;
return true;
}

public bool ChangeReceivedState(long messageId, string state)
{
var msg = ReceivedMessages.First(x => x.Id == messageId);
msg.Retries++;
msg.ExpiresAt = null;
msg.StatusName = state;
return true;
}
}
}

+ 58
- 0
src/DotNetCore.CAP.InMemoryStorage/IStorageTransaction.InMemory.cs View File

@@ -0,0 +1,58 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.InMemoryStorage
{
public class InMemoryStorageTransaction : IStorageTransaction
{
private readonly InMemoryStorageConnection _connection;

public InMemoryStorageTransaction(InMemoryStorageConnection connection)
{
_connection = connection;
}

public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var msg = _connection.PublishedMessages.FirstOrDefault(x => message.Id == x.Id);
if (msg == null) return;
msg.Retries = message.Retries;
msg.Content = message.Content;
msg.ExpiresAt = message.ExpiresAt;
msg.StatusName = message.StatusName;
}

public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var msg = _connection.ReceivedMessages.FirstOrDefault(x => message.Id == x.Id);
if (msg == null) return;
msg.Retries = message.Retries;
msg.Content = message.Content;
msg.ExpiresAt = message.ExpiresAt;
msg.StatusName = message.StatusName;
}

public Task CommitAsync()
{
return Task.CompletedTask;
}

public void Dispose()
{
}
}
}

+ 1
- 1
src/DotNetCore.CAP/ICallbackPublisher.cs View File

@@ -7,7 +7,7 @@ using DotNetCore.CAP.Models;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// A callback that is sent to Productor after a successful consumer execution
/// A callback that is sent to Producer after a successful consumer execution
/// </summary> /// </summary>
public interface ICallbackPublisher public interface ICallbackPublisher
{ {


Loading…
Cancel
Save