diff --git a/CAP.sln b/CAP.sln index 3455243..8fc6b2f 100644 --- a/CAP.sln +++ b/CAP.sln @@ -64,7 +64,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MongoDB", "s EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", "samples\Sample.RabbitMQ.MongoDB\Sample.RabbitMQ.MongoDB.csproj", "{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{CD276810-09A2-4105-8798-D65A8AA7C509}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{11563D1A-27CC-45CF-8C04-C16BCC21250A}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -127,10 +127,10 @@ Global {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU - {CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CD276810-09A2-4105-8798-D65A8AA7C509}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CD276810-09A2-4105-8798-D65A8AA7C509}.Release|Any CPU.Build.0 = Release|Any CPU + {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -150,7 +150,7 @@ Global {C143FCDF-E7F3-46F8-987E-A1BA38C1639D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} - {CD276810-09A2-4105-8798-D65A8AA7C509} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs index 21510bb..1b935bd 100644 --- a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs @@ -1,5 +1,7 @@ using System; +using System.Data; using System.Threading.Tasks; +using Dapper; using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; using MySql.Data.MySqlClient; @@ -16,24 +18,37 @@ namespace Sample.Kafka.MySql.Controllers _capBus = producer; } - [Route("~/publish")] - public async Task PublishMessage() + [Route("~/without/transaction")] + public async Task WithoutTransaction() { - using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;")) - { - connection.Open(); - var transaction = connection.BeginTransaction(); - - //your business code here + await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); - await _capBus.PublishAsync("xxx.xxx.test2", 123456, transaction); + return Ok(); + } - transaction.Commit(); + [Route("~/adonet/transaction")] + public IActionResult AdonetWithTransaction() + { + using (var connection = new MySqlConnection("")) + { + using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) + { + //your business code + connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); + + for (int i = 0; i < 5; i++) + { + _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); + } + + transaction.Commit(); + } } - return Ok("publish successful!"); + return Ok(); } + [CapSubscribe("#.test2")] public void Test2(int value) { diff --git a/samples/Sample.Kafka.MySql/Startup.cs b/samples/Sample.Kafka.MySql/Startup.cs index 59c4636..3a6a8f7 100644 --- a/samples/Sample.Kafka.MySql/Startup.cs +++ b/samples/Sample.Kafka.MySql/Startup.cs @@ -21,8 +21,6 @@ namespace Sample.Kafka.MySql public void Configure(IApplicationBuilder app) { app.UseMvc(); - - app.UseCap(); } } } \ No newline at end of file diff --git a/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.Designer.cs b/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.Designer.cs deleted file mode 100644 index 57df6d5..0000000 --- a/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.Designer.cs +++ /dev/null @@ -1,38 +0,0 @@ -// -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Migrations; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using Sample.Kafka.SqlServer; - -namespace Sample.Kafka.SqlServer.Migrations -{ - [DbContext(typeof(AppDbContext))] - [Migration("20180825123925_init")] - partial class init - { - protected override void BuildTargetModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("ProductVersion", "2.1.1-rtm-30846") - .HasAnnotation("Relational:MaxIdentifierLength", 128) - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - b.Property("Name"); - - b.HasKey("Id"); - - b.ToTable("Persons"); - }); -#pragma warning restore 612, 618 - } - } -} diff --git a/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.cs b/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.cs deleted file mode 100644 index 453e685..0000000 --- a/samples/Sample.Kafka.SqlServer/Migrations/20180825123925_init.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Migrations; - -namespace Sample.Kafka.SqlServer.Migrations -{ - public partial class init : Migration - { - protected override void Up(MigrationBuilder migrationBuilder) - { - migrationBuilder.CreateTable( - name: "Persons", - columns: table => new - { - Id = table.Column(nullable: false) - .Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn), - Name = table.Column(nullable: true) - }, - constraints: table => - { - table.PrimaryKey("PK_Persons", x => x.Id); - }); - } - - protected override void Down(MigrationBuilder migrationBuilder) - { - migrationBuilder.DropTable( - name: "Persons"); - } - } -} diff --git a/samples/Sample.Kafka.SqlServer/Migrations/AppDbContextModelSnapshot.cs b/samples/Sample.Kafka.SqlServer/Migrations/AppDbContextModelSnapshot.cs deleted file mode 100644 index ca5ca05..0000000 --- a/samples/Sample.Kafka.SqlServer/Migrations/AppDbContextModelSnapshot.cs +++ /dev/null @@ -1,36 +0,0 @@ -// -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using Sample.Kafka.SqlServer; - -namespace Sample.Kafka.SqlServer.Migrations -{ - [DbContext(typeof(AppDbContext))] - partial class AppDbContextModelSnapshot : ModelSnapshot - { - protected override void BuildModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("ProductVersion", "2.1.1-rtm-30846") - .HasAnnotation("Relational:MaxIdentifierLength", 128) - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - modelBuilder.Entity("Sample.Kafka.SqlServer.Person", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - b.Property("Name"); - - b.HasKey("Id"); - - b.ToTable("Persons"); - }); -#pragma warning restore 612, 618 - } - } -} diff --git a/samples/Sample.Kafka.SqlServer/appsettings.json b/samples/Sample.Kafka.SqlServer/appsettings.json deleted file mode 100644 index 20aa907..0000000 --- a/samples/Sample.Kafka.SqlServer/appsettings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "Logging": { - "IncludeScopes": false, - "LogLevel": { - "Default": "Debug" - } - } -} diff --git a/src/DotNetCore.CAP.MongoDB/CapPublisher.cs b/src/DotNetCore.CAP.MongoDB/CapPublisher.cs deleted file mode 100644 index df19c48..0000000 --- a/src/DotNetCore.CAP.MongoDB/CapPublisher.cs +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Diagnostics; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Models; -using Microsoft.Extensions.Logging; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class CapPublisher : CapPublisherBase, ICallbackPublisher - { - private readonly MongoDBOptions _options; - private readonly IMongoDatabase _database; - private bool _usingTransaction = true; - - public CapPublisher( - ILogger logger, - IDispatcher dispatcher, - IMongoClient client, - MongoDBOptions options, - IServiceProvider provider) - : base(logger, dispatcher) - { - _options = options; - _database = client.GetDatabase(_options.DatabaseName); - ServiceProvider = provider; - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - var collection = _database.GetCollection(_options.PublishedCollection); - message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection); - collection.InsertOne(message); - Enqueue(message); - } - - protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) - { - throw new NotImplementedException("Not work for MongoDB"); - } - - protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) - { - throw new NotImplementedException("Not work for MongoDB"); - } - - protected override void PrepareConnectionForEF() - { - throw new NotImplementedException("Not work for MongoDB"); - } - - public override void PublishWithMongo(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - if (mongoTransaction == null) - { - _usingTransaction = false; - mongoTransaction = new NullMongoTransaction(); - } - - PublishWithTransaction(name, contentObj, mongoTransaction, callbackName); - } - - public override async Task PublishWithMongoAsync(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - if (mongoTransaction == null) - { - _usingTransaction = false; - mongoTransaction = new NullMongoTransaction(); - } - - await PublishWithTransactionAsync(name, contentObj, mongoTransaction, callbackName); - } - - private void PublishWithTransaction(string name, T contentObj, IMongoTransaction transaction, string callbackName) - { - var operationId = default(Guid); - - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - - var session = transaction.GetSession(); - - try - { - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - var id = Execute(session, message); - - if (transaction.AutoCommit) - { - session.CommitTransaction(); - } - - if (!_usingTransaction || (transaction.AutoCommit && id > 0)) - { - _logger.LogInformation($"message [{message}] has been persisted in the database."); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - message.Id = id; - Enqueue(message); - } - } - catch (Exception e) - { - _logger.LogError(e, "An exception was occurred when publish message. message:" + name); - s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - throw; - } - } - - private int Execute(IClientSessionHandle session, CapPublishedMessage message) - { - message.Id = new MongoDBUtil().GetNextSequenceValue(_database, _options.PublishedCollection, session); - - var collection = _database.GetCollection(_options.PublishedCollection); - if (_usingTransaction) - { - collection.InsertOne(session, message); - } - else - { - collection.InsertOne(message); - } - - return message.Id; - } - - - private async Task PublishWithTransactionAsync(string name, T contentObj, IMongoTransaction transaction, string callbackName) - { - var operationId = default(Guid); - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - - var session = transaction.GetSession(); - - try - { - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - - var id = await ExecuteAsync(session, message); - - if (transaction.AutoCommit) - { - await session.CommitTransactionAsync(); - } - - if (!_usingTransaction || (transaction.AutoCommit && id > 0)) - { - _logger.LogInformation($"message [{message}] has been persisted in the database."); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - - message.Id = id; - - Enqueue(message); - } - } - catch (Exception e) - { - _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); - s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - Console.WriteLine(e); - throw; - } - } - - private async Task ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message) - { - message.Id = - await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session); - var collection = _database.GetCollection(_options.PublishedCollection); - if (_usingTransaction) - { - await collection.InsertOneAsync(session, message); - } - else - { - await collection.InsertOneAsync(message); - } - - return message.Id; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs b/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs deleted file mode 100644 index 6b19878..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor; -using Microsoft.Extensions.Logging; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBCollectProcessor : ICollectProcessor - { - private readonly IMongoDatabase _database; - private readonly ILogger _logger; - private readonly MongoDBOptions _options; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - - public MongoDBCollectProcessor(ILogger logger, - MongoDBOptions options, - IMongoClient client) - { - _options = options; - _logger = logger; - _database = client.GetDatabase(_options.DatabaseName); - } - - public async Task ProcessAsync(ProcessingContext context) - { - _logger.LogDebug( - $"Collecting expired data from collection [{_options.PublishedCollection}]."); - - var publishedCollection = _database.GetCollection(_options.PublishedCollection); - var receivedCollection = _database.GetCollection(_options.ReceivedCollection); - - await publishedCollection.BulkWriteAsync(new[] - { - new DeleteManyModel( - Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) - }); - await receivedCollection.BulkWriteAsync(new[] - { - new DeleteManyModel( - Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) - }); - - await context.WaitAsync(_waitingInterval); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs b/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs deleted file mode 100644 index 0553b1b..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using DotNetCore.CAP.Dashboard; -using DotNetCore.CAP.Dashboard.Monitoring; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Models; -using MongoDB.Bson; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoDBMonitoringApi : IMonitoringApi - { - private readonly IMongoDatabase _database; - private readonly MongoDBOptions _options; - - public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options) - { - var mongoClient = client ?? throw new ArgumentNullException(nameof(client)); - _options = options ?? throw new ArgumentNullException(nameof(options)); - - _database = mongoClient.GetDatabase(_options.DatabaseName); - } - - public StatisticsDto GetStatistics() - { - var publishedCollection = _database.GetCollection(_options.PublishedCollection); - var receivedCollection = _database.GetCollection(_options.ReceivedCollection); - - var statistics = new StatisticsDto(); - - { - if (int.TryParse( - publishedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), - out var count)) - { - statistics.PublishedSucceeded = count; - } - } - { - if (int.TryParse(publishedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), - out var count)) - { - statistics.PublishedFailed = count; - } - } - { - if (int.TryParse( - receivedCollection.CountDocuments(x => x.StatusName == StatusName.Succeeded).ToString(), - out var count)) - { - statistics.ReceivedSucceeded = count; - } - } - { - if (int.TryParse(receivedCollection.CountDocuments(x => x.StatusName == StatusName.Failed).ToString(), - out var count)) - { - statistics.ReceivedFailed = count; - } - } - - return statistics; - } - - public IDictionary HourlyFailedJobs(MessageType type) - { - return GetHourlyTimelineStats(type, StatusName.Failed); - } - - public IDictionary HourlySucceededJobs(MessageType type) - { - return GetHourlyTimelineStats(type, StatusName.Succeeded); - } - - public IList Messages(MessageQueryDto queryDto) - { - queryDto.StatusName = StatusName.Standardized(queryDto.StatusName); - - var name = queryDto.MessageType == MessageType.Publish - ? _options.PublishedCollection - : _options.ReceivedCollection; - var collection = _database.GetCollection(name); - - var builder = Builders.Filter; - var filter = builder.Empty; - if (!string.IsNullOrEmpty(queryDto.StatusName)) - { - filter = filter & builder.Eq(x => x.StatusName, queryDto.StatusName); - } - - if (!string.IsNullOrEmpty(queryDto.Name)) - { - filter = filter & builder.Eq(x => x.Name, queryDto.Name); - } - - if (!string.IsNullOrEmpty(queryDto.Group)) - { - filter = filter & builder.Eq(x => x.Group, queryDto.Group); - } - - if (!string.IsNullOrEmpty(queryDto.Content)) - { - filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*"); - } - - var result = collection - .Find(filter) - .SortByDescending(x => x.Added) - .Skip(queryDto.PageSize * queryDto.CurrentPage) - .Limit(queryDto.PageSize) - .ToList(); - - return result; - } - - public int PublishedFailedCount() - { - return GetNumberOfMessage(_options.PublishedCollection, StatusName.Failed); - } - - public int PublishedSucceededCount() - { - return GetNumberOfMessage(_options.PublishedCollection, StatusName.Succeeded); - } - - public int ReceivedFailedCount() - { - return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Failed); - } - - public int ReceivedSucceededCount() - { - return GetNumberOfMessage(_options.ReceivedCollection, StatusName.Succeeded); - } - - private int GetNumberOfMessage(string collectionName, string statusName) - { - var collection = _database.GetCollection(collectionName); - var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}}); - return int.Parse(count.ToString()); - } - - private IDictionary GetHourlyTimelineStats(MessageType type, string statusName) - { - var collectionName = - type == MessageType.Publish ? _options.PublishedCollection : _options.ReceivedCollection; - var endDate = DateTime.UtcNow; - - var groupby = new BsonDocument - { - { - "$group", new BsonDocument - { - { - "_id", new BsonDocument - { - { - "Key", new BsonDocument - { - { - "$dateToString", new BsonDocument - { - {"format", "%Y-%m-%d %H:00:00"}, - {"date", "$Added"} - } - } - } - } - } - }, - {"Count", new BsonDocument {{"$sum", 1}}} - } - } - }; - - var match = new BsonDocument - { - { - "$match", new BsonDocument - { - { - "Added", new BsonDocument - { - {"$gt", endDate.AddHours(-24)} - } - }, - { - "StatusName", - new BsonDocument - { - {"$eq", statusName} - } - } - } - } - }; - - var pipeline = new[] {match, groupby}; - - var collection = _database.GetCollection(collectionName); - var result = collection.Aggregate(pipeline).ToList(); - - var dic = new Dictionary(); - for (var i = 0; i < 24; i++) - { - dic.Add(DateTime.Parse(endDate.ToLocalTime().ToString("yyyy-MM-dd HH:00:00")), 0); - endDate = endDate.AddHours(-1); - } - - result.ForEach(d => - { - var key = d["_id"].AsBsonDocument["Key"].AsString; - if (DateTime.TryParse(key, out var dateTime)) - { - dic[dateTime.ToLocalTime()] = d["Count"].AsInt32; - } - }); - - return dic; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs b/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs deleted file mode 100644 index 3d3c1f7..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using MongoDB.Bson; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - internal class MongoDBUtil - { - private readonly FindOneAndUpdateOptions _options = new FindOneAndUpdateOptions - { - ReturnDocument = ReturnDocument.After - }; - - public async Task GetNextSequenceValueAsync(IMongoDatabase database, string collectionName, - IClientSessionHandle session = null) - { - //https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm - var collection = database.GetCollection(MongoDBOptions.CounterCollection); - - var updateDef = Builders.Update.Inc("sequence_value", 1); - var filter = new BsonDocument { { "_id", collectionName } }; - - BsonDocument result; - if (session == null) - { - result = await collection.FindOneAndUpdateAsync(filter, updateDef, _options); - } - else - { - result = await collection.FindOneAndUpdateAsync(session, filter, updateDef, _options); - } - - if (result.TryGetValue("sequence_value", out var value)) - { - return value.ToInt32(); - } - - throw new Exception("Unable to get next sequence value."); - } - - public int GetNextSequenceValue(IMongoDatabase database, string collectionName, - IClientSessionHandle session = null) - { - var collection = database.GetCollection(MongoDBOptions.CounterCollection); - - var filter = new BsonDocument { { "_id", collectionName } }; - var updateDef = Builders.Update.Inc("sequence_value", 1); - - var result = session == null - ? collection.FindOneAndUpdate(filter, updateDef, _options) - : collection.FindOneAndUpdate(session, filter, updateDef, _options); - - if (result.TryGetValue("sequence_value", out var value)) - { - return value.ToInt32(); - } - - throw new Exception("Unable to get next sequence value."); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs b/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs deleted file mode 100644 index 96aed37..0000000 --- a/src/DotNetCore.CAP.MongoDB/MongoTransaction.cs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; -using MongoDB.Driver; - -namespace DotNetCore.CAP.MongoDB -{ - public class MongoTransaction : IMongoTransaction - { - private readonly IMongoClient _client; - - public MongoTransaction(IMongoClient client) - { - _client = client; - } - - public IClientSessionHandle Session { get; set; } - public bool AutoCommit { get; set; } - - public async Task BegeinAsync(bool autoCommit = true) - { - AutoCommit = autoCommit; - Session = await _client.StartSessionAsync(); - Session.StartTransaction(); - return this; - } - - public IMongoTransaction Begein(bool autoCommit = true) - { - AutoCommit = autoCommit; - Session = _client.StartSession(); - Session.StartTransaction(); - return this; - } - - public void Dispose() - { - Session?.Dispose(); - } - } - - public class NullMongoTransaction : MongoTransaction - { - public NullMongoTransaction(IMongoClient client = null) : base(client) - { - AutoCommit = false; - } - } - - public static class MongoTransactionExtensions - { - public static IClientSessionHandle GetSession(this IMongoTransaction mongoTransaction) - { - var trans = mongoTransaction as MongoTransaction; - return trans?.Session; - } - } -} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs deleted file mode 100644 index 45695f9..0000000 --- a/test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System.Collections.Concurrent; -using System.Threading.Tasks; -using FluentAssertions; -using Xunit; - -namespace DotNetCore.CAP.MongoDB.Test -{ - [Collection("MongoDB")] - public class MongoDBUtilTest : DatabaseTestHost - { - [Fact] - public async void GetNextSequenceValueAsync_Test() - { - var id = await new MongoDBUtil().GetNextSequenceValueAsync(Database, MongoDBOptions.ReceivedCollection); - id.Should().BeGreaterThan(0); - } - - [Fact] - public void GetNextSequenceValue_Concurrency_Test() - { - var dic = new ConcurrentDictionary(); - Parallel.For(0, 30, (x) => - { - var id = new MongoDBUtil().GetNextSequenceValue(Database, MongoDBOptions.ReceivedCollection); - id.Should().BeGreaterThan(0); - dic.TryAdd(id, x).Should().BeTrue(); //The id shouldn't be same. - }); - } - } -} \ No newline at end of file