Browse Source

Abstract MongoTransaction to make the usage more comfortable (#167)

* Abstract MongoTransaction to make the usage more comfortable

* Tweak the method name
master
keke 6 years ago
committed by Savorboard
parent
commit
885b37ba72
8 changed files with 154 additions and 41 deletions
  1. +33
    -12
      samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs
  2. +4
    -0
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
  3. +33
    -21
      src/DotNetCore.CAP.MongoDB/CapPublisher.cs
  4. +57
    -0
      src/DotNetCore.CAP.MongoDB/MongoTransaction.cs
  5. +2
    -2
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  6. +18
    -0
      src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs
  7. +5
    -4
      src/DotNetCore.CAP/ICapPublisher.cs
  8. +2
    -2
      test/DotNetCore.CAP.Test/CAP.BuilderTest.cs

+ 33
- 12
samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs View File

@@ -3,6 +3,8 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.MongoDB;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using MongoDB.Bson; using MongoDB.Bson;
using MongoDB.Driver; using MongoDB.Driver;
@@ -15,39 +17,58 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
{ {
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly ICapPublisher _capPublisher; private readonly ICapPublisher _capPublisher;
private readonly IMongoTransaction _mongoTransaction;


public ValuesController(IMongoClient client, ICapPublisher capPublisher)
public ValuesController(IMongoClient client, ICapPublisher capPublisher, IMongoTransaction mongoTransaction)
{ {
_client = client; _client = client;
_capPublisher = capPublisher; _capPublisher = capPublisher;
_mongoTransaction = mongoTransaction;
} }


[Route("~/publish")] [Route("~/publish")]
public IActionResult PublishWithSession()
public async Task<IActionResult> PublishWithTrans()
{ {
using (var session = _client.StartSession())
using (var trans = await _mongoTransaction.BegeinAsync())
{ {
session.StartTransaction();
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test"); var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
collection.InsertOne(trans.GetSession(), new BsonDocument { { "hello", "world" } });


_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);
await _capPublisher.PublishWithMongoAsync("sample.rabbitmq.mongodb", DateTime.Now, trans);
}
return Ok();
}

[Route("~/publish/not/autocommit")]
public IActionResult PublishNotAutoCommit()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{
var session = trans.GetSession();

var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "Hello", "World" } });

_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);


//Do something, and commit by yourself.
session.CommitTransaction(); session.CommitTransaction();
} }
return Ok(); return Ok();
} }


[Route("~/publish_rollback")]
[Route("~/publish/rollback")]
public IActionResult PublishRollback() public IActionResult PublishRollback()
{ {
using (var session = _client.StartSession())
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{ {
var session = trans.GetSession();
try try
{ {
session.StartTransaction();
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, session);
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
//Do something, but
throw new Exception("Foo"); throw new Exception("Foo");
session.CommitTransaction();
} }
catch (System.Exception ex) catch (System.Exception ex)
{ {
@@ -57,8 +78,8 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
} }
} }


[Route("~/publish_without_session")]
public IActionResult PublishWithoutSession()
[Route("~/publish/without/trans")]
public IActionResult PublishWithoutTrans()
{ {
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now); _capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now);
return Ok(); return Ok();


+ 4
- 0
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs View File

@@ -2,6 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;


@@ -25,6 +27,8 @@ namespace DotNetCore.CAP.MongoDB
services.AddScoped<ICallbackPublisher, CapPublisher>(); services.AddScoped<ICallbackPublisher, CapPublisher>();
services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>(); services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();


services.AddTransient<IMongoTransaction, MongoTransaction>();

var options = new MongoDBOptions(); var options = new MongoDBOptions();
_configure?.Invoke(options); _configure?.Invoke(options);
services.AddSingleton(options); services.AddSingleton(options);


+ 33
- 21
src/DotNetCore.CAP.MongoDB/CapPublisher.cs View File

@@ -15,9 +15,9 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class CapPublisher : CapPublisherBase, ICallbackPublisher public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options; private readonly MongoDBOptions _options;
private bool _isInTransaction = true;
private readonly IMongoDatabase _database;
private bool _usingTransaction = true;


public CapPublisher( public CapPublisher(
ILogger<CapPublisherBase> logger, ILogger<CapPublisherBase> logger,
@@ -57,31 +57,29 @@ namespace DotNetCore.CAP.MongoDB
throw new NotImplementedException("Not work for MongoDB"); throw new NotImplementedException("Not work for MongoDB");
} }


public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
public override void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
var session = mongoSession as IClientSessionHandle;
if (session == null)
if (mongoTransaction == null)
{ {
_isInTransaction = false;
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
} }


PublishWithSession(name, contentObj, session, callbackName);
PublishWithTransaction<T>(name, contentObj, mongoTransaction, callbackName);
} }


