Przeglądaj źródła

Refactor MongoDB for new transaction

master
Savorboard 6 lat temu
rodzic
commit
20717b5817
15 zmienionych plików z 98 dodań i 387 usunięć
  1. +16
    -42
      samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs
  2. +0
    -2
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
  3. +0
    -2
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs
  4. +18
    -170
      src/DotNetCore.CAP.MongoDB/CapPublisher.cs
  5. +54
    -0
      src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
  6. +0
    -0
      src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs
  7. +0
    -0
      src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs
  8. +2
    -15
      src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
  9. +5
    -9
      src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
  10. +0
    -66
      src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs
  11. +0
    -60
      src/DotNetCore.CAP.MongoDB/MongoTransaction.cs
  12. +2
    -0
      src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
  13. +1
    -0
      src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
  14. +0
    -0
      src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
  15. +0
    -21
      src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs

+ 16
- 42
samples/Sample.RabbitMQ.MongoDB/Controllers/ValuesController.cs Wyświetl plik

@@ -1,8 +1,5 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.MongoDB;
using Microsoft.AspNetCore.Mvc;
using MongoDB.Bson;
using MongoDB.Driver;
@@ -15,71 +12,48 @@ namespace Sample.RabbitMQ.MongoDB.Controllers
{
private readonly IMongoClient _client;
private readonly ICapPublisher _capPublisher;
private readonly IMongoTransaction _mongoTransaction;

public ValuesController(IMongoClient client, ICapPublisher capPublisher, IMongoTransaction mongoTransaction)
public ValuesController(IMongoClient client, ICapPublisher capPublisher)
{
_client = client;
_capPublisher = capPublisher;
_mongoTransaction = mongoTransaction;
}

[Route("~/publish")]
public async Task<IActionResult> PublishWithTrans()
public IActionResult PublishWithTrans()
{
using (var trans = await _mongoTransaction.BegeinAsync())
using (var session = _client.StartSession())
using (var trans = _capPublisher.CapTransaction.Begin(session))
{
var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(trans.GetSession(), new BsonDocument { { "hello", "world" } });
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });

await _capPublisher.PublishWithMongoAsync("sample.rabbitmq.mongodb", DateTime.Now, trans);
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now);

trans.Commit();
}
return Ok();
}

[Route("~/publish/not/autocommit")]
[Route("~/publish/autocommit")]
public IActionResult PublishNotAutoCommit()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
using (var session = _client.StartSession())
using (_capPublisher.CapTransaction.Begin(session,true))
{
var session = trans.GetSession();

var collection = _client.GetDatabase("TEST").GetCollection<BsonDocument>("test");
collection.InsertOne(session, new BsonDocument { { "Hello", "World" } });

_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
collection.InsertOne(session, new BsonDocument { { "hello", "world" } });

//Do something, and commit by yourself.
session.CommitTransaction();
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now);
}
return Ok();
}

[Route("~/publish/rollback")]
public IActionResult PublishRollback()
{
using (var trans = _mongoTransaction.Begein(autoCommit: false))
{
var session = trans.GetSession();
try
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now, trans);
//Do something, but
throw new Exception("Foo");
session.CommitTransaction();
}
catch (System.Exception ex)
{
session.AbortTransaction();
return StatusCode(500, ex.Message);
}
}
return Ok();
}
[Route("~/publish/without/trans")]
public IActionResult PublishWithoutTrans()
{
_capPublisher.PublishWithMongo("sample.rabbitmq.mongodb", DateTime.Now);
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now);
return Ok();
}



+ 0
- 2
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs Wyświetl plik

@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;

@@ -28,7 +27,6 @@ namespace DotNetCore.CAP.MongoDB
services.AddScoped<ICallbackPublisher, CapPublisher>();

services.AddTransient<ICollectProcessor, MongoDBCollectProcessor>();
services.AddTransient<IMongoTransaction, MongoTransaction>();

var options = new MongoDBOptions();
_configure?.Invoke(options);


+ 0
- 2
src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs Wyświetl plik

@@ -29,7 +29,5 @@ namespace DotNetCore.CAP.MongoDB
/// Default value: "published"
/// </summary>
public string PublishedCollection { get; set; } = "cap.published";

internal const string CounterCollection = "cap.counter";
}
}

+ 18
- 170
src/DotNetCore.CAP.MongoDB/CapPublisher.cs Wyświetl plik

@@ -2,13 +2,11 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
@@ -16,187 +14,37 @@ namespace DotNetCore.CAP.MongoDB
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;
private bool _usingTransaction = true;

