From 14d0b34c78e957be02a40e2785db3a49f7da4ec4 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 21 Nov 2019 11:54:02 +0800 Subject: [PATCH] Introduced rewrite table name option for pgsql --- .../CAP.PostgreSqlCapOptionsExtension.cs | 6 +-- .../CAP.PostgreSqlOptions.cs | 15 +++---- .../IDataStorage.PostgreSql.cs | 29 ++++++++----- .../IMonitoringApi.PostgreSql.cs | 41 ++++++++++--------- .../IStorageInitializer.PostgreSql.cs | 20 ++++----- 5 files changed, 56 insertions(+), 55 deletions(-) diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs index 3be4e92..e1f6456 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs @@ -22,14 +22,12 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { services.AddSingleton(); + services.Configure(_configure); + services.AddSingleton, ConfigurePostgreSqlOptions>(); services.AddSingleton(); - services.AddSingleton(); services.AddTransient(); - - services.Configure(_configure); - services.AddSingleton, ConfigurePostgreSqlOptions>(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs index db55202..d881b2d 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -27,15 +27,12 @@ namespace DotNetCore.CAP public void Configure(PostgreSqlOptions options) { - if (options.DbContextType != null) - using (var scope = _serviceScopeFactory.CreateScope()) - { - var provider = scope.ServiceProvider; - using (var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType)) - { - options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; - } - } + if (options.DbContextType == null) return; + + using var scope = _serviceScopeFactory.CreateScope(); + var provider = scope.ServiceProvider; + using var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType); + options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index b632498..d4204e3 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -21,20 +21,27 @@ namespace DotNetCore.CAP.PostgreSql public class PostgreSqlDataStorage : IDataStorage { private readonly IOptions _capOptions; + private readonly IStorageInitializer _initializer; private readonly IOptions _options; + private readonly string _pubName; + private readonly string _recName; public PostgreSqlDataStorage( IOptions options, - IOptions capOptions) + IOptions capOptions, + IStorageInitializer initializer) { _capOptions = capOptions; + _initializer = initializer; _options = options; + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); } public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) { var sql = - $"UPDATE \"{_options.Value.Schema}\".\"published\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + $"UPDATE {_pubName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { @@ -48,7 +55,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) { var sql = - $"UPDATE \"{_options.Value.Schema}\".\"received\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + $"UPDATE {_recName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { @@ -63,7 +70,7 @@ namespace DotNetCore.CAP.PostgreSql CancellationToken cancellationToken = default) { var sql = - $"INSERT INTO \"{_options.Value.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"INSERT INTO {_pubName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + $"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var message = new MediumMessage @@ -108,7 +115,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) { var sql = - $"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); @@ -128,7 +135,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task StoreReceivedMessageAsync(string name, string group, Message message) { var sql = - $"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; var mdMessage = new MediumMessage @@ -161,15 +168,15 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); return await connection.ExecuteAsync( - $"DELETE FROM \"{_options.Value.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Value.Schema}\".\"{table}\" LIMIT @count);", - new {timeout, batchCount}); + $"DELETE FROM {table} WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @count);", + new { timeout, batchCount }); } public async Task> GetPublishedMessagesOfNeedRetry() { var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; var result = new List(); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); @@ -192,7 +199,7 @@ namespace DotNetCore.CAP.PostgreSql { var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + $"SELECT * FROM {_recName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; var result = new List(); @@ -214,7 +221,7 @@ namespace DotNetCore.CAP.PostgreSql public IMonitoringApi GetMonitoringApi() { - return new PostgreSqlMonitoringApi(_options); + return new PostgreSqlMonitoringApi(_options, _initializer); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index b026034..56e5619 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -19,16 +19,20 @@ namespace DotNetCore.CAP.PostgreSql public class PostgreSqlMonitoringApi : IMonitoringApi { private readonly IOptions _options; + private readonly string _pubName; + private readonly string _recName; - public PostgreSqlMonitoringApi(IOptions options) + public PostgreSqlMonitoringApi(IOptions options,IStorageInitializer initializer) { _options = options ?? throw new ArgumentNullException(nameof(options)); + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); } public async Task GetPublishedMessageAsync(long id) { var sql = - $"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; + $"SELECT * FROM {_pubName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); return await connection.QueryFirstOrDefaultAsync(sql); @@ -37,19 +41,18 @@ namespace DotNetCore.CAP.PostgreSql public async Task GetReceivedMessageAsync(long id) { var sql = - $"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; + $"SELECT * FROM {_recName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); return await connection.QueryFirstOrDefaultAsync(sql); } public StatisticsDto GetStatistics() { - var sql = string.Format(@" -select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeeded'; -select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded'; -select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed'; -select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';", - _options.Value.Schema); + var sql = $@" +select count(""Id"") from {_pubName} where ""StatusName"" = N'Succeeded'; +select count(""Id"") from {_recName} where ""StatusName"" = N'Succeeded'; +select count(""Id"") from {_pubName} where ""StatusName"" = N'Failed'; +select count(""Id"") from {_recName} where ""StatusName"" = N'Failed';"; var statistics = UseConnection(connection => { @@ -70,7 +73,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' public IList Messages(MessageQueryDto queryDto) { - var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received"; + var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName; var where = string.Empty; if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)"; @@ -82,7 +85,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike '%@Content%'"; var sqlQuery = - $"select * from \"{_options.Value.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; + $"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; return UseConnection(conn => conn.Query(sqlQuery, new { @@ -97,34 +100,34 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' public int PublishedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Failed))); + return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed))); } public int PublishedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded))); + return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded))); } public int ReceivedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed))); + return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed))); } public int ReceivedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded))); + return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded))); } public IDictionary HourlySucceededJobs(MessageType type) { - var tableName = type == MessageType.Publish ? "published" : "received"; + var tableName = type == MessageType.Publish ? _pubName : _recName; return UseConnection(connection => GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded))); } public IDictionary HourlyFailedJobs(MessageType type) { - var tableName = type == MessageType.Publish ? "published" : "received"; + var tableName = type == MessageType.Publish ? _pubName : _recName; return UseConnection(connection => GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed))); } @@ -132,7 +135,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) { var sqlQuery = - $"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; + $"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)"; var count = connection.ExecuteScalar(sqlQuery, new { state = statusName }); return count; @@ -170,7 +173,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' with aggr as ( select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"", count(""Id"") as ""Count"" - from ""{_options.Value.Schema}"".""{tableName}"" + from {tableName} where ""StatusName"" = @statusName group by to_char(""Added"", 'yyyy-MM-dd-HH') ) diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index 69af3fa..ad494c9 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -24,14 +24,14 @@ namespace DotNetCore.CAP.PostgreSql _logger = logger; } - public string GetPublishedTableName() + public virtual string GetPublishedTableName() { - return $"{_options.Value.Schema}.published"; + return $"\"{_options.Value.Schema}\".\"published\""; } - public string GetReceivedTableName() + public virtual string GetReceivedTableName() { - return $"{_options.Value.Schema}.received"; + return $"\"{_options.Value.Schema}\".\"received\""; } public async Task InitializeAsync(CancellationToken cancellationToken) @@ -39,7 +39,7 @@ namespace DotNetCore.CAP.PostgreSql if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(_options.Value.Schema); - using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) + await using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) { await connection.ExecuteAsync(sql); } @@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql var batchSql = $@" CREATE SCHEMA IF NOT EXISTS ""{schema}""; -CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( +CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}( ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL, @@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( ""StatusName"" VARCHAR(50) NOT NULL ); -CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( +CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}( ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL, @@ -74,11 +74,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( ""Added"" TIMESTAMP NOT NULL, ""ExpiresAt"" TIMESTAMP NULL, ""StatusName"" VARCHAR(50) NOT NULL -); - -ALTER TABLE ""{schema}"".""received"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL; -ALTER TABLE ""{schema}"".""published"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL; -"; +);"; return batchSql; } }