Browse Source

add copyright license header to code file.

master
Savorboard 6 years ago
parent
commit
60c12d41a1
10 changed files with 152 additions and 72 deletions
  1. +3
    -1
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
  2. +2
    -1
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs
  3. +3
    -0
      src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs
  4. +28
    -16
      src/DotNetCore.CAP.MongoDB/CapPublisher.cs
  5. +11
    -5
      src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs
  6. +60
    -24
      src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs
  7. +9
    -5
      src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
  8. +18
    -13
      src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
  9. +5
    -2
      src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs
  10. +13
    -5
      src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs

+ 3
- 1
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs View File

@@ -1,5 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;




+ 2
- 1
src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs View File

@@ -1,4 +1,5 @@
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.


namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {


+ 3
- 0
src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs View File

@@ -1,3 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using DotNetCore.CAP; using DotNetCore.CAP;
using DotNetCore.CAP.MongoDB; using DotNetCore.CAP.MongoDB;


+ 28
- 16
src/DotNetCore.CAP.MongoDB/CapPublisher.cs View File

@@ -1,3 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using System.Data; using System.Data;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -12,8 +15,8 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class CapPublisher : CapPublisherBase, ICallbackPublisher public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
private bool _isInTransaction = true; private bool _isInTransaction = true;


public CapPublisher( public CapPublisher(
@@ -37,22 +40,25 @@ namespace DotNetCore.CAP.MongoDB
Enqueue(message); Enqueue(message);
} }


protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{ {
throw new System.NotImplementedException("Not work for MongoDB");
throw new NotImplementedException("Not work for MongoDB");
} }


protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{ {
throw new System.NotImplementedException("Not work for MongoDB");
throw new NotImplementedException("Not work for MongoDB");
} }


protected override void PrepareConnectionForEF() protected override void PrepareConnectionForEF()
{ {
throw new System.NotImplementedException("Not work for MongoDB");
throw new NotImplementedException("Not work for MongoDB");
} }


public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public override void PublishWithMongo<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
{ {
var session = mongoSession as IClientSessionHandle; var session = mongoSession as IClientSessionHandle;
if (session == null) if (session == null)
@@ -60,10 +66,11 @@ namespace DotNetCore.CAP.MongoDB
_isInTransaction = false; _isInTransaction = false;
} }


PublishWithSession<T>(name, contentObj, session, callbackName);
PublishWithSession(name, contentObj, session, callbackName);
} }


public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null, string callbackName = null)
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, object mongoSession = null,
string callbackName = null)
{ {
var session = mongoSession as IClientSessionHandle; var session = mongoSession as IClientSessionHandle;
if (session == null) if (session == null)
@@ -71,12 +78,12 @@ namespace DotNetCore.CAP.MongoDB
_isInTransaction = false; _isInTransaction = false;
} }


await PublishWithSessionAsync<T>(name, contentObj, session, callbackName);
await PublishWithSessionAsync(name, contentObj, session, callbackName);
} }


private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName) private void PublishWithSession<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
{ {
Guid operationId = default(Guid);
var operationId = default(Guid);


var content = Serialize(contentObj, callbackName); var content = Serialize(contentObj, callbackName);


@@ -100,7 +107,7 @@ namespace DotNetCore.CAP.MongoDB
Enqueue(message); Enqueue(message);
} }
} }
catch (System.Exception e)
catch (Exception e)
{ {
_logger.LogError(e, "An exception was occurred when publish message. message:" + name); _logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
@@ -121,12 +128,14 @@ namespace DotNetCore.CAP.MongoDB
{ {
collection.InsertOne(message); collection.InsertOne(message);
} }

return message.Id; return message.Id;
} }


private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session,
string callbackName)
{ {
Guid operationId = default(Guid);
var operationId = default(Guid);
var content = Serialize(contentObj, callbackName); var content = Serialize(contentObj, callbackName);


var message = new CapPublishedMessage var message = new CapPublishedMessage
@@ -152,7 +161,7 @@ namespace DotNetCore.CAP.MongoDB
Enqueue(message); Enqueue(message);
} }
} }
catch (System.Exception e)
catch (Exception e)
{ {
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
@@ -160,9 +169,11 @@ namespace DotNetCore.CAP.MongoDB
throw; throw;
} }
} }

