diff --git a/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs index 5e0a07d..bf202ad 100644 --- a/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs +++ b/samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs @@ -1,8 +1,5 @@ using System; -using System.Threading.Tasks; using DotNetCore.CAP; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.MongoDB; using Microsoft.AspNetCore.Mvc; using MongoDB.Bson; using MongoDB.Driver; @@ -15,71 +12,48 @@ namespace Sample.RabbitMQ.MongoDB.Controllers { private readonly IMongoClient _client; private readonly ICapPublisher _capPublisher; - private readonly IMongoTransaction _mongoTransaction; - public ValuesController(IMongoClient client, ICapPublisher capPublisher, IMongoTransaction mongoTransaction) + public ValuesController(IMongoClient client, ICapPublisher capPublisher) { _client = client; _capPublisher = capPublisher; - _mongoTransaction = mongoTransaction; } [Route("~/publish")] - public async Task PublishWithTrans() + public IActionResult PublishWithTrans() { - using (var trans = await _mongoTransaction.BegeinAsync()) + using (var session = _client.StartSession()) + using (var trans = _capPublisher.CapTransaction.Begin(session)) { var collection = _client.GetDatabase("TEST").GetCollection("test"); - collection.InsertOne(trans.GetSession(), new BsonDocument { { "hello", "world" } }); + collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); - await _capPublisher.PublishWithMongoAsync("sample.rabbitmq.mongodb", DateTime.Now, trans); + _capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); + + trans.Commit(); } return Ok(); } - [Route("~/publish/not/autocommit")] + [Route("~/publish/autocommit")] public IActionResult PublishNotAutoCommit() { - using (var trans = _mongoTransaction.Begein(autoCommit: false)) + using (var session = _client.StartSession()) + using (_capPublisher.CapTransaction.Begin(session,true)) { - var session = trans.GetSession(); - var collection = _client.GetDatabase("TEST").GetCollection("test"); - collection.InsertOne(session, new BsonDocument { { "Hello", "World" } }); - - _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans); + collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); - //Do something, and commit by yourself. - session.CommitTransaction(); + _capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); } - return Ok(); - } - [Route("~/publish/rollback")] - public IActionResult PublishRollback() - { - using (var trans = _mongoTransaction.Begein(autoCommit: false)) - { - var session = trans.GetSession(); - try - { - _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans); - //Do something, but - throw new Exception("Foo"); - session.CommitTransaction(); - } - catch (System.Exception ex) - { - session.AbortTransaction(); - return StatusCode(500, ex.Message); - } - } + return Ok(); } - + [Route("~/publish/without/trans")] public IActionResult PublishWithoutTrans() { - _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now); + _capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); return Ok(); } diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs index ab1158d..e53752d 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Processor; using Microsoft.Extensions.DependencyInjection; @@ -28,7 +27,6 @@ namespace DotNetCore.CAP.MongoDB services.AddScoped(); services.AddTransient(); - services.AddTransient(); var options = new MongoDBOptions(); _configure?.Invoke(options); diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs index fe00791..0cb350e 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs @@ -29,7 +29,5 @@ namespace DotNetCore.CAP.MongoDB /// Default value: "published" /// public string PublishedCollection { get; set; } = "cap.published"; - - internal const string CounterCollection = "cap.counter"; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CapPublisher.cs b/src/DotNetCore.CAP.MongoDB/CapPublisher.cs index df19c48..a02d3af 100644 --- a/src/DotNetCore.CAP.MongoDB/CapPublisher.cs +++ b/src/DotNetCore.CAP.MongoDB/CapPublisher.cs @@ -2,13 +2,11 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Data; +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Diagnostics; -using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; using MongoDB.Driver; namespace DotNetCore.CAP.MongoDB @@ -16,187 +14,37 @@ namespace DotNetCore.CAP.MongoDB public class CapPublisher : CapPublisherBase, ICallbackPublisher { private readonly MongoDBOptions _options; - private readonly IMongoDatabase _database; - private bool _usingTransaction = true; - public CapPublisher( - ILogger logger, - IDispatcher dispatcher, - IMongoClient client, - MongoDBOptions options, - IServiceProvider provider) - : base(logger, dispatcher) + public CapPublisher(IServiceProvider provider, MongoDBOptions options) + : base(provider) { _options = options; - _database = client.GetDatabase(_options.DatabaseName); - ServiceProvider = provider; } public async Task PublishCallbackAsync(CapPublishedMessage message) { - var collection = _database.GetCollection(_options.PublishedCollection); - message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection); - collection.InsertOne(message); - Enqueue(message); + await PublishAsyncInternal(message); } - protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) + protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, + CancellationToken cancel = default(CancellationToken)) { - throw new NotImplementedException("Not work for MongoDB"); - } - - protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) - { - throw new NotImplementedException("Not work for MongoDB"); - } - - protected override void PrepareConnectionForEF() - { - throw new NotImplementedException("Not work for MongoDB"); - } - - public override void PublishWithMongo(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - if (mongoTransaction == null) - { - _usingTransaction = false; - mongoTransaction = new NullMongoTransaction(); - } - - PublishWithTransaction(name, contentObj, mongoTransaction, callbackName); - } + var dbTrans = (IClientSessionHandle)transaction.DbTransaction; - public override async Task PublishWithMongoAsync(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - if (mongoTransaction == null) - { - _usingTransaction = false; - mongoTransaction = new NullMongoTransaction(); - } + var collection = dbTrans.Client + .GetDatabase(_options.DatabaseName) + .GetCollection(_options.PublishedCollection); - await PublishWithTransactionAsync(name, contentObj, mongoTransaction, callbackName); + var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; + return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel); } - private void PublishWithTransaction(string name, T contentObj, IMongoTransaction transaction, string callbackName) + protected override object GetDbTransaction() { - var operationId = default(Guid); - - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - - var session = transaction.GetSession(); - - try - { - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - var id = Execute(session, message); - - if (transaction.AutoCommit) - { - session.CommitTransaction(); - } - - if (!_usingTransaction || (transaction.AutoCommit && id > 0)) - { - _logger.LogInformation($"message [{message}] has been persisted in the database."); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - message.Id = id; - Enqueue(message); - } - } - catch (Exception e) - { - _logger.LogError(e, "An exception was occurred when publish message. message:" + name); - s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - throw; - } - } - - private int Execute(IClientSessionHandle session, CapPublishedMessage message) - { - message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); - - var collection = _database.GetCollection(_options.PublishedCollection); - if (_usingTransaction) - { - collection.InsertOne(session, message); - } - else - { - collection.InsertOne(message); - } - - return message.Id; - } - - - private async Task PublishWithTransactionAsync(string name, T contentObj, IMongoTransaction transaction, string callbackName) - { - var operationId = default(Guid); - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - - var session = transaction.GetSession(); - - try - { - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - - var id = await ExecuteAsync(session, message); - - if (transaction.AutoCommit) - { - await session.CommitTransactionAsync(); - } - - if (!_usingTransaction || (transaction.AutoCommit && id > 0)) - { - _logger.LogInformation($"message [{message}] has been persisted in the database."); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - - message.Id = id; - - Enqueue(message); - } - } - catch (Exception e) - { - _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); - s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - Console.WriteLine(e); - throw; - } - } - - private async Task ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) - { - message.Id = - await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); - var collection = _database.GetCollection(_options.PublishedCollection); - if (_usingTransaction) - { - await collection.InsertOneAsync(session, message); - } - else - { - await collection.InsertOneAsync(message); - } - - return message.Id; + var client = ServiceProvider.GetRequiredService(); + var session = client.StartSession(new ClientSessionOptions()); + session.StartTransaction(); + return session; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs new file mode 100644 index 0000000..e0a0054 --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs @@ -0,0 +1,54 @@ +using System.Data; +using System.Diagnostics; +using MongoDB.Driver; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class MongoDBCapTransaction : CapTransactionBase + { + public MongoDBCapTransaction(IDispatcher dispatcher) + : base(dispatcher) + { + } + + public override void Commit() + { + Debug.Assert(DbTransaction != null); + + if (DbTransaction is IClientSessionHandle session) + { + session.CommitTransaction(); + } + + Flush(); + } + + public override void Rollback() + { + Debug.Assert(DbTransaction != null); + + if (DbTransaction is IClientSessionHandle session) + { + session.AbortTransaction(); + } + } + + public override void Dispose() + { + (DbTransaction as IDbTransaction)?.Dispose(); + } + } + + public static class CapTransactionExtensions + { + public static ICapTransaction Begin(this ICapTransaction transaction, + IClientSessionHandle dbTransaction, bool autoCommit = false) + { + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + } +} diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs b/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs similarity index 100% rename from src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs rename to src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs b/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs similarity index 100% rename from src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs rename to src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs b/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs index 24fa082..805240c 100644 --- a/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs +++ b/src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs @@ -6,7 +6,6 @@ using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Dashboard; using Microsoft.Extensions.Logging; -using MongoDB.Bson; using MongoDB.Driver; namespace DotNetCore.CAP.MongoDB @@ -49,26 +48,14 @@ namespace DotNetCore.CAP.MongoDB var database = _client.GetDatabase(_options.DatabaseName); var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList(); - if (!names.Any(n => n == _options.ReceivedCollection)) + if (names.All(n => n != _options.ReceivedCollection)) { await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken); } if (names.All(n => n != _options.PublishedCollection)) { - await database.CreateCollectionAsync(_options.PublishedCollection, - cancellationToken: cancellationToken); - } - - if (names.All(n => n != MongoDBOptions.CounterCollection)) - { - await database.CreateCollectionAsync(MongoDBOptions.CounterCollection, cancellationToken: cancellationToken); - var collection = database.GetCollection(MongoDBOptions.CounterCollection); - await collection.InsertManyAsync(new[] - { - new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, - new BsonDocument {{"_id", _options.ReceivedCollection}, {"sequence_value", 0}} - }, cancellationToken: cancellationToken); + await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken); } _logger.LogDebug("Ensuring all create database tables script are applied."); diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs b/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs index 098ddf9..7b91026 100644 --- a/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs +++ b/src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs @@ -25,7 +25,7 @@ namespace DotNetCore.CAP.MongoDB _database = _client.GetDatabase(_options.DatabaseName); } - public bool ChangePublishedState(int messageId, string state) + public bool ChangePublishedState(long messageId, string state) { var collection = _database.GetCollection(_options.PublishedCollection); @@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MongoDB return result.ModifiedCount > 0; } - public bool ChangeReceivedState(int messageId, string state) + public bool ChangeReceivedState(long messageId, string state) { var collection = _database.GetCollection(_options.ReceivedCollection); @@ -60,7 +60,7 @@ namespace DotNetCore.CAP.MongoDB return new MongoDBStorageTransaction(_client, _options); } - public async Task GetPublishedMessageAsync(int id) + public async Task GetPublishedMessageAsync(long id) { var collection = _database.GetCollection(_options.PublishedCollection); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); @@ -77,7 +77,7 @@ namespace DotNetCore.CAP.MongoDB .ToListAsync(); } - public async Task GetReceivedMessageAsync(int id) + public async Task GetReceivedMessageAsync(long id) { var collection = _database.GetCollection(_options.ReceivedCollection); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); @@ -95,7 +95,7 @@ namespace DotNetCore.CAP.MongoDB .ToListAsync(); } - public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + public void StoreReceivedMessage(CapReceivedMessage message) { if (message == null) { @@ -104,11 +104,7 @@ namespace DotNetCore.CAP.MongoDB var collection = _database.GetCollection(_options.ReceivedCollection); - message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection); - collection.InsertOne(message); - - return message.Id; } public void Dispose() diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs b/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs deleted file mode 100644 index 4af9989..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using MongoDB.Bson; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - internal class MongoDBUtil - { - private readonly FindOneAndUpdateOptions _options = new FindOneAndUpdateOptions - { - ReturnDocument = ReturnDocument.After - }; - - public async Task GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, - IClientSessionHandle session = null) - { - //https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm - var collection = database.GetCollection(MongoDBOptions.CounterCollection); - - var updateDef = Builders.Update.Inc("sequence_value", 1); - var filter = new BsonDocument { { "_id", collectionName } }; - - BsonDocument result; - if (session == null) - { - result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options); - - } - else - { - result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options); - } - - if (result.TryGetValue("sequence_value", out var value)) - { - return value.ToInt32(); - } - - throw new Exception("Unable to get next sequence value."); - } - - public int GetNextSequenceValue(IMongoDatabase database, string collectionName, - IClientSessionHandle session = null) - { - var collection = database.GetCollection(MongoDBOptions.CounterCollection); - - var filter = new BsonDocument { { "_id", collectionName } }; - var updateDef = Builders.Update.Inc("sequence_value", 1); - - var result = session == null - ? collection.FindOneAndUpdate(filter, updateDef, _options) - : collection.FindOneAndUpdate(session, filter, updateDef, _options); - - if (result.TryGetValue("sequence_value", out var value)) - { - return value.ToInt32(); - } - - throw new Exception("Unable to get next sequence value."); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs b/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs deleted file mode 100644 index 96aed37..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoTransaction : IMongoTransaction - { - private readonly IMongoClient _client; - - public MongoTransaction(IMongoClient client) - { - _client = client; - } - - public IClientSessionHandle Session { get; set; } - public bool AutoCommit { get; set; } - - public async Task BegeinAsync(bool autoCommit = true) - { - AutoCommit = autoCommit; - Session = await _client.StartSessionAsync(); - Session.StartTransaction(); - return this; - } - - public IMongoTransaction Begein(bool autoCommit = true) - { - AutoCommit = autoCommit; - Session = _client.StartSession(); - Session.StartTransaction(); - return this; - } - - public void Dispose() - { - Session?.Dispose(); - } - } - - public class NullMongoTransaction : MongoTransaction - { - public NullMongoTransaction(IMongoClient client = null) : base(client) - { - AutoCommit = false; - } - } - - public static class MongoTransactionExtensions - { - public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction) - { - var trans = mongoTransaction as MongoTransaction; - return trans?.Session; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs index 8cae0be..7ba01c5 100644 --- a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs @@ -24,8 +24,10 @@ namespace DotNetCore.CAP services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddScoped(); services.AddScoped(); + services.AddTransient(); services.AddTransient(); diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs index bcac400..e27650d 100644 --- a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +// ReSharper disable once CheckNamespace namespace DotNetCore.CAP { public class MySqlOptions : EFOptions diff --git a/src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs similarity index 100% rename from src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs rename to src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs diff --git a/src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs b/src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs deleted file mode 100644 index b98caf8..0000000 --- a/src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; - -namespace DotNetCore.CAP.Abstractions -{ - public interface IMongoTransaction : IDisposable - { - /// - /// If set true, the session.CommitTransaction() will be called automatically. - /// - /// - bool AutoCommit { get; set; } - - Task BegeinAsync(bool autoCommit = true); - - IMongoTransaction Begein(bool autoCommit = true); - } -} \ No newline at end of file