public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
var session = mongoSession as IClientSessionHandle;
if (session == null)
if (mongoTransaction == null)
{ {
_isInTransaction = false;
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
} }


await PublishWithSessionAsync(name, contentObj, session, callbackName);
await PublishWithTransactionAsync<T>(name, contentObj, mongoTransaction, callbackName);
} }


private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
private void PublishWithTransaction<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
{ {
var operationId = default(Guid); var operationId = default(Guid);


@@ -94,12 +92,19 @@ namespace DotNetCore.CAP.MongoDB
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };


var session = transaction.GetSession();

try try
{ {
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(session, message); var id = Execute(session, message);


if (!_isInTransaction && id > 0)
if (transaction.AutoCommit)
{
session.CommitTransaction();
}

if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database."); _logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
@@ -120,7 +125,7 @@ namespace DotNetCore.CAP.MongoDB
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session);


var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
if (_usingTransaction)
{ {
collection.InsertOne(session, message); collection.InsertOne(session, message);
} }
@@ -132,8 +137,8 @@ namespace DotNetCore.CAP.MongoDB
return message.Id; return message.Id;
} }


private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session,
string callbackName)
private async Task PublishWithTransactionAsync<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
{ {
var operationId = default(Guid); var operationId = default(Guid);
var content = Serialize(contentObj, callbackName); var content = Serialize(contentObj, callbackName);
@@ -145,13 +150,20 @@ namespace DotNetCore.CAP.MongoDB
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };


var session = transaction.GetSession();

try try
{ {
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);


var id = await ExecuteAsync(session, message); var id = await ExecuteAsync(session, message);


if (!_isInTransaction && id > 0)
if (transaction.AutoCommit)
{
await session.CommitTransactionAsync();
}

if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database."); _logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
@@ -175,7 +187,7 @@ namespace DotNetCore.CAP.MongoDB
message.Id = message.Id =
await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction)
if (_usingTransaction)
{ {
await collection.InsertOneAsync(session, message); await collection.InsertOneAsync(session, message);
} }


+ 57
- 0
src/DotNetCore.CAP.MongoDB/MongoTransaction.cs View File

@@ -0,0 +1,57 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoTransaction : IMongoTransaction
{
private readonly IMongoClient _client;
public MongoTransaction(IMongoClient client)
{
_client = client;
}

public IClientSessionHandle Session { get; set; }
public bool AutoCommit { get; set; }

public async Task<IMongoTransaction> BegeinAsync(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = await _client.StartSessionAsync();
Session.StartTransaction();
return this;
}

public IMongoTransaction Begein(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = _client.StartSession();
Session.StartTransaction();
return this;
}

public void Dispose()
{
Session?.Dispose();
}
}

public class NullMongoTransaction : MongoTransaction
{
public NullMongoTransaction(IMongoClient client = null) : base(client)
{
AutoCommit = false;
}
}

public static class MongoTransactionExtensions
{
public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction)
{
var trans = mongoTransaction as MongoTransaction;
return trans?.Session;
}
}
}

+ 2
- 2
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -67,12 +67,12 @@ namespace DotNetCore.CAP.Abstractions
return PublishWithTransAsync(name, contentObj, callbackName); return PublishWithTransAsync(name, contentObj, callbackName);
} }


public virtual void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public virtual void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
throw new NotImplementedException("Work for MongoDB only."); throw new NotImplementedException("Work for MongoDB only.");
} }


public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
throw new NotImplementedException("Work for MongoDB only."); throw new NotImplementedException("Work for MongoDB only.");
} }


+ 18
- 0
src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Abstractions
{
public interface IMongoTransaction : IDisposable
{
/// <summary>
/// If set true, the session.CommitTransaction() will be called automatically.
/// </summary>
/// <value></value>
bool AutoCommit { get; set; }

Task<IMongoTransaction> BegeinAsync(bool autoCommit = true);
IMongoTransaction Begein(bool autoCommit = true);
}
}

+ 5
- 4
src/DotNetCore.CAP/ICapPublisher.cs View File

@@ -3,6 +3,7 @@


using System.Data; using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;


namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
@@ -60,17 +61,17 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);
void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);


/// <summary> /// <summary>
/// Asynchronous publish an object message with mongo. /// Asynchronous publish an object message with mongo.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="mongoSession">if seesion was set null, the message will be published directly.</param>
/// <param name="mongoTransaction">if transaction was set null, the message will be published directly.</param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null);
Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null);
} }
} }

+ 2
- 2
test/DotNetCore.CAP.Test/CAP.BuilderTest.cs View File

@@ -169,12 +169,12 @@ namespace DotNetCore.CAP.Test
throw new NotImplementedException(); throw new NotImplementedException();
} }


public void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


Loading…
Cancel
Save