private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message)
{ {
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
message.Id =
await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
if (_isInTransaction) if (_isInTransaction)
{ {
@@ -172,6 +183,7 @@ namespace DotNetCore.CAP.MongoDB
{ {
await collection.InsertOneAsync(message); await collection.InsertOneAsync(message);
} }

return message.Id; return message.Id;
} }
} }

+ 11
- 5
src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs View File

@@ -1,3 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
@@ -9,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBCollectProcessor : ICollectProcessor public class MongoDBCollectProcessor : ICollectProcessor
{ {
private readonly MongoDBOptions _options;
private readonly ILogger _logger;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly ILogger _logger;
private readonly MongoDBOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);


public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger, public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger,
@@ -25,18 +28,21 @@ namespace DotNetCore.CAP.MongoDB


public async Task ProcessAsync(ProcessingContext context) public async Task ProcessAsync(ProcessingContext context)
{ {
_logger.LogDebug($"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}].");
_logger.LogDebug(
$"Collecting expired data from collection [{_options.Database}].[{_options.PublishedCollection}].");


var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);


await publishedCollection.BulkWriteAsync(new[] await publishedCollection.BulkWriteAsync(new[]
{ {
new DeleteManyModel<CapPublishedMessage>(Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
new DeleteManyModel<CapPublishedMessage>(
Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
}); });
await receivedCollection.BulkWriteAsync(new[] await receivedCollection.BulkWriteAsync(new[]
{ {
new DeleteManyModel<CapReceivedMessage>(Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
new DeleteManyModel<CapReceivedMessage>(
Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
}); });


await context.WaitAsync(_waitingInterval); await context.WaitAsync(_waitingInterval);


+ 60
- 24
src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs View File

@@ -1,6 +1,8 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring; using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
@@ -12,8 +14,8 @@ namespace DotNetCore.CAP.MongoDB
{ {
public class MongoDBMonitoringApi : IMonitoringApi public class MongoDBMonitoringApi : IMonitoringApi
{ {
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;


public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
{ {
@@ -31,20 +33,34 @@ namespace DotNetCore.CAP.MongoDB
var statistics = new StatisticsDto(); var statistics = new StatisticsDto();


{ {
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count))
if (int.TryParse(
publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(),
out var count))
{
statistics.PublishedSucceeded = count; statistics.PublishedSucceeded = count;
}
} }
{ {
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count))
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(),
out var count))
{
statistics.PublishedFailed = count; statistics.PublishedFailed = count;
}
} }
{ {
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), out var count))
if (int.TryParse(
receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(),
out var count))
{
statistics.ReceivedSucceeded = count; statistics.ReceivedSucceeded = count;
}
} }
{ {
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), out var count))
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(),
out var count))
{
statistics.ReceivedFailed = count; statistics.ReceivedFailed = count;
}
} }


return statistics; return statistics;
@@ -64,7 +80,9 @@ namespace DotNetCore.CAP.MongoDB
{ {
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); queryDto.StatusName = StatusName.Standardized(queryDto.StatusName);


var name = queryDto.MessageType == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var name = queryDto.MessageType == MessageType.Publish
? _options.PublishedCollection
: _options.ReceivedCollection;
var collection = _database.GetCollection<MessageDto>(name); var collection = _database.GetCollection<MessageDto>(name);


var builder = Builders<MessageDto>.Filter; var builder = Builders<MessageDto>.Filter;
@@ -73,14 +91,17 @@ namespace DotNetCore.CAP.MongoDB
{ {
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName);
} }

if (!string.IsNullOrEmpty(queryDto.Name)) if (!string.IsNullOrEmpty(queryDto.Name))
{ {
filter = filter & builder.Eq(x => x.Name, queryDto.Name); filter = filter & builder.Eq(x => x.Name, queryDto.Name);
} }

if (!string.IsNullOrEmpty(queryDto.Group)) if (!string.IsNullOrEmpty(queryDto.Group))
{ {
filter = filter & builder.Eq(x => x.Group, queryDto.Group); filter = filter & builder.Eq(x => x.Group, queryDto.Group);
} }

