@@ -5,16 +5,30 @@ namespace DotNetCore.CAP.MongoDB | |||||
{ | { | ||||
public class MongoDBOptions | public class MongoDBOptions | ||||
{ | { | ||||
public const string DefaultDatabase = "Cap"; | |||||
/// <summary> | |||||
/// Gets or sets the database name to use when creating database objects. | |||||
/// Default value: "cap" | |||||
/// </summary> | |||||
public string DatabaseName { get; set; } = "cap"; | |||||
/// <summary> | /// <summary> | ||||
/// Gets or sets the database to use when creating database objects. | |||||
/// Default is <see cref="DefaultDatabase" />. | |||||
/// MongoDB database connection string. | |||||
/// Default value: "mongodb://localhost:27017" | |||||
/// </summary> | /// </summary> | ||||
public string Database { get; set; } = DefaultDatabase; | |||||
public string DatabaseConnection { get; set; } = "mongodb://localhost:27017"; | |||||
public string ReceivedCollection { get; } = "Received"; | |||||
/// <summary> | |||||
/// MongoDB received message collection name. | |||||
/// Default value: "received" | |||||
/// </summary> | |||||
public string ReceivedCollection { get; set; } = "cap.received"; | |||||
/// <summary> | |||||
/// MongoDB published message collection name. | |||||
/// Default value: "published" | |||||
/// </summary> | |||||
public string PublishedCollection { get; set; } = "cap.published"; | |||||
public string PublishedCollection { get; } = "Published"; | |||||
internal const string CounterCollection = "cap.counter"; | |||||
} | } | ||||
} | } |
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
: base(logger, dispatcher) | : base(logger, dispatcher) | ||||
{ | { | ||||
_options = options; | _options = options; | ||||
_database = client.GetDatabase(_options.Database); | |||||
_database = client.GetDatabase(_options.DatabaseName); | |||||
ServiceProvider = provider; | ServiceProvider = provider; | ||||
} | } | ||||
@@ -23,13 +23,13 @@ namespace DotNetCore.CAP.MongoDB | |||||
{ | { | ||||
_options = options; | _options = options; | ||||
_logger = logger; | _logger = logger; | ||||
_database = client.GetDatabase(_options.Database); | |||||
_database = client.GetDatabase(_options.DatabaseName); | |||||
} | } | ||||
public async Task ProcessAsync(ProcessingContext context) | public async Task ProcessAsync(ProcessingContext context) | ||||
{ | { | ||||
_logger.LogDebug( | _logger.LogDebug( | ||||
$"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}]."); | |||||
$"Collecting expired data from collection [{_options.PublishedCollection}]."); | |||||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | ||||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | ||||
@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
var mongoClient = client ?? throw new ArgumentNullException(nameof(client)); | var mongoClient = client ?? throw new ArgumentNullException(nameof(client)); | ||||
_options = options ?? throw new ArgumentNullException(nameof(options)); | _options = options ?? throw new ArgumentNullException(nameof(options)); | ||||
_database = mongoClient.GetDatabase(_options.Database); | |||||
_database = mongoClient.GetDatabase(_options.DatabaseName); | |||||
} | } | ||||
public StatisticsDto GetStatistics() | public StatisticsDto GetStatistics() | ||||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
return; | return; | ||||
} | } | ||||
var database = _client.GetDatabase(_options.Database); | |||||
var database = _client.GetDatabase(_options.DatabaseName); | |||||
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList(); | var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList(); | ||||
if (!names.Any(n => n == _options.ReceivedCollection)) | if (!names.Any(n => n == _options.ReceivedCollection)) | ||||
@@ -60,10 +60,10 @@ namespace DotNetCore.CAP.MongoDB | |||||
cancellationToken: cancellationToken); | cancellationToken: cancellationToken); | ||||
} | } | ||||
if (names.All(n => n != "Counter")) | |||||
if (names.All(n => n != MongoDBOptions.CounterCollection)) | |||||
{ | { | ||||
await database.CreateCollectionAsync("Counter", cancellationToken: cancellationToken); | |||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
await database.CreateCollectionAsync(MongoDBOptions.CounterCollection, cancellationToken: cancellationToken); | |||||
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection); | |||||
await collection.InsertManyAsync(new[] | await collection.InsertManyAsync(new[] | ||||
{ | { | ||||
new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, | new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}}, | ||||
@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
_capOptions = capOptions; | _capOptions = capOptions; | ||||
_options = options; | _options = options; | ||||
_client = client; | _client = client; | ||||
_database = _client.GetDatabase(_options.Database); | |||||
_database = _client.GetDatabase(_options.DatabaseName); | |||||
} | } | ||||
public bool ChangePublishedState(int messageId, string state) | public bool ChangePublishedState(int messageId, string state) | ||||
@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) | public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) | ||||
{ | { | ||||
_options = options; | _options = options; | ||||
_database = client.GetDatabase(options.Database); | |||||
_database = client.GetDatabase(options.DatabaseName); | |||||
_session = client.StartSession(); | _session = client.StartSession(); | ||||
_session.StartTransaction(); | _session.StartTransaction(); | ||||
} | } | ||||
@@ -19,10 +19,10 @@ namespace DotNetCore.CAP.MongoDB | |||||
IClientSessionHandle session = null) | IClientSessionHandle session = null) | ||||
{ | { | ||||
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm | //https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm | ||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection); | |||||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | ||||
var filter = new BsonDocument {{"_id", collectionName}}; | |||||
var filter = new BsonDocument { { "_id", collectionName } }; | |||||
BsonDocument result; | BsonDocument result; | ||||
if (session == null) | if (session == null) | ||||
@@ -45,9 +45,9 @@ namespace DotNetCore.CAP.MongoDB | |||||
public int GetNextSequenceValue(IMongoDatabase database, string collectionName, | public int GetNextSequenceValue(IMongoDatabase database, string collectionName, | ||||
IClientSessionHandle session = null) | IClientSessionHandle session = null) | ||||
{ | { | ||||
var collection = database.GetCollection<BsonDocument>("Counter"); | |||||
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection); | |||||
var filter = new BsonDocument {{"_id", collectionName}}; | |||||
var filter = new BsonDocument { { "_id", collectionName } }; | |||||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | ||||
var result = session == null | var result = session == null | ||||