public CapPublisher(
ILogger<CapPublisherBase> logger,
IDispatcher dispatcher,
IMongoClient client,
MongoDBOptions options,
IServiceProvider provider)
: base(logger, dispatcher)
public CapPublisher(IServiceProvider provider, MongoDBOptions options)
: base(provider)
{
_options = options;
_database = client.GetDatabase(_options.DatabaseName);
ServiceProvider = provider;
}

public async Task PublishCallbackAsync(CapPublishedMessage message)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection);
collection.InsertOne(message);
Enqueue(message);
await PublishAsyncInternal(message);
}

protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException("Not work for MongoDB");
}

protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
throw new NotImplementedException("Not work for MongoDB");
}

protected override void PrepareConnectionForEF()
{
throw new NotImplementedException("Not work for MongoDB");
}

public override void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
if (mongoTransaction == null)
{
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
}

PublishWithTransaction<T>(name, contentObj, mongoTransaction, callbackName);
}
var dbTrans = (IClientSessionHandle)transaction.DbTransaction;

public override async Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
if (mongoTransaction == null)
{
_usingTransaction = false;
mongoTransaction = new NullMongoTransaction();
}
var collection = dbTrans.Client
.GetDatabase(_options.DatabaseName)
.GetCollection<CapPublishedMessage>(_options.PublishedCollection);

await PublishWithTransactionAsync<T>(name, contentObj, mongoTransaction, callbackName);
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel);
}

private void PublishWithTransaction<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
protected override object GetDbTransaction()
{
var operationId = default(Guid);

var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

var session = transaction.GetSession();

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(session, message);

if (transaction.AutoCommit)
{
session.CommitTransaction();
}

if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id;
Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
throw;
}
}

private int Execute(IClientSessionHandle session, CapPublishedMessage message)
{
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session);

var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_usingTransaction)
{
collection.InsertOne(session, message);
}
else
{
collection.InsertOne(message);
}

return message.Id;
}


private async Task PublishWithTransactionAsync<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName)
{
var operationId = default(Guid);
var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

var session = transaction.GetSession();

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = await ExecuteAsync(session, message);

if (transaction.AutoCommit)
{
await session.CommitTransactionAsync();
}

if (!_usingTransaction || (transaction.AutoCommit && id > 0))
{
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);

message.Id = id;

Enqueue(message);
}
}
catch (Exception e)
{
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
}

private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message)
{
message.Id =
await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_usingTransaction)
{
await collection.InsertOneAsync(session, message);
}
else
{
await collection.InsertOneAsync(message);
}

return message.Id;
var client = ServiceProvider.GetRequiredService<IMongoClient>();
var session = client.StartSession(new ClientSessionOptions());
session.StartTransaction();
return session;
}
}
}

+ 54
- 0
src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs Wyświetl plik

@@ -0,0 +1,54 @@
using System.Data;
using System.Diagnostics;
using MongoDB.Driver;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MongoDBCapTransaction : CapTransactionBase
{
public MongoDBCapTransaction(IDispatcher dispatcher)
: base(dispatcher)
{
}

public override void Commit()
{
Debug.Assert(DbTransaction != null);

if (DbTransaction is IClientSessionHandle session)
{
session.CommitTransaction();
}

Flush();
}

public override void Rollback()
{
Debug.Assert(DbTransaction != null);

if (DbTransaction is IClientSessionHandle session)
{
session.AbortTransaction();
}
}

public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
}
}

public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IClientSessionHandle dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
}
}
}

src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs → src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs Wyświetl plik


src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs → src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs Wyświetl plik


+ 2
- 15
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs Wyświetl plik

@@ -6,7 +6,6 @@ using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
@@ -49,26 +48,14 @@ namespace DotNetCore.CAP.MongoDB
var database = _client.GetDatabase(_options.DatabaseName);
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();

if (!names.Any(n => n == _options.ReceivedCollection))
if (names.All(n => n != _options.ReceivedCollection))
{
await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
}

if (names.All(n => n != _options.PublishedCollection))
{
await database.CreateCollectionAsync(_options.PublishedCollection,
cancellationToken: cancellationToken);
}

if (names.All(n => n != MongoDBOptions.CounterCollection))
{
await database.CreateCollectionAsync(MongoDBOptions.CounterCollection, cancellationToken: cancellationToken);
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);
await collection.InsertManyAsync(new[]
{
new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument {{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
}, cancellationToken: cancellationToken);
await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken);
}

