diff --git a/.gitignore b/.gitignore index af4f8ef..27ab035 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,5 @@ Properties /src/DotNetCore.CAP/packages.config /src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj /NuGet.config +.vscode/* +samples/Sample.RabbitMQ.MongoDB/appsettings.Development.json diff --git a/CAP.sln b/CAP.sln index 225537c..bf7efad 100644 --- a/CAP.sln +++ b/CAP.sln @@ -60,6 +60,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql.T EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB.Test", "test\DotNetCore.CAP.MongoDB.Test\DotNetCore.CAP.MongoDB.Test.csproj", "{C143FCDF-E7F3-46F8-987E-A1BA38C1639D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MongoDB", "src\DotNetCore.CAP.MongoDB\DotNetCore.CAP.MongoDB.csproj", "{77C0AC02-C44B-49D5-B969-7D5305FC20A5}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -113,6 +119,18 @@ Global {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Debug|Any CPU.Build.0 = Debug|Any CPU {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.ActiveCfg = Release|Any CPU {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91}.Release|Any CPU.Build.0 = Release|Any CPU + {C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C143FCDF-E7F3-46F8-987E-A1BA38C1639D}.Release|Any CPU.Build.0 = Release|Any CPU + {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU + {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -130,6 +148,9 @@ Global {82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {7CA3625D-1817-4695-881D-7E79A1E1DED2} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {9CB51105-A85B-42A4-AFDE-A4FC34D9EA91} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} + {77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs new file mode 100644 index 0000000..374100d --- /dev/null +++ b/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP; +using Microsoft.AspNetCore.Mvc; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace Sample.RabbitMQ.MongoDB.Controllers +{ + [Route("api/[controller]")] + [ApiController] + public class ValuesController : ControllerBase + { + private readonly IMongoClient _client; + private readonly ICapPublisher _capPublisher; + + public ValuesController(IMongoClient client, ICapPublisher capPublisher) + { + _client = client; + _capPublisher = capPublisher; + } + + [Route("~/publish")] + public IActionResult PublishWithSession() + { + using (var session = _client.StartSession()) + { + session.StartTransaction(); + var collection = _client.GetDatabase("TEST").GetCollection("test"); + collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); + + _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session); + + session.CommitTransaction(); + } + return Ok(); + } + + [Route("~/publish_rollback")] + public IActionResult PublishRollback() + { + using (var session = _client.StartSession()) + { + try + { + session.StartTransaction(); + _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session); + throw new Exception("Foo"); + } + catch (System.Exception ex) + { + session.AbortTransaction(); + return StatusCode(500, ex.Message); + } + } + } + + [Route("~/publish_without_session")] + public IActionResult PublishWithoutSession() + { + _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now); + return Ok(); + } + + [NonAction] + [CapSubscribe("sample.rabbitmq.mongodb")] + public void ReceiveMessage(DateTime time) + { + Console.WriteLine("[sample.rabbitmq.mongodb] message received: " + DateTime.Now + ",sent time: " + time); + } + } +} diff --git a/samples/Sample.RabbitMQ.MongoDB/Program.cs b/samples/Sample.RabbitMQ.MongoDB/Program.cs new file mode 100644 index 0000000..7f32066 --- /dev/null +++ b/samples/Sample.RabbitMQ.MongoDB/Program.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +namespace Sample.RabbitMQ.MongoDB +{ + public class Program + { + public static void Main(string[] args) + { + CreateWebHostBuilder(args).Build().Run(); + } + + public static IWebHostBuilder CreateWebHostBuilder(string[] args) => + WebHost.CreateDefaultBuilder(args) + .UseStartup(); + } +} diff --git a/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj b/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj new file mode 100644 index 0000000..fdaa849 --- /dev/null +++ b/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj @@ -0,0 +1,21 @@ + + + + netcoreapp2.1 + + + + + + + + + + + + + + + + + diff --git a/samples/Sample.RabbitMQ.MongoDB/Startup.cs b/samples/Sample.RabbitMQ.MongoDB/Startup.cs new file mode 100644 index 0000000..4f4bd69 --- /dev/null +++ b/samples/Sample.RabbitMQ.MongoDB/Startup.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.HttpsPolicy; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; + +namespace Sample.RabbitMQ.MongoDB +{ + public class Startup + { + public Startup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + services.AddSingleton(new MongoClient(Configuration.GetConnectionString("MongoDB"))); + services.AddCap(x => + { + x.UseMongoDB(); + + var mq = new RabbitMQOptions(); + Configuration.GetSection("RabbitMQ").Bind(mq); + x.UseRabbitMQ(cfg => + { + cfg.HostName = mq.HostName; + cfg.Port = mq.Port; + cfg.UserName = mq.UserName; + cfg.Password = mq.Password; + }); + + x.UseDashboard(); + }); + services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); + } + + public void Configure(IApplicationBuilder app, IHostingEnvironment env) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + app.UseMvc(); + + app.UseCap(); + } + } +} diff --git a/samples/Sample.RabbitMQ.MongoDB/appsettings.json b/samples/Sample.RabbitMQ.MongoDB/appsettings.json new file mode 100644 index 0000000..5707bfb --- /dev/null +++ b/samples/Sample.RabbitMQ.MongoDB/appsettings.json @@ -0,0 +1,17 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "MongoDB": "mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0" + }, + "RabbitMQ": { + "HostName": "localhost", + "Port": 5672, + "UserName": "", + "Password": "" + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs new file mode 100644 index 0000000..3bbb891 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs @@ -0,0 +1,31 @@ +using System; +using DotNetCore.CAP; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBCapOptionsExtension : ICapOptionsExtension + { + private Action _configure; + + public MongoDBCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddTransient(); + + var options = new MongoDBOptions(); + _configure?.Invoke(options); + services.AddSingleton(options); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs new file mode 100644 index 0000000..4e6ef3a --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs @@ -0,0 +1,19 @@ +using System; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBOptions + { + public const string DefaultDatabase = "Cap"; + + /// + /// Gets or sets the database to use when creating database objects. + /// Default is . + /// + public string Database { get; set; } = DefaultDatabase; + + public string ReceivedCollection { get; } = "Received"; + + public string PublishedCollection { get; } = "Published"; + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs new file mode 100644 index 0000000..af69c02 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs @@ -0,0 +1,26 @@ +using System; +using DotNetCore.CAP; +using DotNetCore.CAP.MongoDB; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UseMongoDB(this CapOptions options) + { + return options.UseMongoDB(x => { }); + } + + public static CapOptions UseMongoDB(this CapOptions options, Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + options.RegisterExtension(new MongoDBCapOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CapPublisher.cs b/src/DotNetCore.CAP.MongoDB/CapPublisher.cs new file mode 100644 index 0000000..1cda633 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/CapPublisher.cs @@ -0,0 +1,178 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Diagnostics; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class CapPublisher : CapPublisherBase, ICallbackPublisher + { + private readonly IMongoClient _client; + private readonly MongoDBOptions _options; + private readonly IMongoDatabase _database; + private bool _isInTransaction = true; + + public CapPublisher(ILogger logger, IDispatcher dispatcher, + IMongoClient client, MongoDBOptions options, IServiceProvider provider) + : base(logger, dispatcher) + { + _client = client; + _options = options; + _database = client.GetDatabase(_options.Database); + ServiceProvider = provider; + } + + public async Task PublishCallbackAsync(CapPublishedMessage message) + { + var collection = _database.GetCollection(_options.PublishedCollection); + message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection); + collection.InsertOne(message); + Enqueue(message); + } + + protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) + { + throw new System.NotImplementedException("Not work for MongoDB"); + } + + protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) + { + throw new System.NotImplementedException("Not work for MongoDB"); + } + + protected override void PrepareConnectionForEF() + { + throw new System.NotImplementedException("Not work for MongoDB"); + } + + public override void PublishWithMongo(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + var session = mongoSession as IClientSessionHandle; + if (session == null) + { + _isInTransaction = false; + } + + PublishWithSession(name, contentObj, session, callbackName); + } + + public override async Task PublishWithMongoAsync(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + var session = mongoSession as IClientSessionHandle; + if (session == null) + { + _isInTransaction = false; + } + + await PublishWithSessionAsync(name, contentObj, session, callbackName); + } + + private void PublishWithSession(string name, T contentObj, IClientSessionHandle session, string callbackName) + { + Guid operationId = default(Guid); + + var content = Serialize(contentObj, callbackName); + + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + + try + { + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); + var id = Execute(session, message); + + if (!_isInTransaction && id > 0) + { + _logger.LogInformation($"message [{message}] has been persisted in the database."); + s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); + message.Id = id; + Enqueue(message); + } + } + catch (System.Exception e) + { + _logger.LogError(e, "An exception was occurred when publish message. message:" + name); + s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); + throw; + } + } + + private int Execute(IClientSessionHandle session, CapPublishedMessage message) + { + message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); + + var collection = _database.GetCollection(_options.PublishedCollection); + if (_isInTransaction) + { + collection.InsertOne(session, message); + } + else + { + collection.InsertOne(message); + } + return message.Id; + } + + + private async Task PublishWithSessionAsync(string name, T contentObj, IClientSessionHandle session, string callbackName) + { + Guid operationId = default(Guid); + var content = Serialize(contentObj, callbackName); + + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + + try + { + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); + + var id = await ExecuteAsync(session, message); + + if (!_isInTransaction && id > 0) + { + _logger.LogInformation($"message [{message}] has been persisted in the database."); + s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); + + message.Id = id; + + Enqueue(message); + } + } + catch (System.Exception e) + { + _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); + s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); + Console.WriteLine(e); + throw; + } + } + + private async Task ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) + { + message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); + var collection = _database.GetCollection(_options.PublishedCollection); + if (_isInTransaction) + { + await collection.InsertOneAsync(session, message); + } + else + { + await collection.InsertOneAsync(message); + } + return message.Id; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj new file mode 100644 index 0000000..ef1b0c1 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj @@ -0,0 +1,17 @@ + + + + + + + + netstandard2.0 + + + + + + + + + diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs b/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs new file mode 100644 index 0000000..a0519bc --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBCollectProcessor : ICollectProcessor + { + private readonly IMongoClient _client; + private readonly MongoDBOptions _options; + private readonly ILogger _logger; + private readonly IMongoDatabase _database; + private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); + + public MongoDBCollectProcessor(IMongoClient client, MongoDBOptions options, + ILogger logger) + { + _client = client; + _options = options; + _logger = logger; + _database = client.GetDatabase(_options.Database); + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug($"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}]."); + + var publishedCollection = _database.GetCollection(_options.PublishedCollection); + var receivedCollection = _database.GetCollection(_options.ReceivedCollection); + + await publishedCollection.BulkWriteAsync(new[] + { + new DeleteManyModel(Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) + }); + await receivedCollection.BulkWriteAsync(new[] + { + new DeleteManyModel(Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) + }); + + await context.WaitAsync(_waitingInterval); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs b/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs new file mode 100644 index 0000000..7dfcff4 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs @@ -0,0 +1,176 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.Dashboard.Monitoring; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBMonitoringApi : IMonitoringApi + { + private IMongoClient _client; + private MongoDBOptions _options; + private IMongoDatabase _database; + + public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + + _database = _client.GetDatabase(_options.Database); + } + + public StatisticsDto GetStatistics() + { + var publishedCollection = _database.GetCollection(_options.PublishedCollection); + var receivedCollection = _database.GetCollection(_options.ReceivedCollection); + + var statistics = new StatisticsDto(); + + { + if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count)) + statistics.PublishedSucceeded = count; + } + { + if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count)) + statistics.PublishedFailed = count; + } + { + if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count)) + statistics.ReceivedSucceeded = count; + } + { + if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count)) + statistics.ReceivedFailed = count; + } + + return statistics; + } + + public IDictionary HourlyFailedJobs(MessageType type) + { + return GetHourlyTimelineStats(type, StatusName.Failed); + } + + public IDictionary HourlySucceededJobs(MessageType type) + { + return GetHourlyTimelineStats(type, StatusName.Succeeded); + } + + public IList Messages(MessageQueryDto queryDto) + { + queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); + + var name = queryDto.MessageType == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; + var collection = _database.GetCollection(name); + + var builder = Builders.Filter; + var filter = builder.Empty; + if (!string.IsNullOrEmpty(queryDto.StatusName)) + { + filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); + } + if (!string.IsNullOrEmpty(queryDto.Name)) + { + filter = filter & builder.Eq(x => x.Name, queryDto.Name); + } + if (!string.IsNullOrEmpty(queryDto.Group)) + { + filter = filter & builder.Eq(x => x.Group, queryDto.Group); + } + if (!string.IsNullOrEmpty(queryDto.Content)) + { + filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); + } + + var result = + collection.Find(filter) + .SortByDescending(x => x.Added) + .Skip(queryDto.PageSize * queryDto.CurrentPage) + .Limit(queryDto.PageSize) + .ToList(); + + return result; + } + + public int PublishedFailedCount() + { + return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); + } + + public int PublishedSucceededCount() + { + return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); + } + + public int ReceivedFailedCount() + { + return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); + } + + public int ReceivedSucceededCount() + { + return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); + } + + private int GetNumberOfMessage(string collectionName, string statusName) + { + var collection = _database.GetCollection(collectionName); + var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } }); + return int.Parse(count.ToString()); + } + + private IDictionary GetHourlyTimelineStats(MessageType type, string statusName) + { + var collectionName = type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; + var endDate = DateTime.UtcNow; + + var groupby = new BsonDocument { + { "$group", new BsonDocument{ + { "_id", new BsonDocument { + { "Key", new BsonDocument { + { "$dateToString", new BsonDocument { + { "format", "%Y-%m-%d %H:00:00"}, + { "date", "$Added"} + }} + }} + } + }, + { "Count", new BsonDocument{ + { "$sum", 1} + }} + }} + }; + + var match = new BsonDocument { { "$match", new BsonDocument { + { "Added", new BsonDocument { { "$gt", endDate.AddHours(-24) } } }, + { "StatusName", new BsonDocument { { "$eq", statusName} } + } } } }; + var pipeline = new BsonDocument[] { match, groupby }; + + var collection = _database.GetCollection(collectionName); + var result = collection.Aggregate(pipeline: pipeline).ToList(); + + var dic = new Dictionary(); + for (var i = 0; i < 24; i++) + { + dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0); + endDate = endDate.AddHours(-1); + } + result.ForEach(d => + { + var key = d["_id"].AsBsonDocument["Key"].AsString; + if (DateTime.TryParse(key, out var dateTime)) + { + dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; + } + }); + + return dic; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs b/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs new file mode 100644 index 0000000..27084b8 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs @@ -0,0 +1,69 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Dashboard; +using Microsoft.Extensions.Logging; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBStorage : IStorage + { + private readonly CapOptions _capOptions; + private readonly MongoDBOptions _options; + private readonly IMongoClient _client; + private readonly ILogger _logger; + + public MongoDBStorage(CapOptions capOptions, + MongoDBOptions options, + IMongoClient client, + ILogger logger) + { + _capOptions = capOptions; + _options = options; + _client = client; + _logger = logger; + } + + public IStorageConnection GetConnection() + { + return new MongoDBStorageConnection(_capOptions, _options, _client); + } + + public IMonitoringApi GetMonitoringApi() + { + return new MongoDBMonitoringApi(_client, _options); + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + var database = _client.GetDatabase(_options.Database); + var names = (await database.ListCollectionNamesAsync())?.ToList(); + + if (!names.Any(n => n == _options.ReceivedCollection)) + { + await database.CreateCollectionAsync(_options.ReceivedCollection); + } + if (!names.Any(n => n == _options.PublishedCollection)) + { + await database.CreateCollectionAsync(_options.PublishedCollection); + } + if (!names.Any(n => n == "Counter")) + { + await database.CreateCollectionAsync("Counter"); + var collection = database.GetCollection("Counter"); + await collection.InsertManyAsync(new BsonDocument[] + { + new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, + new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}} + }); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs b/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs new file mode 100644 index 0000000..11e8d8e --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs @@ -0,0 +1,118 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBStorageConnection : IStorageConnection + { + private CapOptions _capOptions; + private MongoDBOptions _options; + private readonly IMongoClient _client; + private readonly IMongoDatabase _database; + + public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client) + { + _capOptions = capOptions; + _options = options; + _client = client; + _database = _client.GetDatabase(_options.Database); + } + + public bool ChangePublishedState(int messageId, string state) + { + var collection = _database.GetCollection(_options.PublishedCollection); + + var updateDef = Builders + .Update.Inc(x => x.Retries, 1) + .Set(x => x.ExpiresAt, null) + .Set(x => x.StatusName, state); + + var result = + collection.UpdateOne(x => x.Id == messageId, updateDef); + + return result.ModifiedCount > 0; + } + + public bool ChangeReceivedState(int messageId, string state) + { + var collection = _database.GetCollection(_options.ReceivedCollection); + + var updateDef = Builders + .Update.Inc(x => x.Retries, 1) + .Set(x => x.ExpiresAt, null) + .Set(x => x.StatusName, state); + + var result = + collection.UpdateOne(x => x.Id == messageId, updateDef); + + return result.ModifiedCount > 0; + } + + public IStorageTransaction CreateTransaction() + { + return new MongoDBStorageTransaction(_client, _options); + } + + public void Dispose() + { + } + + public async Task GetPublishedMessageAsync(int id) + { + var collection = _database.GetCollection(_options.PublishedCollection); + return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var collection = _database.GetCollection(_options.PublishedCollection); + return await + collection.Find(x => + x.Retries < _capOptions.FailedRetryCount + && x.Added < fourMinsAgo + && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) + .Limit(200) + .ToListAsync(); + } + + public async Task GetReceivedMessageAsync(int id) + { + var collection = _database.GetCollection(_options.ReceivedCollection); + return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); + } + + public async Task> GetReceivedMessagesOfNeedRetry() + { + var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var collection = _database.GetCollection(_options.ReceivedCollection); + + return await + collection.Find(x => + x.Retries < _capOptions.FailedRetryCount + && x.Added < fourMinsAgo + && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled) + ).Limit(200).ToListAsync(); + } + + public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + var collection = _database.GetCollection(_options.ReceivedCollection); + + message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection); + + collection.InsertOne(message); + + return message.Id; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs b/src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs new file mode 100644 index 0000000..5ea0a56 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs @@ -0,0 +1,70 @@ +using System.Threading.Tasks; +using DotNetCore.CAP.Models; +using MongoDB.Driver; +using System; + +namespace DotNetCore.CAP.MongoDB +{ + internal class MongoDBStorageTransaction : IStorageTransaction + { + private IMongoClient _client; + private readonly MongoDBOptions _options; + private readonly IMongoDatabase _database; + private readonly IClientSessionHandle _session; + + public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) + { + _client = client; + _options = options; + _database = client.GetDatabase(options.Database); + _session = _client.StartSession(); + _session.StartTransaction(); + } + + public async Task CommitAsync() + { + await _session.CommitTransactionAsync(); + } + + public void Dispose() + { + _session.Dispose(); + } + + public void UpdateMessage(CapPublishedMessage message) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var collection = _database.GetCollection(_options.PublishedCollection); + + var updateDef = Builders.Update + .Set(x => x.Retries, message.Retries) + .Set(x => x.Content, message.Content) + .Set(x => x.ExpiresAt, message.ExpiresAt) + .Set(x => x.StatusName, message.StatusName); + + collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); + } + + public void UpdateMessage(CapReceivedMessage message) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var collection = _database.GetCollection(_options.ReceivedCollection); + + var updateDef = Builders.Update + .Set(x => x.Retries, message.Retries) + .Set(x => x.Content, message.Content) + .Set(x => x.ExpiresAt, message.ExpiresAt) + .Set(x => x.StatusName, message.StatusName); + + collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs b/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs new file mode 100644 index 0000000..8d862ad --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs @@ -0,0 +1,63 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBUtil + { + FindOneAndUpdateOptions _options = new FindOneAndUpdateOptions() + { + ReturnDocument = ReturnDocument.After + }; + public async Task GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, IClientSessionHandle session = null) + { + //https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm + var collection = database.GetCollection("Counter"); + + var updateDef = Builders.Update.Inc("sequence_value", 1); + var filter = new BsonDocument { { "_id", collectionName } }; + + BsonDocument result; + if (session == null) + { + result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options); + } + else + { + result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options); + } + + if (result.TryGetValue("sequence_value", out var value)) + { + return value.ToInt32(); + } + throw new Exception("Unable to get next sequence value."); + } + + public int GetNextSequenceValue(IMongoDatabase database, string collectionName, IClientSessionHandle session = null) + { + var collection = database.GetCollection("Counter"); + + var filter = new BsonDocument { { "_id", collectionName } }; + var updateDef = Builders.Update.Inc("sequence_value", 1); + + BsonDocument result; + if (session == null) + { + result = collection.FindOneAndUpdate(filter, updateDef, _options); + } + else + { + result = collection.FindOneAndUpdate(session, filter, updateDef, _options); + } + + if (result.TryGetValue("sequence_value", out var value)) + { + return value.ToInt32(); + } + throw new Exception("Unable to get next sequence value."); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index 77a6c44..bc53ca8 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -15,11 +15,11 @@ namespace DotNetCore.CAP.Abstractions public abstract class CapPublisherBase : ICapPublisher, IDisposable { private readonly IDispatcher _dispatcher; - private readonly ILogger _logger; + protected readonly ILogger _logger; // diagnostics listener // ReSharper disable once InconsistentNaming - private static readonly DiagnosticListener s_diagnosticListener = + protected static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); protected CapPublisherBase(ILogger logger, IDispatcher dispatcher) @@ -67,6 +67,16 @@ namespace DotNetCore.CAP.Abstractions return PublishWithTransAsync(name, contentObj, callbackName); } + public virtual void PublishWithMongo(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + throw new NotImplementedException("Work for MongoDB only."); + } + + public virtual Task PublishWithMongoAsync(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + throw new NotImplementedException("Work for MongoDB only."); + } + protected void Enqueue(CapPublishedMessage message) { _dispatcher.EnqueueToPublish(message); @@ -205,7 +215,7 @@ namespace DotNetCore.CAP.Abstractions try { Console.WriteLine("================22222222222222====================="); - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); var id = Execute(DbConnection, DbTransaction, message); Console.WriteLine("================777777777777777777777====================="); diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index 0688c02..4386926 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -12,7 +12,7 @@ namespace DotNetCore.CAP public interface ICapPublisher { /// - /// (EntityFramework) Asynchronous publish a object message. + /// (EntityFramework) Asynchronous publish an object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbTransaction. @@ -25,7 +25,7 @@ namespace DotNetCore.CAP Task PublishAsync(string name, T contentObj, string callbackName = null); /// - /// (EntityFramework) Publish a object message. + /// (EntityFramework) Publish an object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbTransaction. @@ -38,7 +38,7 @@ namespace DotNetCore.CAP void Publish(string name, T contentObj, string callbackName = null); /// - /// (ado.net) Asynchronous publish a object message. + /// (ado.net) Asynchronous publish an object message. /// /// the topic name or exchange router key. /// message body content, that will be serialized of json. @@ -47,12 +47,30 @@ namespace DotNetCore.CAP Task PublishAsync(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); /// - /// (ado.net) Publish a object message. + /// (ado.net) Publish an object message. /// /// the topic name or exchange router key. /// message body content, that will be serialized of json. /// the transaction of /// callback subscriber name void Publish(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null); + + /// + /// Publish an object message with mongo. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// if seesion was set null, the message will be published directly. + /// callback subscriber name + void PublishWithMongo(string name, T contentObj, object mongoSession = null, string callbackName = null); + + /// + /// Asynchronous publish an object message with mongo. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// if seesion was set null, the message will be published directly. + /// callback subscriber name + Task PublishWithMongoAsync(string name, T contentObj, object mongoSession = null, string callbackName = null); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Infrastructure/StatusName.cs b/src/DotNetCore.CAP/Infrastructure/StatusName.cs index 65d9427..f8c5142 100644 --- a/src/DotNetCore.CAP/Infrastructure/StatusName.cs +++ b/src/DotNetCore.CAP/Infrastructure/StatusName.cs @@ -11,5 +11,17 @@ namespace DotNetCore.CAP.Infrastructure public const string Scheduled = nameof(Scheduled); public const string Succeeded = nameof(Succeeded); public const string Failed = nameof(Failed); + + public static string Standardized(string input) + { + foreach (var item in typeof(StatusName).GetFields()) + { + if (item.Name.ToLower() == input.ToLower()) + { + return item.Name; + } + } + return string.Empty; + } } } \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/ConnectionUtil.cs b/test/DotNetCore.CAP.MongoDB.Test/ConnectionUtil.cs new file mode 100644 index 0000000..9c8aa29 --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/ConnectionUtil.cs @@ -0,0 +1,7 @@ +namespace DotNetCore.CAP.MongoDB.Test +{ + public class ConnectionUtil + { + public static string ConnectionString = "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=my-mongo-set"; + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj b/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj new file mode 100644 index 0000000..5ec46fc --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp2.1 + + false + + + + + + + + + + + + + + + + + diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs new file mode 100644 index 0000000..a4b6817 --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs @@ -0,0 +1,83 @@ +using MongoDB.Driver; +using DotNetCore.CAP.MongoDB; +using Xunit; +using System; +using DotNetCore.CAP.Models; +using FluentAssertions; +using DotNetCore.CAP.Dashboard.Monitoring; +using DotNetCore.CAP.Infrastructure; +using System.Linq; + +namespace DotNetCore.CAP.MongoDB.Test +{ + public class MongoDBMonitoringApiTest + { + private MongoClient _client; + private MongoDBOptions _options; + private MongoDBMonitoringApi _api; + + public MongoDBMonitoringApiTest() + { + _client = new MongoClient(ConnectionUtil.ConnectionString); + _options = new MongoDBOptions(); + _api = new MongoDBMonitoringApi(_client, _options); + + Init(); + } + + private void Init() + { + var helper = new MongoDBUtil(); + var database = _client.GetDatabase(_options.Database); + var collection = database.GetCollection(_options.PublishedCollection); + collection.InsertMany(new CapPublishedMessage[] + { + new CapPublishedMessage + { + Id = helper.GetNextSequenceValue(database,_options.PublishedCollection), + Added = DateTime.Now.AddHours(-1), + StatusName = "Failed", + Content = "abc" + }, + new CapPublishedMessage + { + Id = helper.GetNextSequenceValue(database,_options.PublishedCollection), + Added = DateTime.Now, + StatusName = "Failed", + Content = "bbc" + } + }); + } + + [Fact] + public void HourlyFailedJobs_Test() + { + var result = _api.HourlyFailedJobs(MessageType.Publish); + result.Should().HaveCount(24); + } + + [Fact] + public void Messages_Test() + { + var messages = + _api.Messages(new MessageQueryDto + { + MessageType = MessageType.Publish, + StatusName = StatusName.Failed, + Content = "b", + CurrentPage = 1, + PageSize = 1 + }); + + messages.Should().HaveCount(1); + messages.First().Content.Should().Contain("b"); + } + + [Fact] + public void PublishedFailedCount_Test() + { + var count = _api.PublishedFailedCount(); + count.Should().BeGreaterThan(1); + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs new file mode 100644 index 0000000..1c02736 --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs @@ -0,0 +1,67 @@ +using System.Threading; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using MongoDB.Driver; +using Xunit; +using Xunit.Priority; + +namespace DotNetCore.CAP.MongoDB.Test +{ + [TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)] + public class MongoDBStorageConnectionTest + { + private MongoClient _client; + private MongoDBOptions _options; + private MongoDBStorage _storage; + private IStorageConnection _connection; + + public MongoDBStorageConnectionTest() + { + _client = new MongoClient(ConnectionUtil.ConnectionString); + _options = new MongoDBOptions(); + _storage = new MongoDBStorage(new CapOptions(), _options, _client, NullLogger.Instance); + _connection = _storage.GetConnection(); + } + + [Fact, Priority(1)] + public async void StoreReceivedMessageAsync_TestAsync() + { + await _storage.InitializeAsync(default(CancellationToken)); + + var id = await + _connection.StoreReceivedMessageAsync(new CapReceivedMessage(new MessageContext + { + Group = "test", + Name = "test", + Content = "test-content" + })); + id.Should().BeGreaterThan(0); + } + + [Fact, Priority(2)] + public void ChangeReceivedState_Test() + { + var collection = _client.GetDatabase(_options.Database).GetCollection(_options.ReceivedCollection); + + var msg = collection.Find(x => true).FirstOrDefault(); + _connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue(); + collection.Find(x => x.Id == msg.Id).FirstOrDefault()?.StatusName.Should().Be(StatusName.Scheduled); + } + + [Fact, Priority(3)] + public async void GetReceivedMessagesOfNeedRetry_TestAsync() + { + var msgs = await _connection.GetReceivedMessagesOfNeedRetry(); + msgs.Should().HaveCountGreaterThan(0); + } + + [Fact, Priority(4)] + public void GetReceivedMessageAsync_Test() + { + var msg = _connection.GetReceivedMessageAsync(1); + msg.Should().NotBeNull(); + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs new file mode 100644 index 0000000..630956a --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs @@ -0,0 +1,38 @@ +using System.Threading; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using MongoDB.Bson; +using MongoDB.Driver; +using Xunit; + +namespace DotNetCore.CAP.MongoDB.Test +{ + public class MongoDBStorageTest + { + private MongoClient _client; + + public MongoDBStorageTest() + { + _client = new MongoClient(ConnectionUtil.ConnectionString); + } + + [Fact] + public async void InitializeAsync_Test() + { + var options = new MongoDBOptions(); + var storage = new MongoDBStorage(new CapOptions(), options, _client, NullLogger.Instance); + await storage.InitializeAsync(default(CancellationToken)); + var names = _client.ListDatabaseNames()?.ToList(); + names.Should().Contain(options.Database); + + var collections = _client.GetDatabase(options.Database).ListCollectionNames()?.ToList(); + collections.Should().Contain(options.PublishedCollection); + collections.Should().Contain(options.ReceivedCollection); + collections.Should().Contain("Counter"); + + var collection = _client.GetDatabase(options.Database).GetCollection("Counter"); + collection.CountDocuments(new BsonDocument { { "_id", options.PublishedCollection } }).Should().Be(1); + collection.CountDocuments(new BsonDocument { { "_id", options.ReceivedCollection } }).Should().Be(1); + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs new file mode 100644 index 0000000..1a91bba --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs @@ -0,0 +1,82 @@ +using System; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Driver; +using Xunit; + +namespace DotNetCore.CAP.MongoDB.Test +{ + public class MongoDBTest + { + private MongoClient _client; + + public MongoDBTest() + { + _client = new MongoClient(ConnectionUtil.ConnectionString); + } + + [Fact] + public void MongoDB_Connection_Test() + { + var names = _client.ListDatabaseNames(); + names.ToList().Should().NotBeNullOrEmpty(); + } + + [Fact] + public void Transaction_Test() + { + var document = new BsonDocument + { + { "name", "MongoDB" }, + { "type", "Database" }, + { "count", 1 }, + { "info", new BsonDocument + { + { "x", 203 }, + { "y", 102 } + }} + }; + var db = _client.GetDatabase("test"); + var collection1 = db.GetCollection("test1"); + var collection2 = db.GetCollection("test2"); + using (var sesstion = _client.StartSession()) + { + sesstion.StartTransaction(); + collection1.InsertOne(document); + collection2.InsertOne(document); + sesstion.CommitTransaction(); + } + var filter = new BsonDocument("name", "MongoDB"); + collection1.CountDocuments(filter).Should().BeGreaterThan(0); + collection2.CountDocuments(filter).Should().BeGreaterThan(0); + } + + [Fact] + public void Transaction_Rollback_Test() + { + var document = new BsonDocument + { + {"name", "MongoDB"}, + {"date", DateTimeOffset.Now.ToString()} + }; + var db = _client.GetDatabase("test"); + + var collection = db.GetCollection("test3"); + var collection4 = db.GetCollection("test4"); + + using (var session = _client.StartSession()) + { + session.IsInTransaction.Should().BeFalse(); + session.StartTransaction(); + session.IsInTransaction.Should().BeTrue(); + collection.InsertOne(session, document); + collection4.InsertOne(session, new BsonDocument { { "name", "MongoDB" } }); + + session.AbortTransaction(); + } + var filter = new BsonDocument("name", "MongoDB"); + collection.CountDocuments(filter).Should().Be(0); + collection4.CountDocuments(filter).Should().Be(0); + } + } +} diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs new file mode 100644 index 0000000..49e221b --- /dev/null +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs @@ -0,0 +1,50 @@ +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP.Models; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Driver; +using Xunit; + +namespace DotNetCore.CAP.MongoDB.Test +{ + public class MongoDBUtilTest + { + private readonly MongoClient _client; + private readonly IMongoDatabase _database; + string _recieved = "ReceivedTest"; + + public MongoDBUtilTest() + { + _client = new MongoClient(ConnectionUtil.ConnectionString); + _database = _client.GetDatabase("CAP_Test"); + + //Initialize MongoDB + if (!_database.ListCollectionNames().ToList().Any(x => x == "Counter")) + { + var collection = _database.GetCollection("Counter"); + collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } }); + } + } + + [Fact] + public async void GetNextSequenceValueAsync_Test() + { + var id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _recieved); + id.Should().BeGreaterThan(0); + } + + [Fact] + public void GetNextSequenceValue_Concurrency_Test() + { + var dic = new ConcurrentDictionary(); + Parallel.For(0, 30, (x) => + { + var id = new MongoDBUtil().GetNextSequenceValue(_database, _recieved); + id.Should().BeGreaterThan(0); + dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same. + }); + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs index bc7ad66..e32ff4a 100644 --- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs +++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs @@ -15,13 +15,13 @@ namespace DotNetCore.CAP.Test public void CanCreateInstanceAndGetService() { var services = new ServiceCollection(); - + services.AddSingleton(); var builder = new CapBuilder(services); Assert.NotNull(builder); var count = builder.Services.Count; - Assert.Equal(1, count); + Assert.Equal(1, count); var provider = services.BuildServiceProvider(); var capPublisher = provider.GetService(); @@ -129,6 +129,11 @@ namespace DotNetCore.CAP.Test throw new NotImplementedException(); } + public void Publish(string name, T contentObj, object mongoSession, string callbackName = null) + { + throw new NotImplementedException(); + } + public Task PublishAsync(string topic, string content) { throw new NotImplementedException(); @@ -163,6 +168,16 @@ namespace DotNetCore.CAP.Test { throw new NotImplementedException(); } + + public void PublishWithMongo(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + throw new NotImplementedException(); + } + + public Task PublishWithMongoAsync(string name, T contentObj, object mongoSession = null, string callbackName = null) + { + throw new NotImplementedException(); + } } } } \ No newline at end of file