if (!string.IsNullOrEmpty(queryDto.Content)) if (!string.IsNullOrEmpty(queryDto.Content))
{ {
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
@@ -119,52 +140,66 @@ namespace DotNetCore.CAP.MongoDB
private int GetNumberOfMessage(string collectionName, string statusName) private int GetNumberOfMessage(string collectionName, string statusName)
{ {
var collection = _database.GetCollection<BsonDocument>(collectionName); var collection = _database.GetCollection<BsonDocument>(collectionName);
var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } });
var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}});
return int.Parse(count.ToString()); return int.Parse(count.ToString());
} }


private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName) private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName)
{ {
var collectionName = type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var collectionName =
type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection;
var endDate = DateTime.UtcNow; var endDate = DateTime.UtcNow;


var groupby = new BsonDocument {
var groupby = new BsonDocument
{
{ {
"$group", new BsonDocument {
{ "_id", new BsonDocument {
{ "Key", new BsonDocument{
{ "$dateToString", new BsonDocument {
{ "format", "%Y-%m-%d %H:00:00"},
{ "date", "$Added"}
"$group", new BsonDocument
{
{
"_id", new BsonDocument
{
{
"Key", new BsonDocument
{
{
"$dateToString", new BsonDocument
{
{"format", "%Y-%m-%d %H:00:00"},
{"date", "$Added"}
} }
} }
} }
} }
} }
}, },
{ "Count", new BsonDocument{{ "$sum", 1}}}
{"Count", new BsonDocument {{"$sum", 1}}}
} }
} }
}; };


var match = new BsonDocument {
{ "$match", new BsonDocument {
{ "Added", new BsonDocument
var match = new BsonDocument
{
{
"$match", new BsonDocument
{
{
"Added", new BsonDocument
{ {
{ "$gt", endDate.AddHours(-24) }
{"$gt", endDate.AddHours(-24)}
} }
}, },
{ "StatusName",
{
"StatusName",
new BsonDocument new BsonDocument
{ {
{ "$eq", statusName}
{"$eq", statusName}
} }
} }
} }
} }
}; };


var pipeline = new[] { match, groupby };
var pipeline = new[] {match, groupby};


var collection = _database.GetCollection<BsonDocument>(collectionName); var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline).ToList(); var result = collection.Aggregate<BsonDocument>(pipeline).ToList();
@@ -175,6 +210,7 @@ namespace DotNetCore.CAP.MongoDB
dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0); dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0);
endDate = endDate.AddHours(-1); endDate = endDate.AddHours(-1);
} }

result.ForEach(d => result.ForEach(d =>
{ {
var key = d["_id"].AsBsonDocument["Key"].AsString; var key = d["_id"].AsBsonDocument["Key"].AsString;


+ 9
- 5
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs View File

@@ -1,3 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.MongoDB
public class MongoDBStorage : IStorage public class MongoDBStorage : IStorage
{ {
private readonly CapOptions _capOptions; private readonly CapOptions _capOptions;
private readonly MongoDBOptions _options;
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly ILogger<MongoDBStorage> _logger; private readonly ILogger<MongoDBStorage> _logger;
private readonly MongoDBOptions _options;


public MongoDBStorage(CapOptions capOptions, public MongoDBStorage(CapOptions capOptions,
MongoDBOptions options, MongoDBOptions options,
@@ -53,17 +56,18 @@ namespace DotNetCore.CAP.MongoDB


if (names.All(n => n != _options.PublishedCollection)) if (names.All(n => n != _options.PublishedCollection))
{ {
await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken);
await database.CreateCollectionAsync(_options.PublishedCollection,
cancellationToken: cancellationToken);
} }


if (names.All(n => n != "Counter")) if (names.All(n => n != "Counter"))
{ {
await database.CreateCollectionAsync("Counter", cancellationToken: cancellationToken); await database.CreateCollectionAsync("Counter", cancellationToken: cancellationToken);
var collection = database.GetCollection<BsonDocument>("Counter"); var collection = database.GetCollection<BsonDocument>("Counter");
await collection.InsertManyAsync(new BsonDocument[]
await collection.InsertManyAsync(new[]
{ {
new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
new BsonDocument {{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument {{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
}, cancellationToken: cancellationToken); }, cancellationToken: cancellationToken);
} }




+ 18
- 13
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs View File

@@ -1,9 +1,11 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using MongoDB.Bson;
using MongoDB.Driver; using MongoDB.Driver;


namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
@@ -11,9 +13,9 @@ namespace DotNetCore.CAP.MongoDB
public class MongoDBStorageConnection : IStorageConnection public class MongoDBStorageConnection : IStorageConnection
{ {
private readonly CapOptions _capOptions; private readonly CapOptions _capOptions;
private readonly MongoDBOptions _options;
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;


public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client) public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client)
{ {
@@ -28,12 +30,12 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);


var updateDef = Builders<CapPublishedMessage> var updateDef = Builders<CapPublishedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);


var result = var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);
collection.UpdateOne(x => x.Id == messageId, updateDef);


return result.ModifiedCount > 0; return result.ModifiedCount > 0;
} }
@@ -43,12 +45,12 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);


var updateDef = Builders<CapReceivedMessage> var updateDef = Builders<CapReceivedMessage>
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);
.Update.Inc(x => x.Retries, 1)
.Set(x => x.ExpiresAt, null)
.Set(x => x.StatusName, state);


var result = var result =
collection.UpdateOne(x => x.Id == messageId, updateDef);
collection.UpdateOne(x => x.Id == messageId, updateDef);


return result.ModifiedCount > 0; return result.ModifiedCount > 0;
} }
@@ -69,7 +71,8 @@ namespace DotNetCore.CAP.MongoDB
var fourMinsAgo = DateTime.Now.AddMinutes(-4); var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await collection return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo &&
(x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200) .Limit(200)
.ToListAsync(); .ToListAsync();
} }
@@ -86,7 +89,8 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);


return await collection return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo &&
(x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200) .Limit(200)
.ToListAsync(); .ToListAsync();
} }
@@ -97,6 +101,7 @@ namespace DotNetCore.CAP.MongoDB
{ {
throw new ArgumentNullException(nameof(message)); throw new ArgumentNullException(nameof(message));
} }

