@@ -64,7 +64,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MongoDB", "s | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{CD276810-09A2-4105-8798-D65A8AA7C509}" | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{11563D1A-27CC-45CF-8C04-C16BCC21250A}" | |||
EndProject | |||
Global | |||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | |||
@@ -127,10 +127,10 @@ Global | |||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU | |||
EndGlobalSection | |||
GlobalSection(SolutionProperties) = preSolution | |||
HideSolutionNode = FALSE | |||
@@ -150,7 +150,7 @@ Global | |||
{C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} | |||
{77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | |||
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||
{CD276810-09A2-4105-8798-D65A8AA7C509} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||
{11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||
EndGlobalSection | |||
GlobalSection(ExtensibilityGlobals) = postSolution | |||
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} | |||
@@ -1,5 +1,7 @@ | |||
using System; | |||
using System.Data; | |||
using System.Threading.Tasks; | |||
using Dapper; | |||
using DotNetCore.CAP; | |||
using Microsoft.AspNetCore.Mvc; | |||
using MySql.Data.MySqlClient; | |||
@@ -16,24 +18,37 @@ namespace Sample.Kafka.MySql.Controllers | |||
_capBus = producer; | |||
} | |||
[Route("~/publish")] | |||
public async Task<IActionResult> PublishMessage() | |||
[Route("~/without/transaction")] | |||
public async Task<IActionResult> WithoutTransaction() | |||
{ | |||
using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;")) | |||
{ | |||
connection.Open(); | |||
var transaction = connection.BeginTransaction(); | |||
//your business code here | |||
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); | |||
await _capBus.PublishAsync("xxx.xxx.test2", 123456, transaction); | |||
return Ok(); | |||
} | |||
transaction.Commit(); | |||
[Route("~/adonet/transaction")] | |||
public IActionResult AdonetWithTransaction() | |||
{ | |||
using (var connection = new MySqlConnection("")) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); | |||
for (int i = 0; i < 5; i++) | |||
{ | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
} | |||
transaction.Commit(); | |||
} | |||
} | |||
return Ok("publish successful!"); | |||
return Ok(); | |||
} | |||
[CapSubscribe("#.test2")] | |||
public void Test2(int value) | |||
{ | |||
@@ -21,8 +21,6 @@ namespace Sample.Kafka.MySql | |||
public void Configure(IApplicationBuilder app) | |||
{ | |||
app.UseMvc(); | |||
app.UseCap(); | |||
} | |||
} | |||
} |
@@ -1,38 +0,0 @@ | |||
// <auto-generated /> | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||
using Microsoft.EntityFrameworkCore.Metadata; | |||
using Microsoft.EntityFrameworkCore.Migrations; | |||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion; | |||
using Sample.Kafka.SqlServer; | |||
namespace Sample.Kafka.SqlServer.Migrations | |||
{ | |||
[DbContext(typeof(AppDbContext))] | |||
[Migration("20180825123925_init")] | |||
partial class init | |||
{ | |||
protected override void BuildTargetModel(ModelBuilder modelBuilder) | |||
{ | |||
#pragma warning disable 612, 618 | |||
modelBuilder | |||
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846") | |||
.HasAnnotation("Relational:MaxIdentifierLength", 128) | |||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||
modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b => | |||
{ | |||
b.Property<int>("Id") | |||
.ValueGeneratedOnAdd() | |||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||
b.Property<string>("Name"); | |||
b.HasKey("Id"); | |||
b.ToTable("Persons"); | |||
}); | |||
#pragma warning restore 612, 618 | |||
} | |||
} | |||
} |
@@ -1,30 +0,0 @@ | |||
using Microsoft.EntityFrameworkCore.Metadata; | |||
using Microsoft.EntityFrameworkCore.Migrations; | |||
namespace Sample.Kafka.SqlServer.Migrations | |||
{ | |||
public partial class init : Migration | |||
{ | |||
protected override void Up(MigrationBuilder migrationBuilder) | |||
{ | |||
migrationBuilder.CreateTable( | |||
name: "Persons", | |||
columns: table => new | |||
{ | |||
Id = table.Column<int>(nullable: false) | |||
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn), | |||
Name = table.Column<string>(nullable: true) | |||
}, | |||
constraints: table => | |||
{ | |||
table.PrimaryKey("PK_Persons", x => x.Id); | |||
}); | |||
} | |||
protected override void Down(MigrationBuilder migrationBuilder) | |||
{ | |||
migrationBuilder.DropTable( | |||
name: "Persons"); | |||
} | |||
} | |||
} |
@@ -1,36 +0,0 @@ | |||
// <auto-generated /> | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||
using Microsoft.EntityFrameworkCore.Metadata; | |||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion; | |||
using Sample.Kafka.SqlServer; | |||
namespace Sample.Kafka.SqlServer.Migrations | |||
{ | |||
[DbContext(typeof(AppDbContext))] | |||
partial class AppDbContextModelSnapshot : ModelSnapshot | |||
{ | |||
protected override void BuildModel(ModelBuilder modelBuilder) | |||
{ | |||
#pragma warning disable 612, 618 | |||
modelBuilder | |||
.HasAnnotation("ProductVersion", "2.1.1-rtm-30846") | |||
.HasAnnotation("Relational:MaxIdentifierLength", 128) | |||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||
modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b => | |||
{ | |||
b.Property<int>("Id") | |||
.ValueGeneratedOnAdd() | |||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||
b.Property<string>("Name"); | |||
b.HasKey("Id"); | |||
b.ToTable("Persons"); | |||
}); | |||
#pragma warning restore 612, 618 | |||
} | |||
} | |||
} |
@@ -1,8 +0,0 @@ | |||
{ | |||
"Logging": { | |||
"IncludeScopes": false, | |||
"LogLevel": { | |||
"Default": "Debug" | |||
} | |||
} | |||
} |
@@ -1,202 +0,0 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Data; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Abstractions; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Models; | |||
using Microsoft.Extensions.Logging; | |||
using MongoDB.Driver; | |||
namespace DotNetCore.CAP.MongoDB | |||
{ | |||
public class CapPublisher : CapPublisherBase, ICallbackPublisher | |||
{ | |||
private readonly MongoDBOptions _options; | |||
private readonly IMongoDatabase _database; | |||
private bool _usingTransaction = true; | |||
public CapPublisher( | |||
ILogger<CapPublisherBase> logger, | |||
IDispatcher dispatcher, | |||
IMongoClient client, | |||
MongoDBOptions options, | |||
IServiceProvider provider) | |||
: base(logger, dispatcher) | |||
{ | |||
_options = options; | |||
_database = client.GetDatabase(_options.DatabaseName); | |||
ServiceProvider = provider; | |||
} | |||
public async Task PublishCallbackAsync(CapPublishedMessage message) | |||
{ | |||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection); | |||
collection.InsertOne(message); | |||
Enqueue(message); | |||
} | |||
protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, | |||
CapPublishedMessage message) | |||
{ | |||
throw new NotImplementedException("Not work for MongoDB"); | |||
} | |||
protected override Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, | |||
CapPublishedMessage message) | |||
{ | |||
throw new NotImplementedException("Not work for MongoDB"); | |||
} | |||
protected override void PrepareConnectionForEF() | |||
{ | |||
throw new NotImplementedException("Not work for MongoDB"); | |||
} | |||
public override void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) | |||
{ | |||
if (mongoTransaction == null) | |||
{ | |||
_usingTransaction = false; | |||
mongoTransaction = new NullMongoTransaction(); | |||
} | |||
PublishWithTransaction<T>(name, contentObj, mongoTransaction, callbackName); | |||
} | |||
public override async Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) | |||
{ | |||
if (mongoTransaction == null) | |||
{ | |||
_usingTransaction = false; | |||
mongoTransaction = new NullMongoTransaction(); | |||
} | |||
await PublishWithTransactionAsync<T>(name, contentObj, mongoTransaction, callbackName); | |||
} | |||
private void PublishWithTransaction<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName) | |||
{ | |||
var operationId = default(Guid); | |||
var content = Serialize(contentObj, callbackName); | |||
var message = new CapPublishedMessage | |||
{ | |||
Name = name, | |||
Content = content, | |||
StatusName = StatusName.Scheduled | |||
}; | |||
var session = transaction.GetSession(); | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||
var id = Execute(session, message); | |||
if (transaction.AutoCommit) | |||
{ | |||
session.CommitTransaction(); | |||
} | |||
if (!_usingTransaction || (transaction.AutoCommit && id > 0)) | |||
{ | |||
_logger.LogInformation($"message [{message}] has been persisted in the database."); | |||
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); | |||
message.Id = id; | |||
Enqueue(message); | |||
} | |||
} | |||
catch (Exception e) | |||
{ | |||
_logger.LogError(e, "An exception was occurred when publish message. message:" + name); | |||
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); | |||
throw; | |||
} | |||
} | |||
private int Execute(IClientSessionHandle session, CapPublishedMessage message) | |||
{ | |||
message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); | |||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||
if (_usingTransaction) | |||
{ | |||
collection.InsertOne(session, message); | |||
} | |||
else | |||
{ | |||
collection.InsertOne(message); | |||
} | |||
return message.Id; | |||
} | |||
private async Task PublishWithTransactionAsync<T>(string name, T contentObj, IMongoTransaction transaction, string callbackName) | |||
{ | |||
var operationId = default(Guid); | |||
var content = Serialize(contentObj, callbackName); | |||
var message = new CapPublishedMessage | |||
{ | |||
Name = name, | |||
Content = content, | |||
StatusName = StatusName.Scheduled | |||
}; | |||
var session = transaction.GetSession(); | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||
var id = await ExecuteAsync(session, message); | |||
if (transaction.AutoCommit) | |||
{ | |||
await session.CommitTransactionAsync(); | |||
} | |||
if (!_usingTransaction || (transaction.AutoCommit && id > 0)) | |||
{ | |||
_logger.LogInformation($"message [{message}] has been persisted in the database."); | |||
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); | |||
message.Id = id; | |||
Enqueue(message); | |||
} | |||
} | |||
catch (Exception e) | |||
{ | |||
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); | |||
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); | |||
Console.WriteLine(e); | |||
throw; | |||
} | |||
} | |||
private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) | |||
{ | |||
message.Id = | |||
await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); | |||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||
if (_usingTransaction) | |||
{ | |||
await collection.InsertOneAsync(session, message); | |||
} | |||
else | |||
{ | |||
await collection.InsertOneAsync(message); | |||
} | |||
return message.Id; | |||
} | |||
} | |||
} |
@@ -1,51 +0,0 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Models; | |||
using DotNetCore.CAP.Processor; | |||
using Microsoft.Extensions.Logging; | |||
using MongoDB.Driver; | |||
namespace DotNetCore.CAP.MongoDB | |||
{ | |||
public class MongoDBCollectProcessor : ICollectProcessor | |||
{ | |||
private readonly IMongoDatabase _database; | |||
private readonly ILogger _logger; | |||
private readonly MongoDBOptions _options; | |||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | |||
public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger, | |||
MongoDBOptions options, | |||
IMongoClient client) | |||
{ | |||
_options = options; | |||
_logger = logger; | |||
_database = client.GetDatabase(_options.DatabaseName); | |||
} | |||
public async Task ProcessAsync(ProcessingContext context) | |||
{ | |||
_logger.LogDebug( | |||
$"Collecting expired data from collection [{_options.PublishedCollection}]."); | |||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||
await publishedCollection.BulkWriteAsync(new[] | |||
{ | |||
new DeleteManyModel<CapPublishedMessage>( | |||
Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||
}); | |||
await receivedCollection.BulkWriteAsync(new[] | |||
{ | |||
new DeleteManyModel<CapReceivedMessage>( | |||
Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||
}); | |||
await context.WaitAsync(_waitingInterval); | |||
} | |||
} | |||
} |
@@ -1,226 +0,0 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Collections.Generic; | |||
using DotNetCore.CAP.Dashboard; | |||
using DotNetCore.CAP.Dashboard.Monitoring; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Models; | |||
using MongoDB.Bson; | |||
using MongoDB.Driver; | |||
namespace DotNetCore.CAP.MongoDB | |||
{ | |||
public class MongoDBMonitoringApi : IMonitoringApi | |||
{ | |||
private readonly IMongoDatabase _database; | |||
private readonly MongoDBOptions _options; | |||
public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) | |||
{ | |||
var mongoClient = client ?? throw new ArgumentNullException(nameof(client)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_database = mongoClient.GetDatabase(_options.DatabaseName); | |||
} | |||
public StatisticsDto GetStatistics() | |||
{ | |||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||
var statistics = new StatisticsDto(); | |||
{ | |||
if (int.TryParse( | |||
publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), | |||
out var count)) | |||
{ | |||
statistics.PublishedSucceeded = count; | |||
} | |||
} | |||
{ | |||
if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), | |||
out var count)) | |||
{ | |||
statistics.PublishedFailed = count; | |||
} | |||
} | |||
{ | |||
if (int.TryParse( | |||
receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), | |||
out var count)) | |||
{ | |||
statistics.ReceivedSucceeded = count; | |||
} | |||
} | |||
{ | |||
if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), | |||
out var count)) | |||
{ | |||
statistics.ReceivedFailed = count; | |||
} | |||
} | |||
return statistics; | |||
} | |||
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type) | |||
{ | |||
return GetHourlyTimelineStats(type, StatusName.Failed); | |||
} | |||
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type) | |||
{ | |||
return GetHourlyTimelineStats(type, StatusName.Succeeded); | |||
} | |||
public IList<MessageDto> Messages(MessageQueryDto queryDto) | |||
{ | |||
queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); | |||
var name = queryDto.MessageType == MessageType.Publish | |||
? _options.PublishedCollection | |||
: _options.ReceivedCollection; | |||
var collection = _database.GetCollection<MessageDto>(name); | |||
var builder = Builders<MessageDto>.Filter; | |||
var filter = builder.Empty; | |||
if (!string.IsNullOrEmpty(queryDto.StatusName)) | |||
{ | |||
filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); | |||
} | |||
if (!string.IsNullOrEmpty(queryDto.Name)) | |||
{ | |||
filter = filter & builder.Eq(x => x.Name, queryDto.Name); | |||
} | |||
if (!string.IsNullOrEmpty(queryDto.Group)) | |||
{ | |||
filter = filter & builder.Eq(x => x.Group, queryDto.Group); | |||
} | |||
if (!string.IsNullOrEmpty(queryDto.Content)) | |||
{ | |||
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); | |||
} | |||
var result = collection | |||
.Find(filter) | |||
.SortByDescending(x => x.Added) | |||
.Skip(queryDto.PageSize * queryDto.CurrentPage) | |||
.Limit(queryDto.PageSize) | |||
.ToList(); | |||
return result; | |||
} | |||
public int PublishedFailedCount() | |||
{ | |||
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); | |||
} | |||
public int PublishedSucceededCount() | |||
{ | |||
return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); | |||
} | |||
public int ReceivedFailedCount() | |||
{ | |||
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); | |||
} | |||
public int ReceivedSucceededCount() | |||
{ | |||
return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); | |||
} | |||
private int GetNumberOfMessage(string collectionName, string statusName) | |||
{ | |||
var collection = _database.GetCollection<BsonDocument>(collectionName); | |||
var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}}); | |||
return int.Parse(count.ToString()); | |||
} | |||
private IDictionary<DateTime, int> GetHourlyTimelineStats(MessageType type, string statusName) | |||
{ | |||
var collectionName = | |||
type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; | |||
var endDate = DateTime.UtcNow; | |||
var groupby = new BsonDocument | |||
{ | |||
{ | |||
"$group", new BsonDocument | |||
{ | |||
{ | |||
"_id", new BsonDocument | |||
{ | |||
{ | |||
"Key", new BsonDocument | |||
{ | |||
{ | |||
"$dateToString", new BsonDocument | |||
{ | |||
{"format", "%Y-%m-%d %H:00:00"}, | |||
{"date", "$Added"} | |||
} | |||
} | |||
} | |||
} | |||
} | |||
}, | |||
{"Count", new BsonDocument {{"$sum", 1}}} | |||
} | |||
} | |||
}; | |||
var match = new BsonDocument | |||
{ | |||
{ | |||
"$match", new BsonDocument | |||
{ | |||
{ | |||
"Added", new BsonDocument | |||
{ | |||
{"$gt", endDate.AddHours(-24)} | |||
} | |||
}, | |||
{ | |||
"StatusName", | |||
new BsonDocument | |||
{ | |||
{"$eq", statusName} | |||
} | |||
} | |||
} | |||
} | |||
}; | |||
var pipeline = new[] {match, groupby}; | |||
var collection = _database.GetCollection<BsonDocument>(collectionName); | |||
var result = collection.Aggregate<BsonDocument>(pipeline).ToList(); | |||
var dic = new Dictionary<DateTime, int>(); | |||
for (var i = 0; i < 24; i++) | |||
{ | |||
dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0); | |||
endDate = endDate.AddHours(-1); | |||
} | |||
result.ForEach(d => | |||
{ | |||
var key = d["_id"].AsBsonDocument["Key"].AsString; | |||
if (DateTime.TryParse(key, out var dateTime)) | |||
{ | |||
dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; | |||
} | |||
}); | |||
return dic; | |||
} | |||
} | |||
} |
@@ -1,65 +0,0 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Threading.Tasks; | |||
using MongoDB.Bson; | |||
using MongoDB.Driver; | |||
namespace DotNetCore.CAP.MongoDB | |||
{ | |||
internal class MongoDBUtil | |||
{ | |||
private readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument> | |||
{ | |||
ReturnDocument = ReturnDocument.After | |||
}; | |||
public async Task<int> GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, | |||
IClientSessionHandle session = null) | |||
{ | |||
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm | |||
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection); | |||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | |||
var filter = new BsonDocument { { "_id", collectionName } }; | |||
BsonDocument result; | |||
if (session == null) | |||
{ | |||
result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options); | |||
} | |||
else | |||
{ | |||
result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options); | |||
} | |||
if (result.TryGetValue("sequence_value", out var value)) | |||
{ | |||
return value.ToInt32(); | |||
} | |||
throw new Exception("Unable to get next sequence value."); | |||
} | |||
public int GetNextSequenceValue(IMongoDatabase database, string collectionName, | |||
IClientSessionHandle session = null) | |||
{ | |||
var collection = database.GetCollection<BsonDocument>(MongoDBOptions.CounterCollection); | |||
var filter = new BsonDocument { { "_id", collectionName } }; | |||
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1); | |||
var result = session == null | |||
? collection.FindOneAndUpdate(filter, updateDef, _options) | |||
: collection.FindOneAndUpdate(session, filter, updateDef, _options); | |||
if (result.TryGetValue("sequence_value", out var value)) | |||
{ | |||
return value.ToInt32(); | |||
} | |||
throw new Exception("Unable to get next sequence value."); | |||
} | |||
} | |||
} |
@@ -1,60 +0,0 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Abstractions; | |||
using MongoDB.Driver; | |||
namespace DotNetCore.CAP.MongoDB | |||
{ | |||
public class MongoTransaction : IMongoTransaction | |||
{ | |||
private readonly IMongoClient _client; | |||
public MongoTransaction(IMongoClient client) | |||
{ | |||
_client = client; | |||
} | |||
public IClientSessionHandle Session { get; set; } | |||
public bool AutoCommit { get; set; } | |||
public async Task<IMongoTransaction> BegeinAsync(bool autoCommit = true) | |||
{ | |||
AutoCommit = autoCommit; | |||
Session = await _client.StartSessionAsync(); | |||
Session.StartTransaction(); | |||
return this; | |||
} | |||
public IMongoTransaction Begein(bool autoCommit = true) | |||
{ | |||
AutoCommit = autoCommit; | |||
Session = _client.StartSession(); | |||
Session.StartTransaction(); | |||
return this; | |||
} | |||
public void Dispose() | |||
{ | |||
Session?.Dispose(); | |||
} | |||
} | |||
public class NullMongoTransaction : MongoTransaction | |||
{ | |||
public NullMongoTransaction(IMongoClient client = null) : base(client) | |||
{ | |||
AutoCommit = false; | |||
} | |||
} | |||
public static class MongoTransactionExtensions | |||
{ | |||
public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction) | |||
{ | |||
var trans = mongoTransaction as MongoTransaction; | |||
return trans?.Session; | |||
} | |||
} | |||
} |
@@ -1,30 +0,0 @@ | |||
using System.Collections.Concurrent; | |||
using System.Threading.Tasks; | |||
using FluentAssertions; | |||
using Xunit; | |||
namespace DotNetCore.CAP.MongoDB.Test | |||
{ | |||
[Collection("MongoDB")] | |||
public class MongoDBUtilTest : DatabaseTestHost | |||
{ | |||
[Fact] | |||
public async void GetNextSequenceValueAsync_Test() | |||
{ | |||
var id = await new MongoDBUtil().GetNextSequenceValueAsync(Database, MongoDBOptions.ReceivedCollection); | |||
id.Should().BeGreaterThan(0); | |||
} | |||
[Fact] | |||
public void GetNextSequenceValue_Concurrency_Test() | |||
{ | |||
var dic = new ConcurrentDictionary<int, int>(); | |||
Parallel.For(0, 30, (x) => | |||
{ | |||
var id = new MongoDBUtil().GetNextSequenceValue(Database, MongoDBOptions.ReceivedCollection); | |||
id.Should().BeGreaterThan(0); | |||
dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same. | |||
}); | |||
} | |||
} | |||
} |