_logger.LogDebug("Ensuring all create database tables script are applied.");


+ 5
- 9
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs Wyświetl plik

@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.MongoDB
_database = _client.GetDatabase(_options.DatabaseName);
}

public bool ChangePublishedState(int messageId, string state)
public bool ChangePublishedState(long messageId, string state)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);

@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MongoDB
return result.ModifiedCount > 0;
}

public bool ChangeReceivedState(int messageId, string state)
public bool ChangeReceivedState(long messageId, string state)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

@@ -60,7 +60,7 @@ namespace DotNetCore.CAP.MongoDB
return new MongoDBStorageTransaction(_client, _options);
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.MongoDB
.ToListAsync();
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);
return await collection.Find(x => x.Id == id).FirstOrDefaultAsync();
@@ -95,7 +95,7 @@ namespace DotNetCore.CAP.MongoDB
.ToListAsync();
}

public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
@@ -104,11 +104,7 @@ namespace DotNetCore.CAP.MongoDB

var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection);

collection.InsertOne(message);

return message.Id;
}

public void Dispose()


+ 0
- 66
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs Wyświetl plik

@@ -1,66 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
internal class MongoDBUtil
{
private readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>
{
ReturnDocument = ReturnDocument.After
};

public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName,
IClientSessionHandle session = null)
{
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);

var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);
var filter = new BsonDocument { { "_id", collectionName } };

BsonDocument result;
if (session == null)
{
result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options);
}
else
{
result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options);
}

if (result.TryGetValue("sequence_value", out var value))
{
return value.ToInt32();
}

throw new Exception("Unable to get next sequence value.");
}

public int GetNextSequenceValue(IMongoDatabase database, string collectionName,
IClientSessionHandle session = null)
{
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection);

var filter = new BsonDocument { { "_id", collectionName } };
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);

var result = session == null
? collection.FindOneAndUpdate(filter, updateDef, _options)
: collection.FindOneAndUpdate(session, filter, updateDef, _options);

if (result.TryGetValue("sequence_value", out var value))
{
return value.ToInt32();
}

throw new Exception("Unable to get next sequence value.");
}
}
}

+ 0
- 60
src/DotNetCore.CAP.MongoDB/MongoTransaction.cs Wyświetl plik

@@ -1,60 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoTransaction : IMongoTransaction
{
private readonly IMongoClient _client;

public MongoTransaction(IMongoClient client)
{
_client = client;
}

public IClientSessionHandle Session { get; set; }
public bool AutoCommit { get; set; }

public async Task<IMongoTransaction> BegeinAsync(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = await _client.StartSessionAsync();
Session.StartTransaction();
return this;
}

public IMongoTransaction Begein(bool autoCommit = true)
{
AutoCommit = autoCommit;
Session = _client.StartSession();
Session.StartTransaction();
return this;
}

public void Dispose()
{
Session?.Dispose();
}
}

public class NullMongoTransaction : MongoTransaction
{
public NullMongoTransaction(IMongoClient client = null) : base(client)
{
AutoCommit = false;
}
}

public static class MongoTransactionExtensions
{
public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction)
{
var trans = mongoTransaction as MongoTransaction;
return trans?.Session;
}
}
}

+ 2
- 0
src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs Wyświetl plik

@@ -24,8 +24,10 @@ namespace DotNetCore.CAP
services.AddSingleton<CapDatabaseStorageMarkerService>();
services.AddSingleton<IStorage, MySqlStorage>();
services.AddSingleton<IStorageConnection, MySqlStorageConnection>();

services.AddScoped<ICapPublisher, CapPublisher>();
services.AddScoped<ICallbackPublisher, CapPublisher>();

services.AddTransient<ICollectProcessor, MySqlCollectProcessor>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();



+ 1
- 0
src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs Wyświetl plik

@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions


src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs → src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs Wyświetl plik


+ 0
- 21
src/DotNetCore.CAP/Abstractions/IMongoTransaction.cs Wyświetl plik

@@ -1,21 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Abstractions
{
public interface IMongoTransaction : IDisposable
{
/// <summary>
/// If set true, the session.CommitTransaction() will be called automatically.
/// </summary>
/// <value></value>
bool AutoCommit { get; set; }

Task<IMongoTransaction> BegeinAsync(bool autoCommit = true);

IMongoTransaction Begein(bool autoCommit = true);
}
}

Ładowanie…
Anuluj
Zapisz