* Ignore .vscode * Add MongoDBTest * New DotNetCore.CAP.MongoDB * Use MongoDB * Initialize MongoDB * GetPublishedMessageAsync * GetPublishedMessagesOfNeedRetry * Get received message * Change published state * Chang received state * Store received message * MongoDBStorageTransaction * Implement CapPublisher * Fix CAP.BuilderTest * Implement ICollectProcessor * Configure MongoDBOptions * Test MongoDBHelper * Remove useless code * GetMonitoringApi * Test MongoDbStorage * Test MongoDBStorageConnection * Test MongoDBStorageConnection * Add sample for MongoDB * Tweak CapPublisher * Wati interval * Tweak Mongo extension * Publish without mongo session * Publish rollback * Publish without session & subscribe msg * Tweak rabbitmq config * .gitignore * Implement & test HourlyFailedJobs * Rename MongoDBUtil * Rename MongoDBUtilTest * Tweak pipeline * Implement HourlyJobs * Remove redundant code * Implement Messages * Implement MessagesCount * Fix non-standard StatusName * To local time * Tweak two properties name * Tweak two methods name * Supplement xml comments * Fix wrong namesmaster
@@ -39,3 +39,5 @@ Properties | |||||
/src/DotNetCore.CAP/packages.config | /src/DotNetCore.CAP/packages.config | ||||
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj | /src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj | ||||
/NuGet.config | /NuGet.config | ||||
.vscode/* | |||||
samples/Sample.RabbitMQ.MongoDB/appsettings.Development.json |
@@ -60,6 +60,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.T | |||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB.Test", "test\DotNetCore.CAP.MongoDB.Test\DotNetCore.CAP.MongoDB.Test.csproj", "{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB", "src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj", "{77C0AC02-C44B-49D5-B969-7D5305FC20A5}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}" | |||||
EndProject | |||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
Debug|Any CPU = Debug|Any CPU | Debug|Any CPU = Debug|Any CPU | ||||
@@ -113,6 +119,18 @@ Global | |||||
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU | {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU | {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU | {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
@@ -130,6 +148,9 @@ Global | |||||
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | {82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | ||||
{7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} | {7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} | ||||
{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF} | {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF} | ||||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} | |||||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | |||||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} | SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} | ||||
@@ -0,0 +1,74 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP; | |||||
using Microsoft.AspNetCore.Mvc; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
namespace Sample.RabbitMQ.MongoDB.Controllers | |||||
{ | |||||
[Route("api/[controller]")] | |||||
[ApiController] | |||||
public class ValuesController : ControllerBase | |||||
{ | |||||
private readonly IMongoClient _client; | |||||
private readonly ICapPublisher _capPublisher; | |||||
public ValuesController(IMongoClient client, ICapPublisher capPublisher) | |||||
{ | |||||
_client = client; | |||||
_capPublisher = capPublisher; | |||||
} | |||||
[Route("~/publish")] | |||||
public IActionResult PublishWithSession() | |||||
{ | |||||
using (var session = _client.StartSession()) | |||||
{ | |||||
session.StartTransaction(); | |||||
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test"); | |||||
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); | |||||
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session); | |||||
session.CommitTransaction(); | |||||
} | |||||
return Ok(); | |||||
} | |||||
[Route("~/publish_rollback")] | |||||
public IActionResult PublishRollback() | |||||
{ | |||||
using (var session = _client.StartSession()) | |||||
{ | |||||
try | |||||
{ | |||||
session.StartTransaction(); | |||||
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session); | |||||
throw new Exception("Foo"); | |||||
} | |||||
catch (System.Exception ex) | |||||
{ | |||||
session.AbortTransaction(); | |||||
return StatusCode(500, ex.Message); | |||||
} | |||||
} | |||||
} | |||||
[Route("~/publish_without_session")] | |||||
public IActionResult PublishWithoutSession() | |||||
{ | |||||
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now); | |||||
return Ok(); | |||||
} | |||||
[NonAction] | |||||
[CapSubscribe("sample.rabbitmq.mongodb")] | |||||
public void ReceiveMessage(DateTime time) | |||||
{ | |||||
Console.WriteLine("[sample.rabbitmq.mongodb] message received: " + DateTime.Now + ",sent time: " + time); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,24 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.IO; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.AspNetCore; | |||||
using Microsoft.AspNetCore.Hosting; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace Sample.RabbitMQ.MongoDB | |||||
{ | |||||
public class Program | |||||
{ | |||||
public static void Main(string[] args) | |||||
{ | |||||
CreateWebHostBuilder(args).Build().Run(); | |||||
} | |||||
public static IWebHostBuilder CreateWebHostBuilder(string[] args) => | |||||
WebHost.CreateDefaultBuilder(args) | |||||
.UseStartup<Startup>(); | |||||
} | |||||
} |
@@ -0,0 +1,21 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk.Web"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<Folder Include="wwwroot\" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.AspNetCore.App" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" /> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,60 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP; | |||||
using Microsoft.AspNetCore.Builder; | |||||
using Microsoft.AspNetCore.Hosting; | |||||
using Microsoft.AspNetCore.HttpsPolicy; | |||||
using Microsoft.AspNetCore.Mvc; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using MongoDB.Driver; | |||||
namespace Sample.RabbitMQ.MongoDB | |||||
{ | |||||
public class Startup | |||||
{ | |||||
public Startup(IConfiguration configuration) | |||||
{ | |||||
Configuration = configuration; | |||||
} | |||||
public IConfiguration Configuration { get; } | |||||
public void ConfigureServices(IServiceCollection services) | |||||
{ | |||||
services.AddSingleton<IMongoClient>(new MongoClient(Configuration.GetConnectionString("MongoDB"))); | |||||
services.AddCap(x => | |||||
{ | |||||
x.UseMongoDB(); | |||||
var mq = new RabbitMQOptions(); | |||||
Configuration.GetSection("RabbitMQ").Bind(mq); | |||||
x.UseRabbitMQ(cfg => | |||||
{ | |||||
cfg.HostName = mq.HostName; | |||||
cfg.Port = mq.Port; | |||||
cfg.UserName = mq.UserName; | |||||
cfg.Password = mq.Password; | |||||
}); | |||||
x.UseDashboard(); | |||||
}); | |||||
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); | |||||
} | |||||
public void Configure(IApplicationBuilder app, IHostingEnvironment env) | |||||
{ | |||||
if (env.IsDevelopment()) | |||||
{ | |||||
app.UseDeveloperExceptionPage(); | |||||
} | |||||
app.UseMvc(); | |||||
app.UseCap(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,17 @@ | |||||
{ | |||||
"Logging": { | |||||
"LogLevel": { | |||||
"Default": "Warning" | |||||
} | |||||
}, | |||||
"AllowedHosts": "*", | |||||
"ConnectionStrings": { | |||||
"MongoDB": "mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0" | |||||
}, | |||||
"RabbitMQ": { | |||||
"HostName": "localhost", | |||||
"Port": 5672, | |||||
"UserName": "", | |||||
"Password": "" | |||||
} | |||||
} |
@@ -0,0 +1,31 @@ | |||||
using System; | |||||
using DotNetCore.CAP; | |||||
using DotNetCore.CAP.Processor; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBCapOptionsExtension : ICapOptionsExtension | |||||
{ | |||||
private Action<MongoDBOptions> _configure; | |||||
public MongoDBCapOptionsExtension(Action<MongoDBOptions> configure) | |||||
{ | |||||
_configure = configure; | |||||
} | |||||
public void AddServices(IServiceCollection services) | |||||
{ | |||||
services.AddSingleton<CapDatabaseStorageMarkerService>(); | |||||
services.AddSingleton<IStorage, MongoDBStorage>(); | |||||
services.AddSingleton<IStorageConnection, MongoDBStorageConnection>(); | |||||
services.AddScoped<ICapPublisher, CapPublisher>(); | |||||
services.AddScoped<ICallbackPublisher, CapPublisher>(); | |||||
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>(); | |||||
var options = new MongoDBOptions(); | |||||
_configure?.Invoke(options); | |||||
services.AddSingleton(options); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,19 @@ | |||||
using System; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBOptions | |||||
{ | |||||
public const string DefaultDatabase = "Cap"; | |||||
/// <summary> | |||||
/// Gets or sets the database to use when creating database objects. | |||||
/// Default is <see cref="DefaultDatabase" />. | |||||
/// </summary> | |||||
public string Database { get; set; } = DefaultDatabase; | |||||
public string ReceivedCollection { get; } = "Received"; | |||||
public string PublishedCollection { get; } = "Published"; | |||||
} | |||||
} |
@@ -0,0 +1,26 @@ | |||||
using System; | |||||
using DotNetCore.CAP; | |||||
using DotNetCore.CAP.MongoDB; | |||||
namespace Microsoft.Extensions.DependencyInjection | |||||
{ | |||||
public static class CapOptionsExtensions | |||||
{ | |||||
public static CapOptions UseMongoDB(this CapOptions options) | |||||
{ | |||||
return options.UseMongoDB(x => { }); | |||||
} | |||||
public static CapOptions UseMongoDB(this CapOptions options, Action<MongoDBOptions> configure) | |||||
{ | |||||
if (configure == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(configure)); | |||||
} | |||||
options.RegisterExtension(new MongoDBCapOptionsExtension(configure)); | |||||
return options; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,178 @@ | |||||
using System; | |||||
using System.Data; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Abstractions; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using Microsoft.Extensions.Logging; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class CapPublisher : CapPublisherBase, ICallbackPublisher | |||||
{ | |||||
private readonly IMongoClient _client; | |||||
private readonly MongoDBOptions _options; | |||||
private readonly IMongoDatabase _database; | |||||
private bool _isInTransaction = true; | |||||
public CapPublisher(ILogger<CapPublisherBase> logger, IDispatcher dispatcher, | |||||
IMongoClient client, MongoDBOptions options, IServiceProvider provider) | |||||
: base(logger, dispatcher) | |||||
{ | |||||
_client = client; | |||||
_options = options; | |||||
_database = client.GetDatabase(_options.Database); | |||||
ServiceProvider = provider; | |||||
} | |||||
public async Task PublishCallbackAsync(CapPublishedMessage message) | |||||
{ | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection); | |||||
collection.InsertOne(message); | |||||
Enqueue(message); | |||||
} | |||||
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | |||||
{ | |||||
throw new System.NotImplementedException("Not work for MongoDB"); | |||||
} | |||||
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | |||||
{ | |||||
throw new System.NotImplementedException("Not work for MongoDB"); | |||||
} | |||||
protected override void PrepareConnectionForEF() | |||||
{ | |||||
throw new System.NotImplementedException("Not work for MongoDB"); | |||||
} | |||||
public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
var session = mongoSession as IClientSessionHandle; | |||||
if (session == null) | |||||
{ | |||||
_isInTransaction = false; | |||||
} | |||||
PublishWithSession<T>(name, contentObj, session, callbackName); | |||||
} | |||||
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
var session = mongoSession as IClientSessionHandle; | |||||
if (session == null) | |||||
{ | |||||
_isInTransaction = false; | |||||
} | |||||
await PublishWithSessionAsync<T>(name, contentObj, session, callbackName); | |||||
} | |||||
private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName) | |||||
{ | |||||
Guid operationId = default(Guid); | |||||
var content = Serialize(contentObj, callbackName); | |||||
var message = new CapPublishedMessage | |||||
{ | |||||
Name = name, | |||||
Content = content, | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
try | |||||
{ | |||||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||||
var id = Execute(session, message); | |||||
if (!_isInTransaction && id > 0) | |||||
{ | |||||
_logger.LogInformation($"message [{message}] has been persisted in the database."); | |||||
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); | |||||
message.Id = id; | |||||
Enqueue(message); | |||||
} | |||||
} | |||||
catch (System.Exception e) | |||||
{ | |||||
_logger.LogError(e, "An exception was occurred when publish message. message:" + name); | |||||
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); | |||||
throw; | |||||
} | |||||
} | |||||
private int Execute(IClientSessionHandle session, CapPublishedMessage message) | |||||
{ | |||||
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
if (_isInTransaction) | |||||
{ | |||||
collection.InsertOne(session, message); | |||||
} | |||||
else | |||||
{ | |||||
collection.InsertOne(message); | |||||
} | |||||
return message.Id; | |||||
} | |||||
private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName) | |||||
{ | |||||
Guid operationId = default(Guid); | |||||
var content = Serialize(contentObj, callbackName); | |||||
var message = new CapPublishedMessage | |||||
{ | |||||
Name = name, | |||||
Content = content, | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
try | |||||
{ | |||||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||||
var id = await ExecuteAsync(session, message); | |||||
if (!_isInTransaction && id > 0) | |||||
{ | |||||
_logger.LogInformation($"message [{message}] has been persisted in the database."); | |||||
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); | |||||
message.Id = id; | |||||
Enqueue(message); | |||||
} | |||||
} | |||||
catch (System.Exception e) | |||||
{ | |||||
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); | |||||
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); | |||||
Console.WriteLine(e); | |||||
throw; | |||||
} | |||||
} | |||||
private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) | |||||
{ | |||||
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
if (_isInTransaction) | |||||
{ | |||||
await collection.InsertOneAsync(session, message); | |||||
} | |||||
else | |||||
{ | |||||
await collection.InsertOneAsync(message); | |||||
} | |||||
return message.Id; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,17 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||||
</ItemGroup> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.0</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="MongoDB.Bson" Version="2.7.0" /> | |||||
<PackageReference Include="MongoDB.Driver" Version="2.7.0" /> | |||||
<PackageReference Include="MongoDB.Driver.Core" Version="2.7.0" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,46 @@ | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Models; | |||||
using DotNetCore.CAP.Processor; | |||||
using Microsoft.Extensions.Logging; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBCollectProcessor : ICollectProcessor | |||||
{ | |||||
private readonly IMongoClient _client; | |||||
private readonly MongoDBOptions _options; | |||||
private readonly ILogger _logger; | |||||
private readonly IMongoDatabase _database; | |||||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | |||||
public MongoDBCollectProcessor(IMongoClient client, MongoDBOptions options, | |||||
ILogger<MongoDBCollectProcessor> logger) | |||||
{ | |||||
_client = client; | |||||
_options = options; | |||||
_logger = logger; | |||||
_database = client.GetDatabase(_options.Database); | |||||
} | |||||
public async Task ProcessAsync(ProcessingContext context) | |||||
{ | |||||
_logger.LogDebug($"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}]."); | |||||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
await publishedCollection.BulkWriteAsync(new[] | |||||
{ | |||||
new DeleteManyModel<CapPublishedMessage>(Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
}); | |||||
await receivedCollection.BulkWriteAsync(new[] | |||||
{ | |||||
new DeleteManyModel<CapReceivedMessage>(Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
}); | |||||
await context.WaitAsync(_waitingInterval); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,176 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using DotNetCore.CAP.Dashboard; | |||||
using DotNetCore.CAP.Dashboard.Monitoring; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBMonitoringApi : IMonitoringApi | |||||
{ | |||||
private IMongoClient _client; | |||||
private MongoDBOptions _options; | |||||
private IMongoDatabase _database; | |||||
public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) | |||||
{ | |||||
_client = client ?? throw new ArgumentNullException(nameof(client)); | |||||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||||
_database = _client.GetDatabase(_options.Database); | |||||
} | |||||
public StatisticsDto GetStatistics() | |||||
{ | |||||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var statistics = new StatisticsDto(); | |||||
{ | |||||
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count)) | |||||
statistics.PublishedSucceeded = count; | |||||
} | |||||
{ | |||||
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count)) | |||||
statistics.PublishedFailed = count; | |||||
} | |||||
{ | |||||
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count)) | |||||
statistics.ReceivedSucceeded = count; | |||||
} | |||||
{ | |||||
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count)) | |||||
statistics.ReceivedFailed = count; | |||||
} | |||||
return statistics; | |||||
} | |||||
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type) | |||||
{ | |||||
return GetHourlyTimelineStats(type, StatusName.Failed); | |||||
} | |||||
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type) | |||||
{ | |||||
return GetHourlyTimelineStats(type, StatusName.Succeeded); | |||||
} | |||||
public IList<MessageDto> Messages(MessageQueryDto queryDto) | |||||
{ | |||||
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); | |||||
var name = queryDto.MessageType == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; | |||||
var collection = _database.GetCollection<MessageDto>(name); | |||||
var builder = Builders<MessageDto>.Filter; | |||||
var filter = builder.Empty; | |||||
if (!string.IsNullOrEmpty(queryDto.StatusName)) | |||||
{ | |||||
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); | |||||
} | |||||
if (!string.IsNullOrEmpty(queryDto.Name)) | |||||
{ | |||||
filter = filter & builder.Eq(x => x.Name, queryDto.Name); | |||||
} | |||||
if (!string.IsNullOrEmpty(queryDto.Group)) | |||||
{ | |||||
filter = filter & builder.Eq(x => x.Group, queryDto.Group); | |||||
} | |||||
if (!string.IsNullOrEmpty(queryDto.Content)) | |||||
{ | |||||
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); | |||||
} | |||||
var result = | |||||
collection.Find(filter) | |||||
.SortByDescending(x => x.Added) | |||||
.Skip(queryDto.PageSize * queryDto.CurrentPage) | |||||
.Limit(queryDto.PageSize) | |||||
.ToList(); | |||||
return result; | |||||
} | |||||
public int PublishedFailedCount() | |||||
{ | |||||
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); | |||||
} | |||||
public int PublishedSucceededCount() | |||||
{ | |||||
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); | |||||
} | |||||
public int ReceivedFailedCount() | |||||
{ | |||||
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); | |||||
} | |||||
public int ReceivedSucceededCount() | |||||
{ | |||||
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); | |||||
} | |||||
private int GetNumberOfMessage(string collectionName, string statusName) | |||||
{ | |||||
var collection = _database.GetCollection<BsonDocument>(collectionName); | |||||
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } }); | |||||
return int.Parse(count.ToString()); | |||||
} | |||||
private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName) | |||||
{ | |||||
var collectionName = type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; | |||||
var endDate = DateTime.UtcNow; | |||||
var groupby = new BsonDocument { | |||||
{ "$group", new BsonDocument{ | |||||
{ "_id", new BsonDocument { | |||||
{ "Key", new BsonDocument { | |||||
{ "$dateToString", new BsonDocument { | |||||
{ "format", "%Y-%m-%d %H:00:00"}, | |||||
{ "date", "$Added"} | |||||
}} | |||||
}} | |||||
} | |||||
}, | |||||
{ "Count", new BsonDocument{ | |||||
{ "$sum", 1} | |||||
}} | |||||
}} | |||||
}; | |||||
var match = new BsonDocument { { "$match", new BsonDocument { | |||||
{ "Added", new BsonDocument { { "$gt", endDate.AddHours(-24) } } }, | |||||
{ "StatusName", new BsonDocument { { "$eq", statusName} } | |||||
} } } }; | |||||
var pipeline = new BsonDocument[] { match, groupby }; | |||||
var collection = _database.GetCollection<BsonDocument>(collectionName); | |||||
var result = collection.Aggregate<BsonDocument>(pipeline: pipeline).ToList(); | |||||
var dic = new Dictionary<DateTime, int>(); | |||||
for (var i = 0; i < 24; i++) | |||||
{ | |||||
dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0); | |||||
endDate = endDate.AddHours(-1); | |||||
} | |||||
result.ForEach(d => | |||||
{ | |||||
var key = d["_id"].AsBsonDocument["Key"].AsString; | |||||
if (DateTime.TryParse(key, out var dateTime)) | |||||
{ | |||||
dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; | |||||
} | |||||
}); | |||||
return dic; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,69 @@ | |||||
using System.Linq; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Dashboard; | |||||
using Microsoft.Extensions.Logging; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBStorage : IStorage | |||||
{ | |||||
private readonly CapOptions _capOptions; | |||||
private readonly MongoDBOptions _options; | |||||
private readonly IMongoClient _client; | |||||
private readonly ILogger<MongoDBStorage> _logger; | |||||
public MongoDBStorage(CapOptions capOptions, | |||||
MongoDBOptions options, | |||||
IMongoClient client, | |||||
ILogger<MongoDBStorage> logger) | |||||
{ | |||||
_capOptions = capOptions; | |||||
_options = options; | |||||
_client = client; | |||||
_logger = logger; | |||||
} | |||||
public IStorageConnection GetConnection() | |||||
{ | |||||
return new MongoDBStorageConnection(_capOptions, _options, _client); | |||||
} | |||||
public IMonitoringApi GetMonitoringApi() | |||||
{ | |||||
return new MongoDBMonitoringApi(_client, _options); | |||||
} | |||||
public async Task InitializeAsync(CancellationToken cancellationToken) | |||||
{ | |||||
if (cancellationToken.IsCancellationRequested) | |||||
{ | |||||
return; | |||||
} | |||||
var database = _client.GetDatabase(_options.Database); | |||||
var names = (await database.ListCollectionNamesAsync())?.ToList(); | |||||
if (!names.Any(n => n == _options.ReceivedCollection)) | |||||
{ | |||||
await database.CreateCollectionAsync(_options.ReceivedCollection); | |||||
} | |||||
if (!names.Any(n => n == _options.PublishedCollection)) | |||||
{ | |||||
await database.CreateCollectionAsync(_options.PublishedCollection); | |||||
} | |||||
if (!names.Any(n => n == "Counter")) | |||||
{ | |||||
await database.CreateCollectionAsync("Counter"); | |||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
await collection.InsertManyAsync(new BsonDocument[] | |||||
{ | |||||
new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, | |||||
new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}} | |||||
}); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,118 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBStorageConnection : IStorageConnection | |||||
{ | |||||
private CapOptions _capOptions; | |||||
private MongoDBOptions _options; | |||||
private readonly IMongoClient _client; | |||||
private readonly IMongoDatabase _database; | |||||
public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client) | |||||
{ | |||||
_capOptions = capOptions; | |||||
_options = options; | |||||
_client = client; | |||||
_database = _client.GetDatabase(_options.Database); | |||||
} | |||||
public bool ChangePublishedState(int messageId, string state) | |||||
{ | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var updateDef = Builders<CapPublishedMessage> | |||||
.Update.Inc(x => x.Retries, 1) | |||||
.Set(x => x.ExpiresAt, null) | |||||
.Set(x => x.StatusName, state); | |||||
var result = | |||||
collection.UpdateOne(x => x.Id == messageId, updateDef); | |||||
return result.ModifiedCount > 0; | |||||
} | |||||
public bool ChangeReceivedState(int messageId, string state) | |||||
{ | |||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var updateDef = Builders<CapReceivedMessage> | |||||
.Update.Inc(x => x.Retries, 1) | |||||
.Set(x => x.ExpiresAt, null) | |||||
.Set(x => x.StatusName, state); | |||||
var result = | |||||
collection.UpdateOne(x => x.Id == messageId, updateDef); | |||||
return result.ModifiedCount > 0; | |||||
} | |||||
public IStorageTransaction CreateTransaction() | |||||
{ | |||||
return new MongoDBStorageTransaction(_client, _options); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
} | |||||
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id) | |||||
{ | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); | |||||
} | |||||
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry() | |||||
{ | |||||
var fourMinsAgo = DateTime.Now.AddMinutes(-4); | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
return await | |||||
collection.Find(x => | |||||
x.Retries < _capOptions.FailedRetryCount | |||||
&& x.Added < fourMinsAgo | |||||
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) | |||||
.Limit(200) | |||||
.ToListAsync(); | |||||
} | |||||
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id) | |||||
{ | |||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); | |||||
} | |||||
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry() | |||||
{ | |||||
var fourMinsAgo = DateTime.Now.AddMinutes(-4); | |||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
return await | |||||
collection.Find(x => | |||||
x.Retries < _capOptions.FailedRetryCount | |||||
&& x.Added < fourMinsAgo | |||||
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled) | |||||
).Limit(200).ToListAsync(); | |||||
} | |||||
public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message) | |||||
{ | |||||
if (message == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(message)); | |||||
} | |||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection); | |||||
collection.InsertOne(message); | |||||
return message.Id; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,70 @@ | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Models; | |||||
using MongoDB.Driver; | |||||
using System; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
internal class MongoDBStorageTransaction : IStorageTransaction | |||||
{ | |||||
private IMongoClient _client; | |||||
private readonly MongoDBOptions _options; | |||||
private readonly IMongoDatabase _database; | |||||
private readonly IClientSessionHandle _session; | |||||
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) | |||||
{ | |||||
_client = client; | |||||
_options = options; | |||||
_database = client.GetDatabase(options.Database); | |||||
_session = _client.StartSession(); | |||||
_session.StartTransaction(); | |||||
} | |||||
public async Task CommitAsync() | |||||
{ | |||||
await _session.CommitTransactionAsync(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
_session.Dispose(); | |||||
} | |||||
public void UpdateMessage(CapPublishedMessage message) | |||||
{ | |||||
if (message == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(message)); | |||||
} | |||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var updateDef = Builders<CapPublishedMessage>.Update | |||||
.Set(x => x.Retries, message.Retries) | |||||
.Set(x => x.Content, message.Content) | |||||
.Set(x => x.ExpiresAt, message.ExpiresAt) | |||||
.Set(x => x.StatusName, message.StatusName); | |||||
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); | |||||
} | |||||
public void UpdateMessage(CapReceivedMessage message) | |||||
{ | |||||
if (message == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(message)); | |||||
} | |||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var updateDef = Builders<CapReceivedMessage>.Update | |||||
.Set(x => x.Retries, message.Retries) | |||||
.Set(x => x.Content, message.Content) | |||||
.Set(x => x.ExpiresAt, message.ExpiresAt) | |||||
.Set(x => x.StatusName, message.StatusName); | |||||
collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,63 @@ | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
namespace DotNetCore.CAP.MongoDB | |||||
{ | |||||
public class MongoDBUtil | |||||
{ | |||||
FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>() | |||||
{ | |||||
ReturnDocument = ReturnDocument.After | |||||
}; | |||||
public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, IClientSessionHandle session = null) | |||||
{ | |||||
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm | |||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | |||||
var filter = new BsonDocument { { "_id", collectionName } }; | |||||
BsonDocument result; | |||||
if (session == null) | |||||
{ | |||||
result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options); | |||||
} | |||||
else | |||||
{ | |||||
result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options); | |||||
} | |||||
if (result.TryGetValue("sequence_value", out var value)) | |||||
{ | |||||
return value.ToInt32(); | |||||
} | |||||
throw new Exception("Unable to get next sequence value."); | |||||
} | |||||
public int GetNextSequenceValue(IMongoDatabase database, string collectionName, IClientSessionHandle session = null) | |||||
{ | |||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
var filter = new BsonDocument { { "_id", collectionName } }; | |||||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | |||||
BsonDocument result; | |||||
if (session == null) | |||||
{ | |||||
result = collection.FindOneAndUpdate(filter, updateDef, _options); | |||||
} | |||||
else | |||||
{ | |||||
result = collection.FindOneAndUpdate(session, filter, updateDef, _options); | |||||
} | |||||
if (result.TryGetValue("sequence_value", out var value)) | |||||
{ | |||||
return value.ToInt32(); | |||||
} | |||||
throw new Exception("Unable to get next sequence value."); | |||||
} | |||||
} | |||||
} |
@@ -15,11 +15,11 @@ namespace DotNetCore.CAP.Abstractions | |||||
public abstract class CapPublisherBase : ICapPublisher, IDisposable | public abstract class CapPublisherBase : ICapPublisher, IDisposable | ||||
{ | { | ||||
private readonly IDispatcher _dispatcher; | private readonly IDispatcher _dispatcher; | ||||
private readonly ILogger _logger; | |||||
protected readonly ILogger _logger; | |||||
// diagnostics listener | // diagnostics listener | ||||
// ReSharper disable once InconsistentNaming | // ReSharper disable once InconsistentNaming | ||||
private static readonly DiagnosticListener s_diagnosticListener = | |||||
protected static readonly DiagnosticListener s_diagnosticListener = | |||||
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); | new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); | ||||
protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher) | protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher) | ||||
@@ -67,6 +67,16 @@ namespace DotNetCore.CAP.Abstractions | |||||
return PublishWithTransAsync(name, contentObj, callbackName); | return PublishWithTransAsync(name, contentObj, callbackName); | ||||
} | } | ||||
public virtual void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
throw new NotImplementedException("Work for MongoDB only."); | |||||
} | |||||
public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
throw new NotImplementedException("Work for MongoDB only."); | |||||
} | |||||
protected void Enqueue(CapPublishedMessage message) | protected void Enqueue(CapPublishedMessage message) | ||||
{ | { | ||||
_dispatcher.EnqueueToPublish(message); | _dispatcher.EnqueueToPublish(message); | ||||
@@ -205,7 +215,7 @@ namespace DotNetCore.CAP.Abstractions | |||||
try | try | ||||
{ | { | ||||
Console.WriteLine("================22222222222222====================="); | Console.WriteLine("================22222222222222====================="); | ||||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||||
var id = Execute(DbConnection, DbTransaction, message); | var id = Execute(DbConnection, DbTransaction, message); | ||||
Console.WriteLine("================777777777777777777777====================="); | Console.WriteLine("================777777777777777777777====================="); | ||||
@@ -12,7 +12,7 @@ namespace DotNetCore.CAP | |||||
public interface ICapPublisher | public interface ICapPublisher | ||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// (EntityFramework) Asynchronous publish a object message. | |||||
/// (EntityFramework) Asynchronous publish an object message. | |||||
/// <para> | /// <para> | ||||
/// If you are using the EntityFramework, you need to configure the DbContextType first. | /// If you are using the EntityFramework, you need to configure the DbContextType first. | ||||
/// otherwise you need to use overloaded method with IDbTransaction. | /// otherwise you need to use overloaded method with IDbTransaction. | ||||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP | |||||
Task PublishAsync<T>(string name, T contentObj, string callbackName = null); | Task PublishAsync<T>(string name, T contentObj, string callbackName = null); | ||||
/// <summary> | /// <summary> | ||||
/// (EntityFramework) Publish a object message. | |||||
/// (EntityFramework) Publish an object message. | |||||
/// <para> | /// <para> | ||||
/// If you are using the EntityFramework, you need to configure the DbContextType first. | /// If you are using the EntityFramework, you need to configure the DbContextType first. | ||||
/// otherwise you need to use overloaded method with IDbTransaction. | /// otherwise you need to use overloaded method with IDbTransaction. | ||||
@@ -38,7 +38,7 @@ namespace DotNetCore.CAP | |||||
void Publish<T>(string name, T contentObj, string callbackName = null); | void Publish<T>(string name, T contentObj, string callbackName = null); | ||||
/// <summary> | /// <summary> | ||||
/// (ado.net) Asynchronous publish a object message. | |||||
/// (ado.net) Asynchronous publish an object message. | |||||
/// </summary> | /// </summary> | ||||
/// <param name="name">the topic name or exchange router key.</param> | /// <param name="name">the topic name or exchange router key.</param> | ||||
/// <param name="contentObj">message body content, that will be serialized of json.</param> | /// <param name="contentObj">message body content, that will be serialized of json.</param> | ||||
@@ -47,12 +47,30 @@ namespace DotNetCore.CAP | |||||
Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); | Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); | ||||
/// <summary> | /// <summary> | ||||
/// (ado.net) Publish a object message. | |||||
/// (ado.net) Publish an object message. | |||||
/// </summary> | /// </summary> | ||||
/// <param name="name">the topic name or exchange router key.</param> | /// <param name="name">the topic name or exchange router key.</param> | ||||
/// <param name="contentObj">message body content, that will be serialized of json.</param> | /// <param name="contentObj">message body content, that will be serialized of json.</param> | ||||
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param> | /// <param name="dbTransaction">the transaction of <see cref="IDbTransaction" /></param> | ||||
/// <param name="callbackName">callback subscriber name</param> | /// <param name="callbackName">callback subscriber name</param> | ||||
void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); | void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); | ||||
/// <summary> | |||||
/// Publish an object message with mongo. | |||||
/// </summary> | |||||
/// <param name="name">the topic name or exchange router key.</param> | |||||
/// <param name="contentObj">message body content, that will be serialized of json.</param> | |||||
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param> | |||||
/// <param name="callbackName">callback subscriber name</param> | |||||
void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null); | |||||
/// <summary> | |||||
/// Asynchronous publish an object message with mongo. | |||||
/// </summary> | |||||
/// <param name="name">the topic name or exchange router key.</param> | |||||
/// <param name="contentObj">message body content, that will be serialized of json.</param> | |||||
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param> | |||||
/// <param name="callbackName">callback subscriber name</param> | |||||
Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null); | |||||
} | } | ||||
} | } |
@@ -11,5 +11,17 @@ namespace DotNetCore.CAP.Infrastructure | |||||
public const string Scheduled = nameof(Scheduled); | public const string Scheduled = nameof(Scheduled); | ||||
public const string Succeeded = nameof(Succeeded); | public const string Succeeded = nameof(Succeeded); | ||||
public const string Failed = nameof(Failed); | public const string Failed = nameof(Failed); | ||||
public static string Standardized(string input) | |||||
{ | |||||
foreach (var item in typeof(StatusName).GetFields()) | |||||
{ | |||||
if (item.Name.ToLower() == input.ToLower()) | |||||
{ | |||||
return item.Name; | |||||
} | |||||
} | |||||
return string.Empty; | |||||
} | |||||
} | } | ||||
} | } |
@@ -0,0 +1,7 @@ | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
public class ConnectionUtil | |||||
{ | |||||
public static string ConnectionString = "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=my-mongo-set"; | |||||
} | |||||
} |
@@ -0,0 +1,23 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="FluentAssertions" Version="5.4.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.1" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.0" /> | |||||
<PackageReference Include="xunit" Version="2.3.1" /> | |||||
<PackageReference Include="Xunit.Priority" Version="1.0.10" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | |||||
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,83 @@ | |||||
using MongoDB.Driver; | |||||
using DotNetCore.CAP.MongoDB; | |||||
using Xunit; | |||||
using System; | |||||
using DotNetCore.CAP.Models; | |||||
using FluentAssertions; | |||||
using DotNetCore.CAP.Dashboard.Monitoring; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using System.Linq; | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
public class MongoDBMonitoringApiTest | |||||
{ | |||||
private MongoClient _client; | |||||
private MongoDBOptions _options; | |||||
private MongoDBMonitoringApi _api; | |||||
public MongoDBMonitoringApiTest() | |||||
{ | |||||
_client = new MongoClient(ConnectionUtil.ConnectionString); | |||||
_options = new MongoDBOptions(); | |||||
_api = new MongoDBMonitoringApi(_client, _options); | |||||
Init(); | |||||
} | |||||
private void Init() | |||||
{ | |||||
var helper = new MongoDBUtil(); | |||||
var database = _client.GetDatabase(_options.Database); | |||||
var collection = database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
collection.InsertMany(new CapPublishedMessage[] | |||||
{ | |||||
new CapPublishedMessage | |||||
{ | |||||
Id = helper.GetNextSequenceValue(database,_options.PublishedCollection), | |||||
Added = DateTime.Now.AddHours(-1), | |||||
StatusName = "Failed", | |||||
Content = "abc" | |||||
}, | |||||
new CapPublishedMessage | |||||
{ | |||||
Id = helper.GetNextSequenceValue(database,_options.PublishedCollection), | |||||
Added = DateTime.Now, | |||||
StatusName = "Failed", | |||||
Content = "bbc" | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void HourlyFailedJobs_Test() | |||||
{ | |||||
var result = _api.HourlyFailedJobs(MessageType.Publish); | |||||
result.Should().HaveCount(24); | |||||
} | |||||
[Fact] | |||||
public void Messages_Test() | |||||
{ | |||||
var messages = | |||||
_api.Messages(new MessageQueryDto | |||||
{ | |||||
MessageType = MessageType.Publish, | |||||
StatusName = StatusName.Failed, | |||||
Content = "b", | |||||
CurrentPage = 1, | |||||
PageSize = 1 | |||||
}); | |||||
messages.Should().HaveCount(1); | |||||
messages.First().Content.Should().Contain("b"); | |||||
} | |||||
[Fact] | |||||
public void PublishedFailedCount_Test() | |||||
{ | |||||
var count = _api.PublishedFailedCount(); | |||||
count.Should().BeGreaterThan(1); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,67 @@ | |||||
using System.Threading; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using FluentAssertions; | |||||
using Microsoft.Extensions.Logging.Abstractions; | |||||
using MongoDB.Driver; | |||||
using Xunit; | |||||
using Xunit.Priority; | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
[TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)] | |||||
public class MongoDBStorageConnectionTest | |||||
{ | |||||
private MongoClient _client; | |||||
private MongoDBOptions _options; | |||||
private MongoDBStorage _storage; | |||||
private IStorageConnection _connection; | |||||
public MongoDBStorageConnectionTest() | |||||
{ | |||||
_client = new MongoClient(ConnectionUtil.ConnectionString); | |||||
_options = new MongoDBOptions(); | |||||
_storage = new MongoDBStorage(new CapOptions(), _options, _client, NullLogger<MongoDBStorage>.Instance); | |||||
_connection = _storage.GetConnection(); | |||||
} | |||||
[Fact, Priority(1)] | |||||
public async void StoreReceivedMessageAsync_TestAsync() | |||||
{ | |||||
await _storage.InitializeAsync(default(CancellationToken)); | |||||
var id = await | |||||
_connection.StoreReceivedMessageAsync(new CapReceivedMessage(new MessageContext | |||||
{ | |||||
Group = "test", | |||||
Name = "test", | |||||
Content = "test-content" | |||||
})); | |||||
id.Should().BeGreaterThan(0); | |||||
} | |||||
[Fact, Priority(2)] | |||||
public void ChangeReceivedState_Test() | |||||
{ | |||||
var collection = _client.GetDatabase(_options.Database).GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var msg = collection.Find(x => true).FirstOrDefault(); | |||||
_connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue(); | |||||
collection.Find(x => x.Id == msg.Id).FirstOrDefault()?.StatusName.Should().Be(StatusName.Scheduled); | |||||
} | |||||
[Fact, Priority(3)] | |||||
public async void GetReceivedMessagesOfNeedRetry_TestAsync() | |||||
{ | |||||
var msgs = await _connection.GetReceivedMessagesOfNeedRetry(); | |||||
msgs.Should().HaveCountGreaterThan(0); | |||||
} | |||||
[Fact, Priority(4)] | |||||
public void GetReceivedMessageAsync_Test() | |||||
{ | |||||
var msg = _connection.GetReceivedMessageAsync(1); | |||||
msg.Should().NotBeNull(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
using System.Threading; | |||||
using FluentAssertions; | |||||
using Microsoft.Extensions.Logging.Abstractions; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
public class MongoDBStorageTest | |||||
{ | |||||
private MongoClient _client; | |||||
public MongoDBStorageTest() | |||||
{ | |||||
_client = new MongoClient(ConnectionUtil.ConnectionString); | |||||
} | |||||
[Fact] | |||||
public async void InitializeAsync_Test() | |||||
{ | |||||
var options = new MongoDBOptions(); | |||||
var storage = new MongoDBStorage(new CapOptions(), options, _client, NullLogger<MongoDBStorage>.Instance); | |||||
await storage.InitializeAsync(default(CancellationToken)); | |||||
var names = _client.ListDatabaseNames()?.ToList(); | |||||
names.Should().Contain(options.Database); | |||||
var collections = _client.GetDatabase(options.Database).ListCollectionNames()?.ToList(); | |||||
collections.Should().Contain(options.PublishedCollection); | |||||
collections.Should().Contain(options.ReceivedCollection); | |||||
collections.Should().Contain("Counter"); | |||||
var collection = _client.GetDatabase(options.Database).GetCollection<BsonDocument>("Counter"); | |||||
collection.CountDocuments(new BsonDocument { { "_id", options.PublishedCollection } }).Should().Be(1); | |||||
collection.CountDocuments(new BsonDocument { { "_id", options.ReceivedCollection } }).Should().Be(1); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,82 @@ | |||||
using System; | |||||
using FluentAssertions; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
public class MongoDBTest | |||||
{ | |||||
private MongoClient _client; | |||||
public MongoDBTest() | |||||
{ | |||||
_client = new MongoClient(ConnectionUtil.ConnectionString); | |||||
} | |||||
[Fact] | |||||
public void MongoDB_Connection_Test() | |||||
{ | |||||
var names = _client.ListDatabaseNames(); | |||||
names.ToList().Should().NotBeNullOrEmpty(); | |||||
} | |||||
[Fact] | |||||
public void Transaction_Test() | |||||
{ | |||||
var document = new BsonDocument | |||||
{ | |||||
{ "name", "MongoDB" }, | |||||
{ "type", "Database" }, | |||||
{ "count", 1 }, | |||||
{ "info", new BsonDocument | |||||
{ | |||||
{ "x", 203 }, | |||||
{ "y", 102 } | |||||
}} | |||||
}; | |||||
var db = _client.GetDatabase("test"); | |||||
var collection1 = db.GetCollection<BsonDocument>("test1"); | |||||
var collection2 = db.GetCollection<BsonDocument>("test2"); | |||||
using (var sesstion = _client.StartSession()) | |||||
{ | |||||
sesstion.StartTransaction(); | |||||
collection1.InsertOne(document); | |||||
collection2.InsertOne(document); | |||||
sesstion.CommitTransaction(); | |||||
} | |||||
var filter = new BsonDocument("name", "MongoDB"); | |||||
collection1.CountDocuments(filter).Should().BeGreaterThan(0); | |||||
collection2.CountDocuments(filter).Should().BeGreaterThan(0); | |||||
} | |||||
[Fact] | |||||
public void Transaction_Rollback_Test() | |||||
{ | |||||
var document = new BsonDocument | |||||
{ | |||||
{"name", "MongoDB"}, | |||||
{"date", DateTimeOffset.Now.ToString()} | |||||
}; | |||||
var db = _client.GetDatabase("test"); | |||||
var collection = db.GetCollection<BsonDocument>("test3"); | |||||
var collection4 = db.GetCollection<BsonDocument>("test4"); | |||||
using (var session = _client.StartSession()) | |||||
{ | |||||
session.IsInTransaction.Should().BeFalse(); | |||||
session.StartTransaction(); | |||||
session.IsInTransaction.Should().BeTrue(); | |||||
collection.InsertOne(session, document); | |||||
collection4.InsertOne(session, new BsonDocument { { "name", "MongoDB" } }); | |||||
session.AbortTransaction(); | |||||
} | |||||
var filter = new BsonDocument("name", "MongoDB"); | |||||
collection.CountDocuments(filter).Should().Be(0); | |||||
collection4.CountDocuments(filter).Should().Be(0); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,50 @@ | |||||
using System.Collections.Concurrent; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Models; | |||||
using FluentAssertions; | |||||
using MongoDB.Bson; | |||||
using MongoDB.Driver; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.MongoDB.Test | |||||
{ | |||||
public class MongoDBUtilTest | |||||
{ | |||||
private readonly MongoClient _client; | |||||
private readonly IMongoDatabase _database; | |||||
string _recieved = "ReceivedTest"; | |||||
public MongoDBUtilTest() | |||||
{ | |||||
_client = new MongoClient(ConnectionUtil.ConnectionString); | |||||
_database = _client.GetDatabase("CAP_Test"); | |||||
//Initialize MongoDB | |||||
if (!_database.ListCollectionNames().ToList().Any(x => x == "Counter")) | |||||
{ | |||||
var collection = _database.GetCollection<BsonDocument>("Counter"); | |||||
collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } }); | |||||
} | |||||
} | |||||
[Fact] | |||||
public async void GetNextSequenceValueAsync_Test() | |||||
{ | |||||
var id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _recieved); | |||||
id.Should().BeGreaterThan(0); | |||||
} | |||||
[Fact] | |||||
public void GetNextSequenceValue_Concurrency_Test() | |||||
{ | |||||
var dic = new ConcurrentDictionary<int, int>(); | |||||
Parallel.For(0, 30, (x) => | |||||
{ | |||||
var id = new MongoDBUtil().GetNextSequenceValue(_database, _recieved); | |||||
id.Should().BeGreaterThan(0); | |||||
dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same. | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -15,13 +15,13 @@ namespace DotNetCore.CAP.Test | |||||
public void CanCreateInstanceAndGetService() | public void CanCreateInstanceAndGetService() | ||||
{ | { | ||||
var services = new ServiceCollection(); | var services = new ServiceCollection(); | ||||
services.AddSingleton<ICapPublisher, MyProducerService>(); | services.AddSingleton<ICapPublisher, MyProducerService>(); | ||||
var builder = new CapBuilder(services); | var builder = new CapBuilder(services); | ||||
Assert.NotNull(builder); | Assert.NotNull(builder); | ||||
var count = builder.Services.Count; | var count = builder.Services.Count; | ||||
Assert.Equal(1, count); | |||||
Assert.Equal(1, count); | |||||
var provider = services.BuildServiceProvider(); | var provider = services.BuildServiceProvider(); | ||||
var capPublisher = provider.GetService<ICapPublisher>(); | var capPublisher = provider.GetService<ICapPublisher>(); | ||||
@@ -129,6 +129,11 @@ namespace DotNetCore.CAP.Test | |||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
public void Publish<T>(string name, T contentObj, object mongoSession, string callbackName = null) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task PublishAsync(string topic, string content) | public Task PublishAsync(string topic, string content) | ||||
{ | { | ||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
@@ -163,6 +168,16 @@ namespace DotNetCore.CAP.Test | |||||
{ | { | ||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
public void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } |