diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs index a3ae8bf..f9bd36d 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs @@ -2,7 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Persistence; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; @@ -23,12 +23,9 @@ namespace DotNetCore.CAP.MongoDB public void AddServices(IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(x => (MongoDBPublisher)x.GetService()); - services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddTransient(); diff --git a/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs index bba7b0b..7e70a06 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs @@ -22,10 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseMongoDB(this CapOptions options, Action configure) { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + if (configure == null) throw new ArgumentNullException(nameof(configure)); configure += x => x.Version = options.Version; diff --git a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs deleted file mode 100644 index 62e2a3e..0000000 --- a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBPublisher : CapPublisherBase, ICallbackPublisher - { - private readonly IMongoClient _client; - private readonly MongoDBOptions _options; - - public MongoDBPublisher(IServiceProvider provider) : base(provider) - { - _options = provider.GetService>().Value; - _client = ServiceProvider.GetRequiredService(); - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - await PublishAsyncInternal(message); - } - - protected override Task ExecuteAsync(CapPublishedMessage message, - ICapTransaction transaction = null, - CancellationToken cancel = default) - { - var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; - - var collection = _client - .GetDatabase(_options.DatabaseName) - .GetCollection(_options.PublishedCollection); - - var store = new PublishedMessage() - { - Id = message.Id, - Name = message.Name, - Content = message.Content, - Added = message.Added, - StatusName = message.StatusName, - ExpiresAt = message.ExpiresAt, - Retries = message.Retries, - Version = _options.Version, - }; - - if (transaction == null) - { - return collection.InsertOneAsync(store, insertOptions, cancel); - } - - var dbTrans = (IClientSessionHandle)transaction.DbTransaction; - return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs index 7a432c0..ce9aae3 100644 --- a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using MongoDB.Driver; @@ -19,10 +21,16 @@ namespace DotNetCore.CAP { Debug.Assert(DbTransaction != null); - if (DbTransaction is IClientSessionHandle session) - { - session.CommitTransaction(); - } + if (DbTransaction is IClientSessionHandle session) session.CommitTransaction(); + + Flush(); + } + + public override async Task CommitAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + if (DbTransaction is IClientSessionHandle session) await session.CommitTransactionAsync(cancellationToken); Flush(); } @@ -31,10 +39,14 @@ namespace DotNetCore.CAP { Debug.Assert(DbTransaction != null); - if (DbTransaction is IClientSessionHandle session) - { - session.AbortTransaction(); - } + if (DbTransaction is IClientSessionHandle session) session.AbortTransaction(); + } + + public override async Task RollbackAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + if (DbTransaction is IClientSessionHandle session) await session.AbortTransactionAsync(cancellationToken); } public override void Dispose() @@ -49,10 +61,7 @@ namespace DotNetCore.CAP public static ICapTransaction Begin(this ICapTransaction transaction, IClientSessionHandle dbTransaction, bool autoCommit = false) { - if (!dbTransaction.IsInTransaction) - { - dbTransaction.StartTransaction(); - } + if (!dbTransaction.IsInTransaction) dbTransaction.StartTransaction(); transaction.DbTransaction = dbTransaction; transaction.AutoCommit = autoCommit; diff --git a/src/DotNetCore.CAP.MongoDB/IClientSessionHandle.CAP.cs b/src/DotNetCore.CAP.MongoDB/IClientSessionHandle.CAP.cs index 7a34177..82e5dce 100644 --- a/src/DotNetCore.CAP.MongoDB/IClientSessionHandle.CAP.cs +++ b/src/DotNetCore.CAP.MongoDB/IClientSessionHandle.CAP.cs @@ -26,12 +26,12 @@ namespace MongoDB.Driver _transaction.Dispose(); } - public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken)) + public void AbortTransaction(CancellationToken cancellationToken = default) { _transaction.Rollback(); } - public Task AbortTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) + public Task AbortTransactionAsync(CancellationToken cancellationToken = default) { _transaction.Rollback(); return Task.CompletedTask; @@ -47,12 +47,12 @@ namespace MongoDB.Driver _sessionHandle.AdvanceOperationTime(newOperationTime); } - public void CommitTransaction(CancellationToken cancellationToken = default(CancellationToken)) + public void CommitTransaction(CancellationToken cancellationToken = default) { _transaction.Commit(); } - public Task CommitTransactionAsync(CancellationToken cancellationToken = default(CancellationToken)) + public Task CommitTransactionAsync(CancellationToken cancellationToken = default) { _transaction.Commit(); return Task.CompletedTask; diff --git a/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs deleted file mode 100644 index 5a9b200..0000000 --- a/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using DotNetCore.CAP.Processor; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBCollectProcessor : ICollectProcessor - { - private readonly IMongoDatabase _database; - private readonly ILogger _logger; - private readonly MongoDBOptions _options; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - - public MongoDBCollectProcessor( - ILogger logger, - IOptions options, - IMongoClient client) - { - _options = options.Value; - _logger = logger; - _database = client.GetDatabase(_options.DatabaseName); - } - - public async Task ProcessAsync(ProcessingContext context) - { - _logger.LogDebug($"Collecting expired data from collection [{_options.PublishedCollection}]."); - - var publishedCollection = _database.GetCollection(_options.PublishedCollection); - var receivedCollection = _database.GetCollection(_options.ReceivedCollection); - - await publishedCollection.BulkWriteAsync(new[] - { - new DeleteManyModel( - Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) - }); - - await receivedCollection.BulkWriteAsync(new[] - { - new DeleteManyModel( - Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) - }); - - await context.WaitAsync(_waitingInterval); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs new file mode 100644 index 0000000..dfa108e --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -0,0 +1,226 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Serialization; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBDataStorage : IDataStorage + { + private readonly IOptions _capOptions; + private readonly IMongoClient _client; + private readonly IMongoDatabase _database; + private readonly ILogger _logger; + private readonly IOptions _options; + + public MongoDBDataStorage( + IOptions capOptions, + IOptions options, + IMongoClient client, + ILogger logger) + { + _capOptions = capOptions; + _options = options; + _client = client; + _logger = logger; + _database = _client.GetDatabase(_options.Value.DatabaseName); + } + + public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) + { + var collection = _database.GetCollection(_options.Value.PublishedCollection); + + var updateDef = Builders.Update + .Set(x => x.Retries, message.Retries) + .Set(x => x.ExpiresAt, message.ExpiresAt) + .Set(x => x.StatusName, state.ToString("G")); + + await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef); + } + + public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) + { + var collection = _database.GetCollection(_options.Value.PublishedCollection); + + var updateDef = Builders.Update + .Set(x => x.Retries, message.Retries) + .Set(x => x.ExpiresAt, message.ExpiresAt) + .Set(x => x.StatusName, state.ToString("G")); + + await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef); + } + + public async Task StoreMessageAsync(string name, Message content, object dbTransaction = null, + CancellationToken cancellationToken = default) + { + var insertOptions = new InsertOneOptions {BypassDocumentValidation = false}; + + var message = new MediumMessage + { + DbId = content.GetId(), + Origin = content, + Content = StringSerializer.Serialize(content), + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + var collection = _database.GetCollection(_options.Value.PublishedCollection); + + var store = new PublishedMessage + { + Id = long.Parse(message.DbId), + Name = name, + Content = message.Content, + Added = message.Added, + StatusName = nameof(StatusName.Scheduled), + ExpiresAt = message.ExpiresAt, + Retries = message.Retries, + Version = _options.Value.Version + }; + + if (dbTransaction == null) + { + await collection.InsertOneAsync(store, insertOptions, cancellationToken); + } + else + { + var dbTrans = dbTransaction as IClientSessionHandle; + await collection.InsertOneAsync(dbTrans, store, insertOptions, cancellationToken); + } + + return message; + } + + public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) + { + var collection = _database.GetCollection(_options.Value.ReceivedCollection); + + var store = new ReceivedMessage + { + Id = SnowflakeId.Default().NextId(), + Group = group, + Name = name, + Content = content, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + Retries = _capOptions.Value.FailedRetryCount, + Version = _capOptions.Value.Version, + StatusName = nameof(StatusName.Failed) + }; + + await collection.InsertOneAsync(store); + } + + public async Task StoreReceivedMessageAsync(string name, string group, Message message) + { + var mdMessage = new MediumMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Origin = message, + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + var content = StringSerializer.Serialize(mdMessage.Origin); + + var collection = _database.GetCollection(_options.Value.ReceivedCollection); + + var store = new ReceivedMessage + { + Id = long.Parse(mdMessage.DbId), + Group = group, + Name = name, + Content = content, + Added = mdMessage.Added, + ExpiresAt = mdMessage.ExpiresAt, + Retries = mdMessage.Retries, + Version = _capOptions.Value.Version, + StatusName = nameof(StatusName.Scheduled) + }; + + await collection.InsertOneAsync(store); + + return mdMessage; + } + + public async Task DeleteExpiresAsync(string collection, DateTime timeout, int batchCount = 1000, + CancellationToken cancellationToken = default) + { + if (collection == _options.Value.PublishedCollection) + { + Builders.Filter.Lt(x => x.ExpiresAt, timeout); + + var publishedCollection = _database.GetCollection(_options.Value.PublishedCollection); + var ret = await publishedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken); + return (int) ret.DeletedCount; + } + else + { + var receivedCollection = _database.GetCollection(_options.Value.ReceivedCollection); + var ret = await receivedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken); + ; + return (int) ret.DeletedCount; + } + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4); + var collection = _database.GetCollection(_options.Value.PublishedCollection); + var queryResult = await collection + .Find(x => x.Retries < _capOptions.Value.FailedRetryCount + && x.Added < fourMinAgo + && x.Version == _capOptions.Value.Version + && (x.StatusName == nameof(StatusName.Failed) || + x.StatusName == nameof(StatusName.Scheduled))) + .Limit(200) + .ToListAsync(); + return queryResult.Select(x => new MediumMessage + { + DbId = x.Id.ToString(), + Origin = StringSerializer.DeSerialize(x.Content), + Retries = x.Retries, + Added = x.Added + }).ToList(); + } + + public async Task> GetReceivedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4); + var collection = _database.GetCollection(_options.Value.PublishedCollection); + var queryResult = await collection + .Find(x => x.Retries < _capOptions.Value.FailedRetryCount + && x.Added < fourMinAgo + && x.Version == _capOptions.Value.Version + && (x.StatusName == nameof(StatusName.Failed) || + x.StatusName == nameof(StatusName.Scheduled))) + .Limit(200) + .ToListAsync(); + return queryResult.Select(x => new MediumMessage + { + DbId = x.Id.ToString(), + Origin = StringSerializer.DeSerialize(x.Content), + Retries = x.Retries, + Added = x.Added + }).ToList(); + } + + public IMonitoringApi GetMonitoringApi() + { + return new MongoDBMonitoringApi(_client, _options); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs index 3b022f2..532c90e 100644 --- a/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs @@ -3,10 +3,11 @@ using System; using System.Collections.Generic; -using DotNetCore.CAP.Dashboard; -using DotNetCore.CAP.Dashboard.Monitoring; -using DotNetCore.CAP.Infrastructure; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; using Microsoft.Extensions.Options; using MongoDB.Bson; using MongoDB.Driver; @@ -26,61 +27,64 @@ namespace DotNetCore.CAP.MongoDB _database = mongoClient.GetDatabase(_options.DatabaseName); } - public StatisticsDto GetStatistics() + public async Task GetPublishedMessageAsync(long id) { - var publishedCollection = _database.GetCollection(_options.PublishedCollection); - var receivedCollection = _database.GetCollection(_options.ReceivedCollection); - - var statistics = new StatisticsDto(); - - { - if (int.TryParse( - publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), - out var count)) - { - statistics.PublishedSucceeded = count; - } - } - { - if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), - out var count)) - { - statistics.PublishedFailed = count; - } - } + var collection = _database.GetCollection(_options.PublishedCollection); + var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); + return new MediumMessage { - if (int.TryParse( - receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), - out var count)) - { - statistics.ReceivedSucceeded = count; - } - } + Added = message.Added, + Content = message.Content, + DbId = message.Id.ToString(), + ExpiresAt = message.ExpiresAt, + Retries = message.Retries + }; + } + + public async Task GetReceivedMessageAsync(long id) + { + var collection = _database.GetCollection(_options.ReceivedCollection); + var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); + return new MediumMessage { - if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), - out var count)) - { - statistics.ReceivedFailed = count; - } - } + Added = message.Added, + Content = message.Content, + DbId = message.Id.ToString(), + ExpiresAt = message.ExpiresAt, + Retries = message.Retries + }; + } + + public StatisticsDto GetStatistics() + { + var publishedCollection = _database.GetCollection(_options.PublishedCollection); + var receivedCollection = _database.GetCollection(_options.ReceivedCollection); + var statistics = new StatisticsDto + { + PublishedSucceeded = + (int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), + PublishedFailed = + (int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)), + ReceivedSucceeded = + (int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), + ReceivedFailed = (int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)) + }; return statistics; } public IDictionary HourlyFailedJobs(MessageType type) { - return GetHourlyTimelineStats(type, StatusName.Failed); + return GetHourlyTimelineStats(type, nameof(StatusName.Failed)); } public IDictionary HourlySucceededJobs(MessageType type) { - return GetHourlyTimelineStats(type, StatusName.Succeeded); + return GetHourlyTimelineStats(type, nameof(StatusName.Succeeded)); } public IList Messages(MessageQueryDto queryDto) { - queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); - var name = queryDto.MessageType == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; @@ -89,24 +93,14 @@ namespace DotNetCore.CAP.MongoDB var builder = Builders.Filter; var filter = builder.Empty; if (!string.IsNullOrEmpty(queryDto.StatusName)) - { - filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); - } + filter &= builder.Eq(x => x.StatusName, queryDto.StatusName); - if (!string.IsNullOrEmpty(queryDto.Name)) - { - filter = filter & builder.Eq(x => x.Name, queryDto.Name); - } + if (!string.IsNullOrEmpty(queryDto.Name)) filter &= builder.Eq(x => x.Name, queryDto.Name); - if (!string.IsNullOrEmpty(queryDto.Group)) - { - filter = filter & builder.Eq(x => x.Group, queryDto.Group); - } + if (!string.IsNullOrEmpty(queryDto.Group)) filter &= builder.Eq(x => x.Group, queryDto.Group); if (!string.IsNullOrEmpty(queryDto.Content)) - { - filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); - } + filter &= builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); var result = collection .Find(filter) @@ -120,28 +114,28 @@ namespace DotNetCore.CAP.MongoDB public int PublishedFailedCount() { - return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); + return GetNumberOfMessage(_options.PublishedCollection, nameof(StatusName.Failed)); } public int PublishedSucceededCount() { - return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); + return GetNumberOfMessage(_options.PublishedCollection, nameof(StatusName.Succeeded)); } public int ReceivedFailedCount() { - return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); + return GetNumberOfMessage(_options.ReceivedCollection, nameof(StatusName.Failed)); } public int ReceivedSucceededCount() { - return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); + return GetNumberOfMessage(_options.ReceivedCollection, nameof(StatusName.Succeeded)); } private int GetNumberOfMessage(string collectionName, string statusName) { var collection = _database.GetCollection(collectionName); - var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } }); + var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}}); return int.Parse(count.ToString()); } @@ -200,7 +194,7 @@ namespace DotNetCore.CAP.MongoDB } }; - var pipeline = new[] { match, groupby }; + var pipeline = new[] {match, groupby}; var collection = _database.GetCollection(collectionName); var result = collection.Aggregate(pipeline).ToList(); @@ -215,10 +209,7 @@ namespace DotNetCore.CAP.MongoDB result.ForEach(d => { var key = d["_id"].AsBsonDocument["Key"].AsString; - if (DateTime.TryParse(key, out var dateTime)) - { - dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; - } + if (DateTime.TryParse(key, out var dateTime)) dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; }); return dic; diff --git a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs deleted file mode 100644 index 772ecb1..0000000 --- a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.Options; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBStorageConnection : IStorageConnection - { - private readonly CapOptions _capOptions; - private readonly IMongoClient _client; - private readonly IMongoDatabase _database; - private readonly MongoDBOptions _options; - - public MongoDBStorageConnection( - IOptions capOptions, - IOptions options, - IMongoClient client) - { - _capOptions = capOptions.Value; - _options = options.Value; - _client = client; - _database = _client.GetDatabase(_options.DatabaseName); - } - - public bool ChangePublishedState(long messageId, string state) - { - var collection = _database.GetCollection(_options.PublishedCollection); - - var updateDef = Builders - .Update.Inc(x => x.Retries, 1) - .Set(x => x.ExpiresAt, null) - .Set(x => x.StatusName, state); - - var result = - collection.UpdateOne(x => x.Id == messageId, updateDef); - - return result.ModifiedCount > 0; - } - - public bool ChangeReceivedState(long messageId, string state) - { - var collection = _database.GetCollection(_options.ReceivedCollection); - - var updateDef = Builders - .Update.Inc(x => x.Retries, 1) - .Set(x => x.ExpiresAt, null) - .Set(x => x.StatusName, state); - - var result = - collection.UpdateOne(x => x.Id == messageId, updateDef); - - return result.ModifiedCount > 0; - } - - public IStorageTransaction CreateTransaction() - { - return new MongoDBStorageTransaction(_client, _options); - } - - public async Task GetPublishedMessageAsync(long id) - { - var collection = _database.GetCollection(_options.PublishedCollection); - return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); - } - - public async Task> GetPublishedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); - var collection = _database.GetCollection(_options.PublishedCollection); - return await collection - .Find(x => x.Retries < _capOptions.FailedRetryCount - && x.Added < fourMinsAgo - && x.Version == _capOptions.Version - && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) - .Limit(200) - .ToListAsync(); - } - - public async Task GetReceivedMessageAsync(long id) - { - var collection = _database.GetCollection(_options.ReceivedCollection); - return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); - } - - public async Task> GetReceivedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); - var collection = _database.GetCollection(_options.ReceivedCollection); - - return await collection - .Find(x => x.Retries < _capOptions.FailedRetryCount - && x.Added < fourMinsAgo - && x.Version == _capOptions.Version - && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) - .Limit(200) - .ToListAsync(); - } - - public void StoreReceivedMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - var collection = _database.GetCollection(_options.ReceivedCollection); - - var store = new ReceivedMessage() - { - Id = message.Id, - Group = message.Group, - Name = message.Name, - Content = message.Content, - Added = message.Added, - StatusName = message.StatusName, - ExpiresAt = message.ExpiresAt, - Retries = message.Retries, - Version = _capOptions.Version - }; - - collection.InsertOne(store); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs similarity index 60% rename from src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs rename to src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs index f5df9fe..4d88b78 100644 --- a/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs @@ -1,108 +1,90 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using DotNetCore.CAP.Dashboard; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBStorage : IStorage - { - private readonly IOptions _capOptions; - private readonly IMongoClient _client; - private readonly ILogger _logger; - private readonly IOptions _options; - - public MongoDBStorage( - IOptions capOptions, - IOptions options, - IMongoClient client, - ILogger logger) - { - _capOptions = capOptions; - _options = options; - _client = client; - _logger = logger; - } - - public IStorageConnection GetConnection() - { - return new MongoDBStorageConnection(_capOptions, _options, _client); - } - - public IMonitoringApi GetMonitoringApi() - { - return new MongoDBMonitoringApi(_client, _options); - } - - public async Task InitializeAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - var options = _options.Value; - var database = _client.GetDatabase(options.DatabaseName); - var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList(); - - if (names.All(n => n != options.ReceivedCollection)) - { - await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken); - } - - if (names.All(n => n != options.PublishedCollection)) - { - await database.CreateCollectionAsync(options.PublishedCollection, - cancellationToken: cancellationToken); - } - - var receivedMessageIndexNames = new[] { - nameof(ReceivedMessage.Name), nameof(ReceivedMessage.Added), nameof(ReceivedMessage.ExpiresAt), - nameof(ReceivedMessage.StatusName), nameof(ReceivedMessage.Retries), nameof(ReceivedMessage.Version) }; - - var publishedMessageIndexNames = new[] { - nameof(PublishedMessage.Name), nameof(PublishedMessage.Added), nameof(PublishedMessage.ExpiresAt), - nameof(PublishedMessage.StatusName), nameof(PublishedMessage.Retries), nameof(PublishedMessage.Version) }; - - await Task.WhenAll( - TryCreateIndexesAsync(options.ReceivedCollection, receivedMessageIndexNames), - TryCreateIndexesAsync(options.PublishedCollection, publishedMessageIndexNames) - ); - - _logger.LogDebug("Ensuring all create database tables script are applied."); - - async Task TryCreateIndexesAsync(string collectionName, string[] indexNames) - { - var col = database.GetCollection(collectionName); - using (var cursor = await col.Indexes.ListAsync(cancellationToken)) - { - var existingIndexes = await cursor.ToListAsync(cancellationToken); - var existingIndexNames = existingIndexes.Select(o => o["name"].AsString).ToArray(); - indexNames = indexNames.Except(existingIndexNames).ToArray(); - } - - if (indexNames.Any() == false) - return; - - var indexes = indexNames.Select(indexName => - { - var indexOptions = new CreateIndexOptions - { - Name = indexName, - Background = true, - }; - var indexBuilder = Builders.IndexKeys; - return new CreateIndexModel(indexBuilder.Descending(indexName), indexOptions); - }).ToArray(); - - await col.Indexes.CreateManyAsync(indexes, cancellationToken); - } - } - } +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Persistence; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; + +namespace DotNetCore.CAP.MongoDB +{ + public class MongoDBStorageInitializer : IStorageInitializer + { + private readonly IMongoClient _client; + private readonly ILogger _logger; + private readonly IOptions _options; + + public MongoDBStorageInitializer( + ILogger logger, + IMongoClient client, + IOptions options) + { + _options = options; + _logger = logger; + _client = client; + } + + public string GetPublishedTableName() + { + return _options.Value.PublishedCollection; + } + + public string GetReceivedTableName() + { + return _options.Value.ReceivedCollection; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var options = _options.Value; + var database = _client.GetDatabase(options.DatabaseName); + var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList(); + + if (names.All(n => n != options.ReceivedCollection)) + await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken); + + if (names.All(n => n != options.PublishedCollection)) + await database.CreateCollectionAsync(options.PublishedCollection, cancellationToken: cancellationToken); + + await Task.WhenAll( + TryCreateIndexesAsync(options.ReceivedCollection), + TryCreateIndexesAsync(options.PublishedCollection)); + + _logger.LogDebug("Ensuring all create database tables script are applied."); + + + async Task TryCreateIndexesAsync(string collectionName) + { + var indexNames = new[] {"Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version"}; + var col = database.GetCollection(collectionName); + using (var cursor = await col.Indexes.ListAsync(cancellationToken)) + { + var existingIndexes = await cursor.ToListAsync(cancellationToken); + var existingIndexNames = existingIndexes.Select(o => o["name"].AsString).ToArray(); + indexNames = indexNames.Except(existingIndexNames).ToArray(); + } + + if (indexNames.Any() == false) + return; + + var indexes = indexNames.Select(indexName => + { + var indexOptions = new CreateIndexOptions + { + Name = indexName, + Background = true + }; + var indexBuilder = Builders.IndexKeys; + return new CreateIndexModel(indexBuilder.Descending(indexName), indexOptions); + }).ToArray(); + + await col.Indexes.CreateManyAsync(indexes, cancellationToken); + } + } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs deleted file mode 100644 index 992aa8f..0000000 --- a/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using DotNetCore.CAP.Messages; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - internal class MongoDBStorageTransaction : IStorageTransaction - { - private readonly IMongoDatabase _database; - private readonly MongoDBOptions _options; - private readonly IClientSessionHandle _session; - - public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) - { - _options = options; - _database = client.GetDatabase(_options.DatabaseName); - _session = client.StartSession(); - _session.StartTransaction(); - } - - public async Task CommitAsync() - { - await _session.CommitTransactionAsync(); - } - - public void Dispose() - { - _session.Dispose(); - } - - public void UpdateMessage(CapPublishedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var collection = _database.GetCollection(_options.PublishedCollection); - - var updateDef = Builders.Update - .Set(x => x.Retries, message.Retries) - .Set(x => x.Content, message.Content) - .Set(x => x.ExpiresAt, message.ExpiresAt) - .Set(x => x.StatusName, message.StatusName); - - collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); - } - - public void UpdateMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var collection = _database.GetCollection(_options.ReceivedCollection); - - var updateDef = Builders.Update - .Set(x => x.Retries, message.Retries) - .Set(x => x.Content, message.Content) - .Set(x => x.ExpiresAt, message.ExpiresAt) - .Set(x => x.StatusName, message.StatusName); - - collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/StorageMessage.cs b/src/DotNetCore.CAP.MongoDB/StorageMessage.cs index ac253be..02cf6b8 100644 --- a/src/DotNetCore.CAP.MongoDB/StorageMessage.cs +++ b/src/DotNetCore.CAP.MongoDB/StorageMessage.cs @@ -1,14 +1,47 @@ -using DotNetCore.CAP.Messages; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; namespace DotNetCore.CAP.MongoDB { - internal class ReceivedMessage : CapReceivedMessage + internal class ReceivedMessage { + public long Id { get; set; } + public string Version { get; set; } + + public string Group { get; set; } + + public string Name { get; set; } + + public string Content { get; set; } + + public DateTime Added { get; set; } + + public DateTime? ExpiresAt { get; set; } + + public int Retries { get; set; } + + public string StatusName { get; set; } } - internal class PublishedMessage : CapPublishedMessage + internal class PublishedMessage { + public long Id { get; set; } + public string Version { get; set; } + + public string Name { get; set; } + + public string Content { get; set; } + + public DateTime Added { get; set; } + + public DateTime? ExpiresAt { get; set; } + + public int Retries { get; set; } + + public string StatusName { get; set; } } -} +} \ No newline at end of file