@@ -1,6 +1,7 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | // Copyright (c) .NET Core Community. All rights reserved. | ||||
// Licensed under the MIT License. See License.txt in the project root for license information. | // Licensed under the MIT License. See License.txt in the project root for license information. | ||||
using System; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP; | using DotNetCore.CAP; | ||||
@@ -18,7 +19,7 @@ namespace MongoDB.Driver | |||||
public CapMongoDbClientSessionHandle(ICapTransaction transaction) | public CapMongoDbClientSessionHandle(ICapTransaction transaction) | ||||
{ | { | ||||
_transaction = transaction; | _transaction = transaction; | ||||
_sessionHandle = (IClientSessionHandle) _transaction.DbTransaction; | |||||
_sessionHandle = (IClientSessionHandle)_transaction.DbTransaction; | |||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
@@ -76,5 +77,15 @@ namespace MongoDB.Driver | |||||
{ | { | ||||
return _sessionHandle.Fork(); | return _sessionHandle.Fork(); | ||||
} | } | ||||
public TResult WithTransaction<TResult>(Func<IClientSessionHandle, CancellationToken, TResult> callback, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default) | |||||
{ | |||||
return _sessionHandle.WithTransaction(callback, transactionOptions, cancellationToken); | |||||
} | |||||
public Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default) | |||||
{ | |||||
return _sessionHandle.WithTransactionAsync(callbackAsync, transactionOptions, cancellationToken); | |||||
} | |||||
} | } | ||||
} | } |
@@ -52,7 +52,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) | public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) | ||||
{ | { | ||||
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.PublishedCollection); | |||||
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection); | |||||
var updateDef = Builders<ReceivedMessage>.Update | var updateDef = Builders<ReceivedMessage>.Update | ||||
.Set(x => x.Retries, message.Retries) | .Set(x => x.Retries, message.Retries) | ||||
@@ -161,7 +161,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
{ | { | ||||
if (collection == _options.Value.PublishedCollection) | if (collection == _options.Value.PublishedCollection) | ||||
{ | { | ||||
Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, timeout); | |||||
//Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, timeout); | |||||
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection); | var publishedCollection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection); | ||||
var ret = await publishedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken); | var ret = await publishedCollection.DeleteManyAsync(x => x.ExpiresAt < timeout, cancellationToken); | ||||
@@ -63,12 +63,12 @@ namespace DotNetCore.CAP.MongoDB | |||||
var statistics = new StatisticsDto | var statistics = new StatisticsDto | ||||
{ | { | ||||
PublishedSucceeded = | PublishedSucceeded = | ||||
(int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), | |||||
(int)publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), | |||||
PublishedFailed = | PublishedFailed = | ||||
(int) publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)), | |||||
(int)publishedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)), | |||||
ReceivedSucceeded = | ReceivedSucceeded = | ||||
(int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), | |||||
ReceivedFailed = (int) receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)) | |||||
(int)receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Succeeded)), | |||||
ReceivedFailed = (int)receivedCollection.CountDocuments(x => x.StatusName == nameof(StatusName.Failed)) | |||||
}; | }; | ||||
return statistics; | return statistics; | ||||
} | } | ||||
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
var builder = Builders<MessageDto>.Filter; | var builder = Builders<MessageDto>.Filter; | ||||
var filter = builder.Empty; | var filter = builder.Empty; | ||||
if (!string.IsNullOrEmpty(queryDto.StatusName)) | if (!string.IsNullOrEmpty(queryDto.StatusName)) | ||||
filter &= builder.Eq(x => x.StatusName, queryDto.StatusName); | |||||
filter &= builder.Where(x => x.StatusName.ToLower() == queryDto.StatusName); | |||||
if (!string.IsNullOrEmpty(queryDto.Name)) filter &= builder.Eq(x => x.Name, queryDto.Name); | if (!string.IsNullOrEmpty(queryDto.Name)) filter &= builder.Eq(x => x.Name, queryDto.Name); | ||||
@@ -135,7 +135,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
private int GetNumberOfMessage(string collectionName, string statusName) | private int GetNumberOfMessage(string collectionName, string statusName) | ||||
{ | { | ||||
var collection = _database.GetCollection<BsonDocument>(collectionName); | var collection = _database.GetCollection<BsonDocument>(collectionName); | ||||
var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}}); | |||||
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } }); | |||||
return int.Parse(count.ToString()); | return int.Parse(count.ToString()); | ||||
} | } | ||||
@@ -194,7 +194,7 @@ namespace DotNetCore.CAP.MongoDB | |||||
} | } | ||||
}; | }; | ||||
var pipeline = new[] {match, groupby}; | |||||
var pipeline = new[] { match, groupby }; | |||||
var collection = _database.GetCollection<BsonDocument>(collectionName); | var collection = _database.GetCollection<BsonDocument>(collectionName); | ||||
var result = collection.Aggregate<BsonDocument>(pipeline).ToList(); | var result = collection.Aggregate<BsonDocument>(pipeline).ToList(); | ||||