diff --git a/.travis.yml b/.travis.yml index ba8c1fe..6582e7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: csharp sudo: required dist: trusty solution: CAP.sln -dotnet: 2.1.300 +dotnet: 2.2.100 mono: none matrix: diff --git a/CAP.sln b/CAP.sln index 13eac1f..93f9a15 100644 --- a/CAP.sln +++ b/CAP.sln @@ -100,7 +100,6 @@ Global {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.ActiveCfg = Release|Any CPU {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.Build.0 = Release|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU {82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -120,11 +119,9 @@ Global {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU {77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.Build.0 = Debug|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection diff --git a/build.sh b/build.sh index e70f996..4034242 100644 --- a/build.sh +++ b/build.sh @@ -1,3 +1,3 @@ dotnet --info dotnet restore -dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0 \ No newline at end of file +dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.2 \ No newline at end of file diff --git a/build/version.props b/build/version.props index e9ad11c..91c600c 100644 --- a/build/version.props +++ b/build/version.props @@ -1,8 +1,8 @@ 2 - 3 - 1 + 4 + 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj index e64e18b..bec0935 100644 --- a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj +++ b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1 + netcoreapp2.2 Sample.Kafka.MySql NU1701 NU1701 diff --git a/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj b/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj index b693e95..eebde0c 100644 --- a/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj +++ b/samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1 + netcoreapp2.2 @@ -13,4 +13,4 @@ - + \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj index 4ebfbda..5e3b6b5 100644 --- a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj +++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj @@ -1,12 +1,12 @@  - netcoreapp2.1 + netcoreapp2.2 - + diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs index 1c049f0..52c12c9 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs @@ -4,6 +4,8 @@ using System; using DotNetCore.CAP.Processor; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using MongoDB.Driver; namespace DotNetCore.CAP.MongoDB { @@ -32,6 +34,9 @@ namespace DotNetCore.CAP.MongoDB var options = new MongoDBOptions(); _configure?.Invoke(options); services.AddSingleton(options); + + //Try to add IMongoClient if does not exists + services.TryAddSingleton(new MongoClient(options.DatabaseConnection)); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs index 0cb350e..f753cb7 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs @@ -29,5 +29,7 @@ namespace DotNetCore.CAP.MongoDB /// Default value: "published" /// public string PublishedCollection { get; set; } = "cap.published"; + + internal string Version { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs index eb9b8df..bba7b0b 100644 --- a/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs @@ -27,6 +27,8 @@ namespace Microsoft.Extensions.DependencyInjection throw new ArgumentNullException(nameof(configure)); } + configure += x => x.Version = options.Version; + options.RegisterExtension(new MongoDBCapOptionsExtension(configure)); return options; diff --git a/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj index 1ae2980..4379ea2 100644 --- a/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj +++ b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj @@ -16,9 +16,8 @@ - - - + + diff --git a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs index 5c4f6cc..d43e868 100644 --- a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs @@ -35,15 +35,28 @@ namespace DotNetCore.CAP.MongoDB var collection = _client .GetDatabase(_options.DatabaseName) - .GetCollection(_options.PublishedCollection); + .GetCollection(_options.PublishedCollection); + + var store = new PublishedMessage() + { + Id = message.Id, + Name = message.Name, + Content = message.Content, + Added = message.Added, + StatusName = message.StatusName, + ExpiresAt = message.ExpiresAt, + Retries = message.Retries, + Version = _options.Version, + }; if (NotUseTransaction) { - return collection.InsertOneAsync(message, insertOptions, cancel); + + return collection.InsertOneAsync(store, insertOptions, cancel); } var dbTrans = (IClientSessionHandle) transaction.DbTransaction; - return collection.InsertOneAsync(dbTrans, message, insertOptions, cancel); + return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs index a58627b..eaad75a 100644 --- a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs @@ -27,9 +27,9 @@ namespace DotNetCore.CAP.MongoDB public bool ChangePublishedState(long messageId, string state) { - var collection = _database.GetCollection(_options.PublishedCollection); + var collection = _database.GetCollection(_options.PublishedCollection); - var updateDef = Builders + var updateDef = Builders .Update.Inc(x => x.Retries, 1) .Set(x => x.ExpiresAt, null) .Set(x => x.StatusName, state); @@ -42,9 +42,9 @@ namespace DotNetCore.CAP.MongoDB public bool ChangeReceivedState(long messageId, string state) { - var collection = _database.GetCollection(_options.ReceivedCollection); + var collection = _database.GetCollection(_options.ReceivedCollection); - var updateDef = Builders + var updateDef = Builders .Update.Inc(x => x.Retries, 1) .Set(x => x.ExpiresAt, null) .Set(x => x.StatusName, state); @@ -62,35 +62,39 @@ namespace DotNetCore.CAP.MongoDB public async Task GetPublishedMessageAsync(long id) { - var collection = _database.GetCollection(_options.PublishedCollection); + var collection = _database.GetCollection(_options.PublishedCollection); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); } public async Task> GetPublishedMessagesOfNeedRetry() { var fourMinsAgo = DateTime.Now.AddMinutes(-4); - var collection = _database.GetCollection(_options.PublishedCollection); + var collection = _database.GetCollection(_options.PublishedCollection); return await collection - .Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && - (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) + .Find(x => x.Retries < _capOptions.FailedRetryCount + && x.Added < fourMinsAgo + && x.Version == _capOptions.Version + && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) .Limit(200) .ToListAsync(); } public async Task GetReceivedMessageAsync(long id) { - var collection = _database.GetCollection(_options.ReceivedCollection); + var collection = _database.GetCollection(_options.ReceivedCollection); return await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); } public async Task> GetReceivedMessagesOfNeedRetry() { var fourMinsAgo = DateTime.Now.AddMinutes(-4); - var collection = _database.GetCollection(_options.ReceivedCollection); + var collection = _database.GetCollection(_options.ReceivedCollection); return await collection - .Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && - (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) + .Find(x => x.Retries < _capOptions.FailedRetryCount + && x.Added < fourMinsAgo + && x.Version == _capOptions.Version + && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)) .Limit(200) .ToListAsync(); } @@ -101,10 +105,22 @@ namespace DotNetCore.CAP.MongoDB { throw new ArgumentNullException(nameof(message)); } + var collection = _database.GetCollection(_options.ReceivedCollection); - var collection = _database.GetCollection(_options.ReceivedCollection); - - collection.InsertOne(message); + var store = new ReceivedMessage() + { + Id = message.Id, + Group = message.Group, + Name = message.Name, + Content = message.Content, + Added = message.Added, + StatusName = message.StatusName, + ExpiresAt = message.ExpiresAt, + Retries = message.Retries, + Version = _capOptions.Version + }; + + collection.InsertOne(store); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MongoDB/StorageMessage.cs b/src/DotNetCore.CAP.MongoDB/StorageMessage.cs new file mode 100644 index 0000000..b5abafa --- /dev/null +++ b/src/DotNetCore.CAP.MongoDB/StorageMessage.cs @@ -0,0 +1,14 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.MongoDB +{ + internal class ReceivedMessage : CapReceivedMessage + { + public string Version { get; set; } + } + + internal class PublishedMessage : CapPublishedMessage + { + public string Version { get; set; } + } +} diff --git a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs index c2b9983..692548c 100644 --- a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs @@ -19,5 +19,10 @@ namespace DotNetCore.CAP /// EF db context type. /// internal Type DbContextType { get; set; } + + /// + /// Data version + /// + internal string Version { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs index f59b75d..15f76b4 100644 --- a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs @@ -22,6 +22,7 @@ namespace Microsoft.Extensions.DependencyInjection throw new ArgumentNullException(nameof(configure)); } + configure += x => x.Version = options.Version; options.RegisterExtension(new MySqlCapOptionsExtension(configure)); @@ -46,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection { configure(x); x.DbContextType = typeof(TContext); + x.Version = options.Version; })); return options; diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj index ac716ba..41d66f9 100644 --- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj +++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj @@ -13,9 +13,9 @@ - - - + + + diff --git a/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs b/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs index 8167409..f22c82a 100644 --- a/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs +++ b/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs @@ -55,7 +55,8 @@ namespace DotNetCore.CAP.MySql private string PrepareSql() { return - $"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + $"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)" + + $"VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } #endregion private methods diff --git a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IStorage.MySql.cs index 7ade288..2b32b29 100644 --- a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IStorage.MySql.cs @@ -48,7 +48,7 @@ namespace DotNetCore.CAP.MySql var sql = CreateDbTablesScript(_options.TableNamePrefix); using (var connection = new MySqlConnection(_options.ConnectionString)) { - await connection.ExecuteAsync(sql); + await connection.ExecuteAsync(sql); } _logger.LogDebug("Ensuring all create database tables script are applied."); @@ -60,6 +60,7 @@ namespace DotNetCore.CAP.MySql $@" CREATE TABLE IF NOT EXISTS `{prefix}.received` ( `Id` bigint NOT NULL, + `Version` varchar(20) DEFAULT NULL, `Name` varchar(400) NOT NULL, `Group` varchar(200) DEFAULT NULL, `Content` longtext, @@ -72,6 +73,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` ( CREATE TABLE IF NOT EXISTS `{prefix}.published` ( `Id` bigint NOT NULL, + `Version` varchar(20) DEFAULT NULL, `Name` varchar(200) NOT NULL, `Content` longtext, `Retries` int(11) DEFAULT NULL, diff --git a/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs b/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs index 9401136..47d2fae 100644 --- a/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs @@ -44,7 +44,7 @@ namespace DotNetCore.CAP.MySql { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new MySqlConnection(Options.ConnectionString)) { @@ -60,8 +60,8 @@ namespace DotNetCore.CAP.MySql } var sql = $@" -INSERT INTO `{_prefix}.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) -VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; +INSERT INTO `{_prefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) +VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; using (var connection = new MySqlConnection(Options.ConnectionString)) { @@ -82,7 +82,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new MySqlConnection(Options.ConnectionString)) { return await connection.QueryAsync(sql); diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs index 883d297..5fa9d8b 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs @@ -17,5 +17,10 @@ namespace DotNetCore.CAP public string Schema { get; set; } = DefaultSchema; internal Type DbContextType { get; set; } + + /// + /// Data version + /// + internal string Version { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs index 08ffb90..0f41f1c 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs @@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection throw new ArgumentNullException(nameof(configure)); } + configure += x => x.Version = options.Version; + options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure)); return options; @@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection options.RegisterExtension(new PostgreSqlCapOptionsExtension(x => { configure(x); + x.Version = options.Version; x.DbContextType = typeof(TContext); })); diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj index fc854c0..744391e 100644 --- a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj +++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj @@ -13,8 +13,8 @@ - - + + diff --git a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs index 6cb5e65..bba6dad 100644 --- a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs @@ -55,7 +55,7 @@ namespace DotNetCore.CAP.PostgreSql private string PrepareSql() { return - $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Version,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } private IDbConnection InitDbConnection() diff --git a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs index baa08e7..187fac2 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs @@ -102,6 +102,7 @@ CREATE SCHEMA IF NOT EXISTS ""{schema}""; CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( ""Id"" BIGINT PRIMARY KEY NOT NULL, + ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL, ""Group"" VARCHAR(200) NULL, ""Content"" TEXT NULL, @@ -113,13 +114,18 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( ""Id"" BIGINT PRIMARY KEY NOT NULL, + ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL, ""Content"" TEXT NULL, ""Retries"" INT NOT NULL, ""Added"" TIMESTAMP NOT NULL, ""ExpiresAt"" TIMESTAMP NULL, ""StatusName"" VARCHAR(50) NOT NULL -);"; +); + +ALTER TABLE ""{schema}"".""received"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL; +ALTER TABLE ""{schema}"".""published"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL; +"; return batchSql; } } diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs index 4190383..f6f6b87 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs @@ -42,7 +42,7 @@ namespace DotNetCore.CAP.PostgreSql { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) { @@ -58,7 +58,7 @@ namespace DotNetCore.CAP.PostgreSql } var sql = - $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; + $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) { @@ -79,7 +79,7 @@ namespace DotNetCore.CAP.PostgreSql { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) { return await connection.QueryAsync(sql); @@ -107,9 +107,5 @@ namespace DotNetCore.CAP.PostgreSql return connection.Execute(sql) > 0; } } - - public void Dispose() - { - } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs index b50c9dc..a1fce45 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs @@ -22,7 +22,9 @@ namespace DotNetCore.CAP.RabbitMQ private int _count; private int _maxSize; - public ConnectionChannelPool(ILogger logger, RabbitMQOptions options) + public ConnectionChannelPool(ILogger logger, + CapOptions capOptions, + RabbitMQOptions options) { _logger = logger; _maxSize = DefaultPoolSize; @@ -30,10 +32,17 @@ namespace DotNetCore.CAP.RabbitMQ _connectionActivator = CreateConnection(options); HostAddress = options.HostName + ":" + options.Port; - Exchange = options.ExchangeName; - _logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", - JsonConvert.SerializeObject(options, Formatting.Indented)); + if (CapOptions.DefaultVersion == capOptions.Version) + { + Exchange = options.ExchangeName; + } + else + { + Exchange = options.ExchangeName + "." + capOptions.Version; + } + + _logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(options, Formatting.Indented)); } IModel IConnectionChannelPool.Rent() diff --git a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs index 3dacba3..185ec3b 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs @@ -8,6 +8,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; using RabbitMQ.Client; +using RabbitMQ.Client.Framing; namespace DotNetCore.CAP.RabbitMQ { @@ -33,8 +34,13 @@ namespace DotNetCore.CAP.RabbitMQ try { var body = Encoding.UTF8.GetBytes(content); + var props = new BasicProperties() + { + DeliveryMode = 2 + }; + channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); - channel.BasicPublish(_exchange, keyName, null, body); + channel.BasicPublish(_exchange, keyName, props, body); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 972b6dd..99dafb3 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RabbitMQ _queueName = queueName; _connectionChannelPool = connectionChannelPool; _rabbitMQOptions = options; - _exchageName = options.ExchangeName; + _exchageName = connectionChannelPool.Exchange; InitClient(); } diff --git a/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs b/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs index 3cf44c4..c8399c3 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs @@ -23,6 +23,11 @@ namespace DotNetCore.CAP internal bool IsSqlServer2008 { get; set; } + /// + /// Data version + /// + internal string Version { get; set; } + public EFOptions UseSqlServer2008() { IsSqlServer2008 = true; diff --git a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs index 6aca9cc..75af7aa 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs @@ -22,6 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection throw new ArgumentNullException(nameof(configure)); } + configure += x => x.Version = options.Version; + options.RegisterExtension(new SqlServerCapOptionsExtension(configure)); return options; @@ -44,6 +46,7 @@ namespace Microsoft.Extensions.DependencyInjection options.RegisterExtension(new SqlServerCapOptionsExtension(x => { configure(x); + x.Version = options.Version; x.DbContextType = typeof(TContext); })); diff --git a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj index 27d3ca2..e7ea6cb 100644 --- a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj +++ b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj @@ -14,9 +14,9 @@ - - - + + + diff --git a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs index 8a61c80..19de84f 100644 --- a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs @@ -55,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer private string PrepareSql() { return - $"INSERT INTO {_options.Schema}.[Published] ([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + $"INSERT INTO {_options.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } #endregion private methods diff --git a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs index abc19d4..435301a 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs @@ -75,6 +75,7 @@ IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL BEGIN CREATE TABLE [{schema}].[Received]( [Id] [bigint] NOT NULL, + [Version] [nvarchar](20) NOT NULL, [Name] [nvarchar](200) NOT NULL, [Group] [nvarchar](200) NULL, [Content] [nvarchar](max) NULL, @@ -93,6 +94,7 @@ IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL BEGIN CREATE TABLE [{schema}].[Published]( [Id] [bigint] NOT NULL, + [Version] [nvarchar](20) NOT NULL, [Name] [nvarchar](200) NOT NULL, [Content] [nvarchar](max) NULL, [Retries] [int] NOT NULL, diff --git a/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs index b938d8b..c7b0346 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs @@ -42,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; + $"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; using (var connection = new SqlConnection(Options.ConnectionString)) { @@ -58,8 +58,8 @@ namespace DotNetCore.CAP.SqlServer } var sql = $@" -INSERT INTO [{Options.Schema}].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) -VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; +INSERT INTO [{Options.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) +VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; using (var connection = new SqlConnection(Options.ConnectionString)) { @@ -80,7 +80,7 @@ VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; { var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; + $"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryAsync(sql); diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 05c0294..c44be34 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -28,6 +28,11 @@ namespace DotNetCore.CAP /// public const int DefaultFailedRetryCount = 50; + /// + /// Default version + /// + public const string DefaultVersion = "v1"; + public CapOptions() { @@ -35,6 +40,7 @@ namespace DotNetCore.CAP FailedRetryInterval = DefaultFailedMessageWaitingInterval; FailedRetryCount = DefaultFailedRetryCount; Extensions = new List(); + Version = DefaultVersion; DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly().GetName().Name.ToLower(); } @@ -45,6 +51,11 @@ namespace DotNetCore.CAP /// public string DefaultGroup { get; set; } + /// + /// The default version of the message, configured to isolate data in the same instance. The length must not exceed 20 + /// + public string Version { get; set; } + /// /// Sent or received succeed message after time span of due, then the message will be deleted at due time. /// Default is 24*3600 seconds. diff --git a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.Designer.cs b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.Designer.cs index c08a9ab..8dcc775 100644 --- a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.Designer.cs +++ b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.Designer.cs @@ -185,6 +185,15 @@ namespace DotNetCore.CAP.Dashboard.Resources { return ResourceManager.GetString("Common_Id", resourceCulture); } } + + /// + /// Looks up a localized string similar to Version. + /// + public static string Common_Version { + get { + return ResourceManager.GetString("Common_Version", resourceCulture); + } + } /// /// Looks up a localized string similar to Less details.... @@ -321,24 +330,6 @@ namespace DotNetCore.CAP.Dashboard.Resources { } } - /// - /// Looks up a localized string similar to The queue is empty.. - /// - public static string EnqueuedJobsPage_NoJobs { - get { - return ResourceManager.GetString("EnqueuedJobsPage_NoJobs", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Enqueued jobs. - /// - public static string EnqueuedJobsPage_Title { - get { - return ResourceManager.GetString("EnqueuedJobsPage_Title", resourceCulture); - } - } - /// /// Looks up a localized string similar to Publish Failed. /// @@ -816,60 +807,6 @@ namespace DotNetCore.CAP.Dashboard.Resources { } } - /// - /// Looks up a localized string similar to Heartbeat. - /// - public static string ServersPage_Table_Heartbeat { - get { - return ResourceManager.GetString("ServersPage_Table_Heartbeat", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Name. - /// - public static string ServersPage_Table_Name { - get { - return ResourceManager.GetString("ServersPage_Table_Name", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Queues. - /// - public static string ServersPage_Table_Queues { - get { - return ResourceManager.GetString("ServersPage_Table_Queues", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Started. - /// - public static string ServersPage_Table_Started { - get { - return ResourceManager.GetString("ServersPage_Table_Started", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Workers. - /// - public static string ServersPage_Table_Workers { - get { - return ResourceManager.GetString("ServersPage_Table_Workers", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Servers. - /// - public static string ServersPage_Title { - get { - return ResourceManager.GetString("ServersPage_Title", resourceCulture); - } - } - /// /// Looks up a localized string similar to Failed. /// diff --git a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.resx b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.resx index 57ed89d..b04cb91 100644 --- a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.resx +++ b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.resx @@ -123,18 +123,12 @@ Delete - - Do you really want to DELETE ALL selected jobs? - Deleting... Delete selected - - Enqueue jobs - Enqueueing... @@ -144,6 +138,9 @@ Id + + Version + Less details... @@ -177,12 +174,6 @@ Unknown - - The queue is empty. - - - Enqueued jobs - 24h graph @@ -215,25 +206,7 @@ There are no config distributed node discory. - - - Heartbeat - - - Name - - - Queues - - - Started - - - Workers - - - Servers - + Published Messages @@ -267,9 +240,6 @@ Active Connections - - Deleted Jobs - Retries @@ -295,7 +265,7 @@ Enqueued / Queues - {0} failed job(s) found. Retry or delete them manually. + {0} failed message(s) found. Publish Failed @@ -340,7 +310,7 @@ No messages found. - Published Jobs + Published messages Message group diff --git a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.zh.resx b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.zh.resx index 90ed01e..808df11 100644 --- a/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.zh.resx +++ b/src/DotNetCore.CAP/Dashboard/Content/resx/Strings.zh.resx @@ -144,6 +144,9 @@ 编号 + + 版本 + 收起... @@ -174,12 +177,6 @@ 未知 - - 没有任何作业 - - - 队列作业 - 当日走势 @@ -209,25 +206,7 @@ 每页条数 - - - 心跳 - - - 名称 - - - 队列 - - - 执行 - - - 工作区 - - - 服务器 - + 发送出的消息 @@ -271,7 +250,7 @@ 队列 - {0} failed job(s) found. Retry or delete them manually. + 发现了 {0} 个失败的消息 发送失败 diff --git a/src/DotNetCore.CAP/Dashboard/JsonDispatcher.cs b/src/DotNetCore.CAP/Dashboard/JsonDispatcher.cs index 89eadea..dc6df96 100644 --- a/src/DotNetCore.CAP/Dashboard/JsonDispatcher.cs +++ b/src/DotNetCore.CAP/Dashboard/JsonDispatcher.cs @@ -34,7 +34,13 @@ namespace DotNetCore.CAP.Dashboard var settings = new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver(), - Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}} + Converters = new JsonConverter[] + { + new StringEnumConverter + { + NamingStrategy = new CamelCaseNamingStrategy() + } + } }; serialized = JsonConvert.SerializeObject(result, settings); } diff --git a/src/DotNetCore.CAP/Dashboard/JsonStats.cs b/src/DotNetCore.CAP/Dashboard/JsonStats.cs index 4b3db7b..126fc42 100644 --- a/src/DotNetCore.CAP/Dashboard/JsonStats.cs +++ b/src/DotNetCore.CAP/Dashboard/JsonStats.cs @@ -30,7 +30,13 @@ namespace DotNetCore.CAP.Dashboard var settings = new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver(), - Converters = new JsonConverter[] {new StringEnumConverter {CamelCaseText = true}} + Converters = new JsonConverter[] + { + new StringEnumConverter + { + NamingStrategy = new CamelCaseNamingStrategy() + } + } }; var serialized = JsonConvert.SerializeObject(result, settings); diff --git a/src/DotNetCore.CAP/Dashboard/Monitoring/MessageDto.cs b/src/DotNetCore.CAP/Dashboard/Monitoring/MessageDto.cs index 0baaec4..233c5b2 100644 --- a/src/DotNetCore.CAP/Dashboard/Monitoring/MessageDto.cs +++ b/src/DotNetCore.CAP/Dashboard/Monitoring/MessageDto.cs @@ -9,6 +9,8 @@ namespace DotNetCore.CAP.Dashboard.Monitoring { public long Id { get; set; } + public string Version { get; set; } + public string Group { get; set; } public string Name { get; set; } diff --git a/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.cshtml b/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.cshtml index 1e7cdfa..d71b8a6 100644 --- a/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.cshtml +++ b/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.cshtml @@ -80,7 +80,8 @@ - @Strings.MessagesPage_Table_Code + @Strings.Common_Id + @Strings.Common_Version @Strings.MessagesPage_Table_Name @Strings.MessagesPage_Table_Retries @if (string.Equals(StatusName, "Processing", StringComparison.CurrentCultureIgnoreCase)) @@ -95,11 +96,14 @@ { - + #@message.Id + + @message.Version + @message.Name diff --git a/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.generated.cs b/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.generated.cs index 9514d49..f93fa89 100644 --- a/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.generated.cs +++ b/src/DotNetCore.CAP/Dashboard/Pages/PublishedPage.generated.cs @@ -267,19 +267,25 @@ namespace DotNetCore.CAP.Dashboard.Pages - "); + "); #line 83 "..\..\PublishedPage.cshtml" - Write(Strings.MessagesPage_Table_Code); + Write(Strings.Common_Id); #line default #line hidden WriteLiteral("\r\n "); +#line 84 "..\..\PublishedPage.cshtml" + Write(Strings.Common_Version); + +#line default +#line hidden + WriteLiteral("\r\n "); #line 84 "..\..\PublishedPage.cshtml" Write(Strings.MessagesPage_Table_Name); @@ -387,7 +393,14 @@ namespace DotNetCore.CAP.Dashboard.Pages " "); +#line 102 "..\..\PublishedPage.cshtml" + Write(message.Version); + +#line default +#line hidden + WriteLiteral("\r\n \r\n \r\n " + + " "); #line 104 "..\..\PublishedPage.cshtml" Write(message.Name); diff --git a/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.cshtml b/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.cshtml index 7ec28b3..01dcaad 100644 --- a/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.cshtml +++ b/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.cshtml @@ -85,7 +85,8 @@ - @Strings.MessagesPage_Table_Code + @Strings.Common_Id + @Strings.Common_Version @Strings.MessagesPage_Table_Group @Strings.MessagesPage_Table_Name @Strings.MessagesPage_Table_Retries @@ -106,6 +107,9 @@ #@message.Id + + @message.Version + @message.Group diff --git a/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.generated.cs b/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.generated.cs index 77e480e..8f34929 100644 --- a/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.generated.cs +++ b/src/DotNetCore.CAP/Dashboard/Pages/ReceivedPage.generated.cs @@ -297,7 +297,15 @@ namespace DotNetCore.CAP.Dashboard.Pages #line 88 "..\..\ReceivedPage.cshtml" - Write(Strings.MessagesPage_Table_Code); + Write(Strings.Common_Id); + + +#line default +#line hidden + WriteLiteral("\r\n "); + +#line 88 "..\..\ReceivedPage.cshtml" + Write(Strings.Common_Version); #line default @@ -424,6 +432,15 @@ namespace DotNetCore.CAP.Dashboard.Pages +#line 111 "..\..\ReceivedPage.cshtml" + Write(message.Version); + +#line default +#line hidden + WriteLiteral("\r\n \r\n " + + "\r\n "); + + #line 110 "..\..\ReceivedPage.cshtml" Write(message.Group); diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index 0866cca..d831842 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -32,12 +32,12 @@ - - - - - - + + + + + + diff --git a/src/DotNetCore.CAP/ISubscriberExecutor.cs b/src/DotNetCore.CAP/ISubscriberExecutor.cs index a099b60..2222f78 100644 --- a/src/DotNetCore.CAP/ISubscriberExecutor.cs +++ b/src/DotNetCore.CAP/ISubscriberExecutor.cs @@ -6,6 +6,9 @@ using DotNetCore.CAP.Models; namespace DotNetCore.CAP { + /// + /// Consumer execotor + /// public interface ISubscriberExecutor { Task ExecuteAsync(CapReceivedMessage message); diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index f751499..47b4be6 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -9,6 +9,7 @@ using System.Text.RegularExpressions; using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using Microsoft.Extensions.DependencyInjection; +using System.Collections.Concurrent; namespace DotNetCore.CAP.Internal { @@ -20,8 +21,12 @@ namespace DotNetCore.CAP.Internal { private readonly CapOptions _capOptions; private readonly IServiceProvider _serviceProvider; - private List> _asteriskList; - private List> _poundList; + + /// + /// since this class be designed as a Singleton service,the following two list must be thread safe!!! + /// + private readonly ConcurrentDictionary>> _asteriskList; + private readonly ConcurrentDictionary>> _poundList; /// /// Creates a new . @@ -30,6 +35,9 @@ namespace DotNetCore.CAP.Internal { _serviceProvider = serviceProvider; _capOptions = capOptions; + + _asteriskList = new ConcurrentDictionary>>(); + _poundList = new ConcurrentDictionary>>(); } public IReadOnlyList SelectCandidates() @@ -120,7 +128,11 @@ namespace DotNetCore.CAP.Internal { if (attr.Group == null) { - attr.Group = _capOptions.DefaultGroup; + attr.Group = _capOptions.DefaultGroup + "." + _capOptions.Version; + } + else + { + attr.Group = attr.Group + "." + _capOptions.Version; } yield return InitDescriptor(attr, method, typeInfo); @@ -150,17 +162,19 @@ namespace DotNetCore.CAP.Internal private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList executeDescriptor) { - if (_asteriskList == null) + var group = executeDescriptor.First().Attribute.Group; + if (!_asteriskList.TryGetValue(group, out var tmpList)) { - _asteriskList = executeDescriptor - .Where(x => x.Attribute.Name.IndexOf('*') >= 0) + tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0) .Select(x => new RegexExecuteDescriptor - { - Name = ("^" + x.Attribute.Name + "$").Replace("*", "[a-zA-Z]+").Replace(".", "\\."), - Descriptor = x - }).ToList(); + { + Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."), + Descriptor = x + }).ToList(); + _asteriskList.TryAdd(group, tmpList); } - foreach (var red in _asteriskList) + + foreach (var red in tmpList) { if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) { @@ -173,18 +187,20 @@ namespace DotNetCore.CAP.Internal private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList executeDescriptor) { - if (_poundList == null) + var group = executeDescriptor.First().Attribute.Group; + if (!_poundList.TryGetValue(group, out var tmpList)) { - _poundList = executeDescriptor + tmpList = executeDescriptor .Where(x => x.Attribute.Name.IndexOf('#') >= 0) .Select(x => new RegexExecuteDescriptor - { - Name = ("^" + x.Attribute.Name + "$").Replace("#", "[a-zA-Z\\.]+"), - Descriptor = x - }).ToList(); + { + Name = ("^" + x.Attribute.Name + "$").Replace("#", "[0-9_a-zA-Z\\.]+"), + Descriptor = x + }).ToList(); + _poundList.TryAdd(group, tmpList); } - foreach (var red in _poundList) + foreach (var red in tmpList) { if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) { diff --git a/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj b/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj index f758ee0..e15a979 100644 --- a/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj +++ b/test/DotNetCore.CAP.MongoDB.Test/DotNetCore.CAP.MongoDB.Test.csproj @@ -6,13 +6,13 @@ - - - - + + + + - - + + all runtime; build; native; contentfiles; analyzers diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs index cf5ed73..60a8ccc 100644 --- a/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs @@ -17,21 +17,23 @@ namespace DotNetCore.CAP.MongoDB.Test { _api = new MongoDBMonitoringApi(MongoClient, MongoDBOptions); - var collection = Database.GetCollection(MongoDBOptions.PublishedCollection); + var collection = Database.GetCollection(MongoDBOptions.PublishedCollection); collection.InsertMany(new[] { - new CapPublishedMessage + new PublishedMessage { Id = SnowflakeId.Default().NextId(), Added = DateTime.Now.AddHours(-1), StatusName = "Failed", + Version = "v1", Content = "abc" }, - new CapPublishedMessage + new PublishedMessage { Id = SnowflakeId.Default().NextId(), Added = DateTime.Now, StatusName = "Failed", + Version = "v1", Content = "bbc" } }); diff --git a/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs index 73fa5ba..c819d17 100644 --- a/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs @@ -24,9 +24,13 @@ namespace DotNetCore.CAP.MongoDB.Test Content = "test-content" }; - _connection.StoreReceivedMessage(new CapReceivedMessage(messageContext) + _connection.StoreReceivedMessage(new ReceivedMessage() { - Id = SnowflakeId.Default().NextId() + Id = SnowflakeId.Default().NextId(), + Group=messageContext.Group, + Content=messageContext.Content, + Name=messageContext.Name, + Version="v1" }); } @@ -34,7 +38,7 @@ namespace DotNetCore.CAP.MongoDB.Test public void ChangeReceivedState_Test() { StoreReceivedMessageAsync_TestAsync(); - var collection = Database.GetCollection(MongoDBOptions.ReceivedCollection); + var collection = Database.GetCollection(MongoDBOptions.ReceivedCollection); var msg = collection.Find(x => true).FirstOrDefault(); _connection.ChangeReceivedState(msg.Id, StatusName.Scheduled).Should().BeTrue(); @@ -60,9 +64,9 @@ namespace DotNetCore.CAP.MongoDB.Test }; _connection.StoreReceivedMessage(msg); - var collection = Database.GetCollection(MongoDBOptions.ReceivedCollection); + var collection = Database.GetCollection(MongoDBOptions.ReceivedCollection); - var updateDef = Builders + var updateDef = Builders .Update.Set(x => x.Added, DateTime.Now.AddMinutes(-5)); await collection.UpdateOneAsync(x => x.Id == id, updateDef); diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj index 6f6d37b..83ceaa9 100644 --- a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj +++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj @@ -13,20 +13,20 @@ - + all runtime; build; native; contentfiles; analyzers - - - - - - - - - - + + + + + + + + + + \ No newline at end of file diff --git a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs index 7db4a38..410b03f 100644 --- a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs @@ -22,7 +22,7 @@ namespace DotNetCore.CAP.MySql.Test [Fact] public async Task GetPublishedMessageAsync_Test() { - var sql = "INSERT INTO `cap.published`(`Id`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var sql = "INSERT INTO `cap.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var publishMessage = new CapPublishedMessage { @@ -69,8 +69,8 @@ namespace DotNetCore.CAP.MySql.Test public async Task GetReceivedMessageAsync_Test() { var sql = $@" - INSERT INTO `cap.received`(`Id`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) - VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + INSERT INTO `cap.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) + VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var receivedMessage = new CapReceivedMessage { diff --git a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj index f1bf2bd..08d441b 100644 --- a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj +++ b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj @@ -9,8 +9,8 @@ - - + + all runtime; build; native; contentfiles; analyzers diff --git a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs index 98734ba..bf882b4 100644 --- a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs @@ -22,7 +22,7 @@ namespace DotNetCore.CAP.PostgreSql.Test [Fact] public async Task GetPublishedMessageAsync_Test() { - var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Version"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var publishMessage = new CapPublishedMessage { @@ -69,8 +69,8 @@ namespace DotNetCore.CAP.PostgreSql.Test public async Task GetReceivedMessageAsync_Test() { var sql = $@" - INSERT INTO ""cap"".""received""(""Id"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") - VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + INSERT INTO ""cap"".""received""(""Id"",""Version"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") + VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var receivedMessage = new CapReceivedMessage { diff --git a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj index 5bdcd2f..bbafdd4 100644 --- a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj +++ b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj @@ -13,21 +13,21 @@ - - + + all runtime; build; native; contentfiles; analyzers - - - - - - - - - - + + + + + + + + + + diff --git a/test/DotNetCore.CAP.SqlServer.Test/SqlServerStorageConnectionTest.cs b/test/DotNetCore.CAP.SqlServer.Test/SqlServerStorageConnectionTest.cs index 20d2c69..0df9649 100644 --- a/test/DotNetCore.CAP.SqlServer.Test/SqlServerStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.SqlServer.Test/SqlServerStorageConnectionTest.cs @@ -20,7 +20,7 @@ namespace DotNetCore.CAP.SqlServer.Test [Fact] public async Task GetPublishedMessageAsync_Test() { - var sql = "INSERT INTO [Cap].[Published]([Id],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var sql = "INSERT INTO [Cap].[Published]([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var publishMessage = new CapPublishedMessage { @@ -67,7 +67,7 @@ namespace DotNetCore.CAP.SqlServer.Test [Fact] public async Task GetReceivedMessageAsync_Test() { - var sql = @"INSERT INTO [Cap].[Received]([Id],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var sql = @"INSERT INTO [Cap].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var insertedId = SnowflakeId.Default().NextId(); var receivedMessage = new CapReceivedMessage { diff --git a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj index 1b046d9..6ec8c25 100644 --- a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj +++ b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj @@ -8,15 +8,15 @@ - + all runtime; build; native; contentfiles; analyzers - - - - - + + + + +