Browse Source

Feature/support mongodb (#163)

* Ignore .vscode

* Add MongoDBTest

* New DotNetCore.CAP.MongoDB

* Use MongoDB

* Initialize MongoDB

* GetPublishedMessageAsync

* GetPublishedMessagesOfNeedRetry

* Get received message

* Change published state

* Chang received state

* Store received message

* MongoDBStorageTransaction

* Implement CapPublisher

* Fix CAP.BuilderTest

* Implement ICollectProcessor

* Configure MongoDBOptions

* Test MongoDBHelper

* Remove useless code

* GetMonitoringApi

* Test MongoDbStorage

* Test MongoDBStorageConnection

* Test MongoDBStorageConnection

* Add sample for MongoDB

* Tweak CapPublisher

* Wati interval

* Tweak Mongo extension

* Publish without mongo session

* Publish rollback

* Publish without session & subscribe msg

* Tweak rabbitmq config

* .gitignore

* Implement & test HourlyFailedJobs

* Rename MongoDBUtil

* Rename MongoDBUtilTest

* Tweak pipeline

* Implement HourlyJobs

* Remove redundant code

* Implement Messages

* Implement MessagesCount

* Fix non-standard StatusName

* To local time

* Tweak two properties name

* Tweak two methods name

* Supplement xml comments

* Fix wrong names
master
keke 6 years ago
committed by Savorboard
parent
commit
1242d2d144
29 changed files with 1446 additions and 9 deletions
  1. +2
    -0
      .gitignore
  2. +21
    -0
      CAP.sln
  3. +74
    -0
      samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs
  4. +24
    -0
      samples/Sample.RabbitMQ.MongoDB/Program.cs
  5. +21
    -0
      samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj
  6. +60
    -0
      samples/Sample.RabbitMQ.MongoDB/Startup.cs
  7. +17
    -0
      samples/Sample.RabbitMQ.MongoDB/appsettings.json
  8. +31
    -0
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
  9. +19
    -0
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs
  10. +26
    -0
      src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs
  11. +178
    -0
      src/DotNetCore.CAP.MongoDB/CapPublisher.cs
  12. +17
    -0
      src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj
  13. +46
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs
  14. +176
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs
  15. +69
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
  16. +118
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
  17. +70
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs
  18. +63
    -0
      src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs
  19. +13
    -3
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  20. +22
    -4
      src/DotNetCore.CAP/ICapPublisher.cs
  21. +12
    -0
      src/DotNetCore.CAP/Infrastructure/StatusName.cs
  22. +7
    -0
      test/DotNetCore.CAP.MongoDB.Test/ConnectionUtil.cs
  23. +23
    -0
      test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj
  24. +83
    -0
      test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs
  25. +67
    -0
      test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs
  26. +38
    -0
      test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs
  27. +82
    -0
      test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs
  28. +50
    -0
      test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs
  29. +17
    -2
      test/DotNetCore.CAP.Test/CAP.BuilderTest.cs

+ 2
- 0
.gitignore View File

@@ -39,3 +39,5 @@ Properties
/src/DotNetCore.CAP/packages.config
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
/NuGet.config
.vscode/*
samples/Sample.RabbitMQ.MongoDB/appsettings.Development.json

+ 21
- 0
CAP.sln View File

@@ -60,6 +60,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB.Test", "test\DotNetCore.CAP.MongoDB.Test\DotNetCore.CAP.MongoDB.Test.csproj", "{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB", "src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj", "{77C0AC02-C44B-49D5-B969-7D5305FC20A5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -113,6 +119,18 @@ Global
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.Build.0 = Release|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -130,6 +148,9 @@ Global
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 74
- 0
samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs View File

@@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Sample.RabbitMQ.MongoDB.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ValuesController : ControllerBase
{
private readonly IMongoClient _client;
private readonly ICapPublisher _capPublisher;

public ValuesController(IMongoClient client, ICapPublisher capPublisher)
{
_client = client;
_capPublisher = capPublisher;
}

[Route("~/publish")]
public IActionResult PublishWithSession()
{
using (var session = _client.StartSession())
{
session.StartTransaction();
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });

_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);

session.CommitTransaction();
}
return Ok();
}

[Route("~/publish_rollback")]
public IActionResult PublishRollback()
{
using (var session = _client.StartSession())
{
try
{
session.StartTransaction();
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);
throw new Exception("Foo");
}
catch (System.Exception ex)
{
session.AbortTransaction();
return StatusCode(500, ex.Message);
}
}
}

[Route("~/publish_without_session")]
public IActionResult PublishWithoutSession()
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now);
return Ok();
}

[NonAction]
[CapSubscribe("sample.rabbitmq.mongodb")]
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mongodb] message received: " + DateTime.Now + ",sent time: " + time);
}
}
}

+ 24
- 0
samples/Sample.RabbitMQ.MongoDB/Program.cs View File

@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Sample.RabbitMQ.MongoDB
{
public class Program
{
public static void Main(string[] args)
{
CreateWebHostBuilder(args).Build().Run();
}

public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
}
}

+ 21
- 0
samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj View File

@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Folder Include="wwwroot\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" />
</ItemGroup>

</Project>

+ 60
- 0
samples/Sample.RabbitMQ.MongoDB/Startup.cs View File

@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;

namespace Sample.RabbitMQ.MongoDB
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}

public IConfiguration Configuration { get; }

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMongoClient>(new MongoClient(Configuration.GetConnectionString("MongoDB")));
services.AddCap(x =>
{
x.UseMongoDB();

var mq = new RabbitMQOptions();
Configuration.GetSection("RabbitMQ").Bind(mq);
x.UseRabbitMQ(cfg =>
{
cfg.HostName = mq.HostName;
cfg.Port = mq.Port;
cfg.UserName = mq.UserName;
cfg.Password = mq.Password;
});

x.UseDashboard();
});
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseMvc();

app.UseCap();
}
}
}

+ 17
- 0
samples/Sample.RabbitMQ.MongoDB/appsettings.json View File

@@ -0,0 +1,17 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"AllowedHosts": "*",
"ConnectionStrings": {
"MongoDB": "mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0"
},
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "",
"Password": ""
}
}

+ 31
- 0
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs View File

@@ -0,0 +1,31 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCapOptionsExtension : ICapOptionsExtension
{
private Action<MongoDBOptions> _configure;

public MongoDBCapOptionsExtension(Action<MongoDBOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, MongoDBStorage>();
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();

var options = new MongoDBOptions();
_configure?.Invoke(options);
services.AddSingleton(options);
}
}
}

+ 19
- 0
src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs View File

@@ -0,0 +1,19 @@
using System;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBOptions
{
public const string DefaultDatabase = "Cap";

/// <summary>
/// Gets or sets the database to use when creating database objects.
/// Default is <see cref="DefaultDatabase" />.
/// </summary>
public string Database { get; set; } = DefaultDatabase;

public string ReceivedCollection { get; } = "Received";

public string PublishedCollection { get; } = "Published";
}
}

+ 26
- 0
src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs View File

@@ -0,0 +1,26 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.MongoDB;

namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseMongoDB(this CapOptions options)
{
return options.UseMongoDB(x => { });
}

public static CapOptions UseMongoDB(this CapOptions options, Action<MongoDBOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new MongoDBCapOptionsExtension(configure));

return options;
}
}
}

+ 178
- 0
src/DotNetCore.CAP.MongoDB/CapPublisher.cs View File

@@ -0,0 +1,178 @@
using System;
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;
private bool _isInTransaction = true;

public CapPublisher(ILogger<CapPublisherBase> logger, IDispatcher dispatcher,
IMongoClient client, MongoDBOptions options, IServiceProvider provider)
: base(logger, dispatcher)
{
_client = client;
_options = options;
_database = client.GetDatabase(_options.Database);
ServiceProvider = provider;
}

public async Task PublishCallbackAsync(CapPublishedMessage message)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection);
collection.InsertOne(message);
Enqueue(message);
}

protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
throw new System.NotImplementedException("Not work for MongoDB");
}

protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
throw new System.NotImplementedException("Not work for MongoDB");
}

protected override void PrepareConnectionForEF()
{
throw new System.NotImplementedException("Not work for MongoDB");
}

public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
var session = mongoSession as IClientSessionHandle;
if (session == null)
{
_isInTransaction = false;
}

PublishWithSession<T>(name, contentObj, session, callbackName);
}

public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
var session = mongoSession as IClientSessionHandle;
if (session == null)
{
_isInTransaction = false;
}

await PublishWithSessionAsync<T>(name, contentObj, session, callbackName);
}

private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
{
Guid operationId = default(Guid);

var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(session, message);

if (!_isInTransaction && id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id;
Enqueue(message);
}
}
catch (System.Exception e)
{
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
throw;
}
}

private int Execute(IClientSessionHandle session, CapPublishedMessage message)
{
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session);

var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
{
collection.InsertOne(session, message);
}
else
{
collection.InsertOne(message);
}
return message.Id;
}


private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
{
Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = await ExecuteAsync(session, message);

if (!_isInTransaction && id > 0)
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);

message.Id = id;

Enqueue(message);
}
}
catch (System.Exception e)
{
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}

private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message)
{
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
{
await collection.InsertOneAsync(session, message);
}
else
{
await collection.InsertOneAsync(message);
}
return message.Id;
}
}
}

+ 17
- 0
src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj View File

@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Bson" Version="2.7.0" />
<PackageReference Include="MongoDB.Driver" Version="2.7.0" />
<PackageReference Include="MongoDB.Driver.Core" Version="2.7.0" />
</ItemGroup>

</Project>

+ 46
- 0
src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs View File

@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCollectProcessor : ICollectProcessor
{
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly ILogger _logger;
private readonly IMongoDatabase _database;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);

public MongoDBCollectProcessor(IMongoClient client, MongoDBOptions options,
ILogger<MongoDBCollectProcessor> logger)
{
_client = client;
_options = options;
_logger = logger;
_database = client.GetDatabase(_options.Database);
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug($"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}].");

var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

await publishedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<CapPublishedMessage>(Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
await receivedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel<CapReceivedMessage>(Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});

await context.WaitAsync(_waitingInterval);
}
}
}

+ 176
- 0
src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs View File

@@ -0,0 +1,176 @@
using System;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBMonitoringApi : IMonitoringApi
{
private IMongoClient _client;
private MongoDBOptions _options;
private IMongoDatabase _database;

public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_options = options ?? throw new ArgumentNullException(nameof(options));

_database = _client.GetDatabase(_options.Database);
}

public StatisticsDto GetStatistics()
{
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

var statistics = new StatisticsDto();

{
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count))
statistics.PublishedSucceeded = count;
}
{
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count))
statistics.PublishedFailed = count;
}
{
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count))
statistics.ReceivedSucceeded = count;
}
{
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count))
statistics.ReceivedFailed = count;
}

return statistics;
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Failed);
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
return GetHourlyTimelineStats(type, StatusName.Succeeded);
}

public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName);

var name = queryDto.MessageType == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var collection = _database.GetCollection<MessageDto>(name);

var builder = Builders<MessageDto>.Filter;
var filter = builder.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName);
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
filter = filter & builder.Eq(x => x.Name, queryDto.Name);
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
filter = filter & builder.Eq(x => x.Group, queryDto.Group);
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
}

var result =
collection.Find(filter)
.SortByDescending(x => x.Added)
.Skip(queryDto.PageSize * queryDto.CurrentPage)
.Limit(queryDto.PageSize)
.ToList();

return result;
}

public int PublishedFailedCount()
{
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed);
}

public int PublishedSucceededCount()
{
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded);
}

public int ReceivedFailedCount()
{
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed);
}

public int ReceivedSucceededCount()
{
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded);
}

private int GetNumberOfMessage(string collectionName, string statusName)
{
var collection = _database.GetCollection<BsonDocument>(collectionName);
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } });
return int.Parse(count.ToString());
}

private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
{
var collectionName = type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var endDate = DateTime.UtcNow;

var groupby = new BsonDocument {
{ "$group", new BsonDocument{
{ "_id", new BsonDocument {
{ "Key", new BsonDocument {
{ "$dateToString", new BsonDocument {
{ "format", "%Y-%m-%d %H:00:00"},
{ "date", "$Added"}
}}
}}
}
},
{ "Count", new BsonDocument{
{ "$sum", 1}
}}
}}
};

var match = new BsonDocument { { "$match", new BsonDocument {
{ "Added", new BsonDocument { { "$gt", endDate.AddHours(-24) } } },
{ "StatusName", new BsonDocument { { "$eq", statusName} }
} } } };
var pipeline = new BsonDocument[] { match, groupby };

var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline: pipeline).ToList();

var dic = new Dictionary<DateTime, int>();
for (var i = 0; i < 24; i++)
{
dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0);
endDate = endDate.AddHours(-1);
}
result.ForEach(d =>
{
var key = d["_id"].AsBsonDocument["Key"].AsString;
if (DateTime.TryParse(key, out var dateTime))
{
dic[dateTime.ToLocalTime()] = d["Count"].AsInt32;
}
});

return dic;
}
}
}

+ 69
- 0
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs View File

@@ -0,0 +1,69 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorage : IStorage
{
private readonly CapOptions _capOptions;
private readonly MongoDBOptions _options;
private readonly IMongoClient _client;
private readonly ILogger<MongoDBStorage> _logger;

public MongoDBStorage(CapOptions capOptions,
MongoDBOptions options,
IMongoClient client,
ILogger<MongoDBStorage> logger)
{
_capOptions = capOptions;
_options = options;
_client = client;
_logger = logger;
}

public IStorageConnection GetConnection()
{
return new MongoDBStorageConnection(_capOptions, _options, _client);
}

public IMonitoringApi GetMonitoringApi()
{
return new MongoDBMonitoringApi(_client, _options);
}

public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

var database = _client.GetDatabase(_options.Database);
var names = (await database.ListCollectionNamesAsync())?.ToList();

if (!names.Any(n => n == _options.ReceivedCollection))
{
await database.CreateCollectionAsync(_options.ReceivedCollection);
}
if (!names.Any(n => n == _options.PublishedCollection))
{
await database.CreateCollectionAsync(_options.PublishedCollection);
}
if (!names.Any(n => n == "Counter"))
{
await database.CreateCollectionAsync("Counter");
var collection = database.GetCollection<BsonDocument>("Counter");
await collection.InsertManyAsync(new BsonDocument[]
{
new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
});
}
}
}
}

+ 118
- 0
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs View File

@@ -0,0 +1,118 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorageConnection : IStorageConnection
{
private CapOptions _capOptions;
private MongoDBOptions _options;
private readonly IMongoClient _client;
private readonly IMongoDatabase _database;

public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client)
{
_capOptions = capOptions;
_options = options;
_client = client;
_database = _client.GetDatabase(_options.Database);
}

public bool ChangePublishedState(int messageId, string state)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);

var updateDef = Builders<CapPublishedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);

var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);

return result.ModifiedCount > 0;
}

public bool ChangeReceivedState(int messageId, string state)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

var updateDef = Builders<CapReceivedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);

var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);

return result.ModifiedCount > 0;
}

public IStorageTransaction CreateTransaction()
{
return new MongoDBStorageTransaction(_client, _options);
}

public void Dispose()
{
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
}

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await
collection.Find(x =>
x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
}

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

return await
collection.Find(x =>
x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)
).Limit(200).ToListAsync();
}

public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection);

collection.InsertOne(message);

return message.Id;
}
}
}

+ 70
- 0
src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs View File

@@ -0,0 +1,70 @@
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using MongoDB.Driver;
using System;

namespace DotNetCore.CAP.MongoDB
{
internal class MongoDBStorageTransaction : IStorageTransaction
{
private IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;
private readonly IClientSessionHandle _session;

public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{
_client = client;
_options = options;
_database = client.GetDatabase(options.Database);
_session = _client.StartSession();
_session.StartTransaction();
}

public async Task CommitAsync()
{
await _session.CommitTransactionAsync();
}

public void Dispose()
{
_session.Dispose();
}

public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);

var updateDef = Builders<CapPublishedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);

collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}

public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

var updateDef = Builders<CapReceivedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);

collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}
}
}

+ 63
- 0
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs View File

@@ -0,0 +1,63 @@
using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBUtil
{
FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>()
{
ReturnDocument = ReturnDocument.After
};
public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, IClientSessionHandle session = null)
{
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm
var collection = database.GetCollection<BsonDocument>("Counter");

var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);
var filter = new BsonDocument { { "_id", collectionName } };

BsonDocument result;
if (session == null)
{
result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options);
}
else
{
result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options);
}

if (result.TryGetValue("sequence_value", out var value))
{
return value.ToInt32();
}
throw new Exception("Unable to get next sequence value.");
}

public int GetNextSequenceValue(IMongoDatabase database, string collectionName, IClientSessionHandle session = null)
{
var collection = database.GetCollection<BsonDocument>("Counter");

var filter = new BsonDocument { { "_id", collectionName } };
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);

BsonDocument result;
if (session == null)
{
result = collection.FindOneAndUpdate(filter, updateDef, _options);
}
else
{
result = collection.FindOneAndUpdate(session, filter, updateDef, _options);
}

if (result.TryGetValue("sequence_value", out var value))
{
return value.ToInt32();
}
throw new Exception("Unable to get next sequence value.");
}
}
}

+ 13
- 3
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -15,11 +15,11 @@ namespace DotNetCore.CAP.Abstractions
public abstract class CapPublisherBase : ICapPublisher, IDisposable
{
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
protected readonly ILogger _logger;

// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);

protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
@@ -67,6 +67,16 @@ namespace DotNetCore.CAP.Abstractions
return PublishWithTransAsync(name, contentObj, callbackName);
}

public virtual void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}

public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}

protected void Enqueue(CapPublishedMessage message)
{
_dispatcher.EnqueueToPublish(message);
@@ -205,7 +215,7 @@ namespace DotNetCore.CAP.Abstractions
try
{
Console.WriteLine("================22222222222222=====================");
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = Execute(DbConnection, DbTransaction, message);
Console.WriteLine("================777777777777777777777=====================");


+ 22
- 4
src/DotNetCore.CAP/ICapPublisher.cs View File

@@ -12,7 +12,7 @@ namespace DotNetCore.CAP
public interface ICapPublisher
{
/// <summary>
/// (EntityFramework) Asynchronous publish a object message.
/// (EntityFramework) Asynchronous publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);

/// <summary>
/// (EntityFramework) Publish a object message.
/// (EntityFramework) Publish an object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbTransaction.
@@ -38,7 +38,7 @@ namespace DotNetCore.CAP
void Publish<T>(string name, T contentObj, string callbackName = null);

/// <summary>
/// (ado.net) Asynchronous publish a object message.
/// (ado.net) Asynchronous publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
@@ -47,12 +47,30 @@ namespace DotNetCore.CAP
Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null);

/// <summary>
/// (ado.net) Publish a object message.
/// (ado.net) Publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null);

/// <summary>
/// Publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);

/// <summary>
/// Asynchronous publish an object message with mongo.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param>
Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);
}
}

+ 12
- 0
src/DotNetCore.CAP/Infrastructure/StatusName.cs View File

@@ -11,5 +11,17 @@ namespace DotNetCore.CAP.Infrastructure
public const string Scheduled = nameof(Scheduled);
public const string Succeeded = nameof(Succeeded);
public const string Failed = nameof(Failed);

public static string Standardized(string input)
{
foreach (var item in typeof(StatusName).GetFields())
{
if (item.Name.ToLower() == input.ToLower())
{
return item.Name;
}
}
return string.Empty;
}
}
}

+ 7
- 0
test/DotNetCore.CAP.MongoDB.Test/ConnectionUtil.cs View File

@@ -0,0 +1,7 @@
namespace DotNetCore.CAP.MongoDB.Test
{
public class ConnectionUtil
{
public static string ConnectionString = "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=my-mongo-set";
}
}

+ 23
- 0
test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj View File

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.4.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.0" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Xunit.Priority" Version="1.0.10" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" />
</ItemGroup>

</Project>

+ 83
- 0
test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs View File

@@ -0,0 +1,83 @@
using MongoDB.Driver;
using DotNetCore.CAP.MongoDB;
using Xunit;
using System;
using DotNetCore.CAP.Models;
using FluentAssertions;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using System.Linq;

namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBMonitoringApiTest
{
private MongoClient _client;
private MongoDBOptions _options;
private MongoDBMonitoringApi _api;

public MongoDBMonitoringApiTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
_options = new MongoDBOptions();
_api = new MongoDBMonitoringApi(_client, _options);

Init();
}

private void Init()
{
var helper = new MongoDBUtil();
var database = _client.GetDatabase(_options.Database);
var collection = database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
collection.InsertMany(new CapPublishedMessage[]
{
new CapPublishedMessage
{
Id = helper.GetNextSequenceValue(database,_options.PublishedCollection),
Added = DateTime.Now.AddHours(-1),
StatusName = "Failed",
Content = "abc"
},
new CapPublishedMessage
{
Id = helper.GetNextSequenceValue(database,_options.PublishedCollection),
Added = DateTime.Now,
StatusName = "Failed",
Content = "bbc"
}
});
}

[Fact]
public void HourlyFailedJobs_Test()
{
var result = _api.HourlyFailedJobs(MessageType.Publish);
result.Should().HaveCount(24);
}

[Fact]
public void Messages_Test()
{
var messages =
_api.Messages(new MessageQueryDto
{
MessageType = MessageType.Publish,
StatusName = StatusName.Failed,
Content = "b",
CurrentPage = 1,
PageSize = 1
});

messages.Should().HaveCount(1);
messages.First().Content.Should().Contain("b");
}

[Fact]
public void PublishedFailedCount_Test()
{
var count = _api.PublishedFailedCount();
count.Should().BeGreaterThan(1);
}
}
}

+ 67
- 0
test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs View File

@@ -0,0 +1,67 @@
using System.Threading;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using Xunit;
using Xunit.Priority;

namespace DotNetCore.CAP.MongoDB.Test
{
[TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)]
public class MongoDBStorageConnectionTest
{
private MongoClient _client;
private MongoDBOptions _options;
private MongoDBStorage _storage;
private IStorageConnection _connection;

public MongoDBStorageConnectionTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
_options = new MongoDBOptions();
_storage = new MongoDBStorage(new CapOptions(), _options, _client, NullLogger<MongoDBStorage>.Instance);
_connection = _storage.GetConnection();
}

[Fact, Priority(1)]
public async void StoreReceivedMessageAsync_TestAsync()
{
await _storage.InitializeAsync(default(CancellationToken));

var id = await
_connection.StoreReceivedMessageAsync(new CapReceivedMessage(new MessageContext
{
Group = "test",
Name = "test",
Content = "test-content"
}));
id.Should().BeGreaterThan(0);
}

[Fact, Priority(2)]
public void ChangeReceivedState_Test()
{
var collection = _client.GetDatabase(_options.Database).GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

var msg = collection.Find(x => true).FirstOrDefault();
_connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue();
collection.Find(x => x.Id == msg.Id).FirstOrDefault()?.StatusName.Should().Be(StatusName.Scheduled);
}

[Fact, Priority(3)]
public async void GetReceivedMessagesOfNeedRetry_TestAsync()
{
var msgs = await _connection.GetReceivedMessagesOfNeedRetry();
msgs.Should().HaveCountGreaterThan(0);
}

[Fact, Priority(4)]
public void GetReceivedMessageAsync_Test()
{
var msg = _connection.GetReceivedMessageAsync(1);
msg.Should().NotBeNull();
}
}
}

+ 38
- 0
test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs View File

@@ -0,0 +1,38 @@
using System.Threading;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Bson;
using MongoDB.Driver;
using Xunit;

namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBStorageTest
{
private MongoClient _client;

public MongoDBStorageTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
}

[Fact]
public async void InitializeAsync_Test()
{
var options = new MongoDBOptions();
var storage = new MongoDBStorage(new CapOptions(), options, _client, NullLogger<MongoDBStorage>.Instance);
await storage.InitializeAsync(default(CancellationToken));
var names = _client.ListDatabaseNames()?.ToList();
names.Should().Contain(options.Database);

var collections = _client.GetDatabase(options.Database).ListCollectionNames()?.ToList();
collections.Should().Contain(options.PublishedCollection);
collections.Should().Contain(options.ReceivedCollection);
collections.Should().Contain("Counter");

var collection = _client.GetDatabase(options.Database).GetCollection<BsonDocument>("Counter");
collection.CountDocuments(new BsonDocument { { "_id", options.PublishedCollection } }).Should().Be(1);
collection.CountDocuments(new BsonDocument { { "_id", options.ReceivedCollection } }).Should().Be(1);
}
}
}

+ 82
- 0
test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs View File

@@ -0,0 +1,82 @@
using System;
using FluentAssertions;
using MongoDB.Bson;
using MongoDB.Driver;
using Xunit;

namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBTest
{
private MongoClient _client;

public MongoDBTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
}

[Fact]
public void MongoDB_Connection_Test()
{
var names = _client.ListDatabaseNames();
names.ToList().Should().NotBeNullOrEmpty();
}

[Fact]
public void Transaction_Test()
{
var document = new BsonDocument
{
{ "name", "MongoDB" },
{ "type", "Database" },
{ "count", 1 },
{ "info", new BsonDocument
{
{ "x", 203 },
{ "y", 102 }
}}
};
var db = _client.GetDatabase("test");
var collection1 = db.GetCollection<BsonDocument>("test1");
var collection2 = db.GetCollection<BsonDocument>("test2");
using (var sesstion = _client.StartSession())
{
sesstion.StartTransaction();
collection1.InsertOne(document);
collection2.InsertOne(document);
sesstion.CommitTransaction();
}
var filter = new BsonDocument("name", "MongoDB");
collection1.CountDocuments(filter).Should().BeGreaterThan(0);
collection2.CountDocuments(filter).Should().BeGreaterThan(0);
}

[Fact]
public void Transaction_Rollback_Test()
{
var document = new BsonDocument
{
{"name", "MongoDB"},
{"date", DateTimeOffset.Now.ToString()}
};
var db = _client.GetDatabase("test");

var collection = db.GetCollection<BsonDocument>("test3");
var collection4 = db.GetCollection<BsonDocument>("test4");

using (var session = _client.StartSession())
{
session.IsInTransaction.Should().BeFalse();
session.StartTransaction();
session.IsInTransaction.Should().BeTrue();
collection.InsertOne(session, document);
collection4.InsertOne(session, new BsonDocument { { "name", "MongoDB" } });

session.AbortTransaction();
}
var filter = new BsonDocument("name", "MongoDB");
collection.CountDocuments(filter).Should().Be(0);
collection4.CountDocuments(filter).Should().Be(0);
}
}
}

+ 50
- 0
test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs View File

@@ -0,0 +1,50 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using FluentAssertions;
using MongoDB.Bson;
using MongoDB.Driver;
using Xunit;

namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBUtilTest
{
private readonly MongoClient _client;
private readonly IMongoDatabase _database;
string _recieved = "ReceivedTest";

public MongoDBUtilTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
_database = _client.GetDatabase("CAP_Test");

//Initialize MongoDB
if (!_database.ListCollectionNames().ToList().Any(x => x == "Counter"))
{
var collection = _database.GetCollection<BsonDocument>("Counter");
collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } });
}
}

[Fact]
public async void GetNextSequenceValueAsync_Test()
{
var id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _recieved);
id.Should().BeGreaterThan(0);
}

[Fact]
public void GetNextSequenceValue_Concurrency_Test()
{
var dic = new ConcurrentDictionary<int, int>();
Parallel.For(0, 30, (x) =>
{
var id = new MongoDBUtil().GetNextSequenceValue(_database, _recieved);
id.Should().BeGreaterThan(0);
dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same.
});
}
}
}

+ 17
- 2
test/DotNetCore.CAP.Test/CAP.BuilderTest.cs View File

@@ -15,13 +15,13 @@ namespace DotNetCore.CAP.Test
public void CanCreateInstanceAndGetService()
{
var services = new ServiceCollection();
services.AddSingleton<ICapPublisher, MyProducerService>();
var builder = new CapBuilder(services);
Assert.NotNull(builder);

var count = builder.Services.Count;
Assert.Equal(1, count);
Assert.Equal(1, count);

var provider = services.BuildServiceProvider();
var capPublisher = provider.GetService<ICapPublisher>();
@@ -129,6 +129,11 @@ namespace DotNetCore.CAP.Test
throw new NotImplementedException();
}

public void Publish<T>(string name, T contentObj, object mongoSession, string callbackName = null)
{
throw new NotImplementedException();
}

public Task PublishAsync(string topic, string content)
{
throw new NotImplementedException();
@@ -163,6 +168,16 @@ namespace DotNetCore.CAP.Test
{
throw new NotImplementedException();
}

public void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
throw new NotImplementedException();
}

public Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
{
throw new NotImplementedException();
}
}
}
}

Loading…
Cancel
Save