var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);


message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection); message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.ReceivedCollection);
@@ -105,9 +110,9 @@ namespace DotNetCore.CAP.MongoDB


return message.Id; return message.Id;
} }

public void Dispose() public void Dispose()
{ {

} }
} }
} }

+ 5
- 2
src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs View File

@@ -1,14 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using MongoDB.Driver; using MongoDB.Driver;
using System;


namespace DotNetCore.CAP.MongoDB namespace DotNetCore.CAP.MongoDB
{ {
internal class MongoDBStorageTransaction : IStorageTransaction internal class MongoDBStorageTransaction : IStorageTransaction
{ {
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
private readonly IClientSessionHandle _session; private readonly IClientSessionHandle _session;


public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options) public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)


+ 13
- 5
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs View File

@@ -1,3 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using MongoDB.Bson; using MongoDB.Bson;
@@ -7,17 +10,19 @@ namespace DotNetCore.CAP.MongoDB
{ {
internal class MongoDBUtil internal class MongoDBUtil
{ {
readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>()
private readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>
{ {
ReturnDocument = ReturnDocument.After ReturnDocument = ReturnDocument.After
}; };
public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, IClientSessionHandle session = null)

public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName,
IClientSessionHandle session = null)
{ {
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm //https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm
var collection = database.GetCollection<BsonDocument>("Counter"); var collection = database.GetCollection<BsonDocument>("Counter");


var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);
var filter = new BsonDocument { { "_id", collectionName } };
var filter = new BsonDocument {{"_id", collectionName}};


BsonDocument result; BsonDocument result;
if (session == null) if (session == null)
@@ -33,14 +38,16 @@ namespace DotNetCore.CAP.MongoDB
{ {
return value.ToInt32(); return value.ToInt32();
} }

throw new Exception("Unable to get next sequence value."); throw new Exception("Unable to get next sequence value.");
} }


public int GetNextSequenceValue(IMongoDatabase database, string collectionName, IClientSessionHandle session = null)
public int GetNextSequenceValue(IMongoDatabase database, string collectionName,
IClientSessionHandle session = null)
{ {
var collection = database.GetCollection<BsonDocument>("Counter"); var collection = database.GetCollection<BsonDocument>("Counter");


var filter = new BsonDocument { { "_id", collectionName } };
var filter = new BsonDocument {{"_id", collectionName}};
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);


var result = session == null var result = session == null
@@ -51,6 +58,7 @@ namespace DotNetCore.CAP.MongoDB
{ {
return value.ToInt32(); return value.ToInt32();
} }

throw new Exception("Unable to get next sequence value."); throw new Exception("Unable to get next sequence value.");
} }
} }

Loading…
Cancel
Save