diff --git a/build/version.props b/build/version.props index 4ba57d7..fae43bc 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 2 1 - 3 + 4 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 2affaa6..241870e 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -66,16 +66,18 @@ namespace DotNetCore.CAP.Kafka private void InitKafkaClient() { - _kafkaOptions.MainConfig["group.id"] = _groupId; + lock (_kafkaOptions) + { + _kafkaOptions.MainConfig["group.id"] = _groupId; - var config = _kafkaOptions.AsKafkaConfig(); - _consumerClient = new Consumer(config, null, StringDeserializer); - _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError; - _consumerClient.OnMessage += ConsumerClient_OnMessage; - _consumerClient.OnError += ConsumerClient_OnError; + var config = _kafkaOptions.AsKafkaConfig(); + _consumerClient = new Consumer(config, null, StringDeserializer); + _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError; + _consumerClient.OnMessage += ConsumerClient_OnMessage; + _consumerClient.OnError += ConsumerClient_OnError; + } } - private void ConsumerClient_OnConsumeError(object sender, Message e) { var message = e.Deserialize(null, StringDeserializer); diff --git a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs index 3cd77a3..390365b 100644 --- a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs @@ -5,6 +5,13 @@ namespace DotNetCore.CAP { public class EFOptions { + public const string DefaultSchema = "cap"; + + /// + /// Gets or sets the table name prefix to use when creating database objects. + /// + public string TableNamePrefix { get; set; } = DefaultSchema; + /// /// EF db context type. /// diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs index 3d29fa1..a0bf29d 100644 --- a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs @@ -25,22 +25,32 @@ namespace DotNetCore.CAP services.AddScoped(); services.AddTransient(); + AddSingletionMySqlOptions(services); + } + + private void AddSingletionMySqlOptions(IServiceCollection services) + { var mysqlOptions = new MySqlOptions(); + _configure(mysqlOptions); if (mysqlOptions.DbContextType != null) + { services.AddSingleton(x => { using (var scope = x.CreateScope()) { var provider = scope.ServiceProvider; - var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType); + var dbContext = (DbContext)provider.GetService(mysqlOptions.DbContextType); mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; return mysqlOptions; } }); + } else + { services.AddSingleton(mysqlOptions); + } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs index 5bec849..c75ae6a 100644 --- a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs @@ -1,5 +1,4 @@ // ReSharper disable once CheckNamespace - namespace DotNetCore.CAP { public class MySqlOptions : EFOptions @@ -8,7 +7,5 @@ namespace DotNetCore.CAP /// Gets or sets the database's connection string that will be used to store database entities. /// public string ConnectionString { get; set; } - - public string TableNamePrefix { get; set; } = "cap"; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs index 5f66f6c..1949c82 100644 --- a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs @@ -16,6 +16,7 @@ namespace Microsoft.Extensions.DependencyInjection { if (configure == null) throw new ArgumentNullException(nameof(configure)); + options.RegisterExtension(new MySqlCapOptionsExtension(configure)); return options; @@ -24,7 +25,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseEntityFramework(this CapOptions options) where TContext : DbContext { - return options.UseEntityFramework(opt => { opt.DbContextType = typeof(TContext); }); + return options.UseEntityFramework(opt => { }); } public static CapOptions UseEntityFramework(this CapOptions options, Action configure) @@ -32,10 +33,11 @@ namespace Microsoft.Extensions.DependencyInjection { if (configure == null) throw new ArgumentNullException(nameof(configure)); - var efOptions = new EFOptions {DbContextType = typeof(TContext)}; - configure(efOptions); - - options.RegisterExtension(new MySqlCapOptionsExtension(configure)); + options.RegisterExtension(new MySqlCapOptionsExtension(x => + { + configure(x); + x.DbContextType = typeof(TContext); + })); return options; } diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj index d64ff32..515828b 100644 --- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj +++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs index 1ed4e0e..7a59bde 100644 --- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -57,6 +57,7 @@ SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();"; using (var connection = new MySqlConnection(Options.ConnectionString)) { + connection.Open(); connection.Execute("SELECT LAST_INSERT_ID(0)"); return await connection.QueryFirstOrDefaultAsync(sql); } @@ -103,6 +104,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();"; using (var connection = new MySqlConnection(Options.ConnectionString)) { + connection.Open(); connection.Execute("SELECT LAST_INSERT_ID(0)"); return await connection.QueryFirstOrDefaultAsync(sql); } diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs index 9fde846..e001c05 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs @@ -24,7 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseEntityFramework(this CapOptions options) where TContext : DbContext { - return options.UseEntityFramework(opt => { opt.DbContextType = typeof(TContext); }); + return options.UseEntityFramework(opt => { }); } public static CapOptions UseEntityFramework(this CapOptions options, Action configure) @@ -32,10 +32,11 @@ namespace Microsoft.Extensions.DependencyInjection { if (configure == null) throw new ArgumentNullException(nameof(configure)); - var efOptions = new EFOptions {DbContextType = typeof(TContext)}; - configure(efOptions); - - options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure)); + options.RegisterExtension(new PostgreSqlCapOptionsExtension(x => + { + configure(x); + x.DbContextType = typeof(TContext); + })); return options; } diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs index e2d4a14..f1e94f8 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs @@ -25,10 +25,16 @@ namespace DotNetCore.CAP services.AddScoped(); services.AddTransient(); + AddSingletonPostgreSqlOptions(services); + } + + private void AddSingletonPostgreSqlOptions(IServiceCollection services) + { var postgreSqlOptions = new PostgreSqlOptions(); _configure(postgreSqlOptions); if (postgreSqlOptions.DbContextType != null) + { services.AddSingleton(x => { using (var scope = x.CreateScope()) @@ -39,8 +45,11 @@ namespace DotNetCore.CAP return postgreSqlOptions; } }); + } else + { services.AddSingleton(postgreSqlOptions); + } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj index 96dca04..702de21 100644 --- a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj +++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs index c977921..c882165 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs @@ -24,7 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseEntityFramework(this CapOptions options) where TContext : DbContext { - return options.UseEntityFramework(opt => { opt.DbContextType = typeof(TContext); }); + return options.UseEntityFramework(opt => { }); } public static CapOptions UseEntityFramework(this CapOptions options, Action configure) @@ -32,10 +32,11 @@ namespace Microsoft.Extensions.DependencyInjection { if (configure == null) throw new ArgumentNullException(nameof(configure)); - var efOptions = new EFOptions {DbContextType = typeof(TContext)}; - configure(efOptions); - - options.RegisterExtension(new SqlServerCapOptionsExtension(configure)); + options.RegisterExtension(new SqlServerCapOptionsExtension(x => + { + configure(x); + x.DbContextType = typeof(TContext); + })); return options; } diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs index ba17ff0..55408bd 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs @@ -24,6 +24,7 @@ namespace DotNetCore.CAP services.AddScoped(); services.AddScoped(); services.AddTransient(); + AddSqlServerOptions(services); } @@ -34,18 +35,22 @@ namespace DotNetCore.CAP _configure(sqlServerOptions); if (sqlServerOptions.DbContextType != null) + { services.AddSingleton(x => { using (var scope = x.CreateScope()) { var provider = scope.ServiceProvider; - var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType); + var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType); sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; return sqlServerOptions; } }); + } else + { services.AddSingleton(sqlServerOptions); + } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs index 57825b5..75d8398 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs @@ -54,7 +54,7 @@ namespace DotNetCore.CAP.SqlServer $@" IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}') BEGIN - EXEC('CREATE SCHEMA {schema}') + EXEC('CREATE SCHEMA [{schema}]') END; IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index e4fb3ea..697d6db 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -48,12 +48,12 @@ - + - + diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs index 7435b4b..d1ad6ea 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs @@ -36,7 +36,7 @@ namespace DotNetCore.CAP if (message == null) { - _logger.LogError($"Can not find mesage at cap received message table, message id:{fetched.MessageId} !!!"); + _logger.LogError($"Can not found the `message` at cap received message table, message id:{fetched.MessageId} !!!"); return OperateResult.Failed(); } @@ -68,6 +68,8 @@ namespace DotNetCore.CAP AddErrorReasonToContent(message, ex); + ++message.Retries; //issue: https://github.com/dotnetcore/CAP/issues/90 + await _stateChanger.ChangeStateAsync(message, new FailedState(), connection); fetched.RemoveFromQueue(); diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj index 5f22d0a..b0dd92a 100644 --- a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj +++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj @@ -15,13 +15,13 @@ - - + + - + diff --git a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj index 2c34952..be84f65 100644 --- a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj +++ b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj index 677adf2..035ed35 100644 --- a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj +++ b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj @@ -12,13 +12,13 @@ - + - + diff --git a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj index 66c7673..96bfb13 100644 --- a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj +++ b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj @@ -8,13 +8,13 @@ - + - +