Browse Source

Introduced rewrite table name option for pgsql

master
Savorboard 5 years ago
parent
commit
14d0b34c78
5 changed files with 56 additions and 55 deletions
  1. +2
    -4
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
  2. +6
    -9
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
  3. +18
    -11
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  4. +22
    -19
      src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
  5. +8
    -12
      src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs

+ 2
- 4
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs View File

@@ -22,14 +22,12 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapStorageMarkerService>();
services.Configure(_configure);
services.AddSingleton<IConfigureOptions<PostgreSqlOptions>, ConfigurePostgreSqlOptions>();

services.AddSingleton<IDataStorage, PostgreSqlDataStorage>();

services.AddSingleton<IStorageInitializer, PostgreSqlStorageInitializer>();
services.AddTransient<CapTransactionBase, PostgreSqlCapTransaction>();

services.Configure(_configure);
services.AddSingleton<IConfigureOptions<PostgreSqlOptions>, ConfigurePostgreSqlOptions>();
}
}
}

+ 6
- 9
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs View File

@@ -27,15 +27,12 @@ namespace DotNetCore.CAP

public void Configure(PostgreSqlOptions options)
{
if (options.DbContextType != null)
using (var scope = _serviceScopeFactory.CreateScope())
{
var provider = scope.ServiceProvider;
using (var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType))
{
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
if (options.DbContextType == null) return;
using var scope = _serviceScopeFactory.CreateScope();
var provider = scope.ServiceProvider;
using var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType);
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}

+ 18
- 11
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs View File

@@ -21,20 +21,27 @@ namespace DotNetCore.CAP.PostgreSql
public class PostgreSqlDataStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer;
private readonly IOptions<PostgreSqlOptions> _options;
private readonly string _pubName;
private readonly string _recName;

public PostgreSqlDataStorage(
IOptions<PostgreSqlOptions> options,
IOptions<CapOptions> capOptions)
IOptions<CapOptions> capOptions,
IStorageInitializer initializer)
{
_capOptions = capOptions;
_initializer = initializer;
_options = options;
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}

public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE \"{_options.Value.Schema}\".\"published\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
$"UPDATE {_pubName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
@@ -48,7 +55,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE \"{_options.Value.Schema}\".\"received\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
$"UPDATE {_recName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
@@ -63,7 +70,7 @@ namespace DotNetCore.CAP.PostgreSql
CancellationToken cancellationToken = default)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"INSERT INTO {_pubName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

var message = new MediumMessage
@@ -108,7 +115,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
@@ -128,7 +135,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql =
$"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

var mdMessage = new MediumMessage
@@ -161,15 +168,15 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);

return await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Value.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Value.Schema}\".\"{table}\" LIMIT @count);",
new {timeout, batchCount});
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @count);",
new { timeout, batchCount });
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
$"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
@@ -192,7 +199,7 @@ namespace DotNetCore.CAP.PostgreSql
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
$"SELECT * FROM {_recName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();

@@ -214,7 +221,7 @@ namespace DotNetCore.CAP.PostgreSql

public IMonitoringApi GetMonitoringApi()
{
return new PostgreSqlMonitoringApi(_options);
return new PostgreSqlMonitoringApi(_options, _initializer);
}
}
}

+ 22
- 19
src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs View File

@@ -19,16 +19,20 @@ namespace DotNetCore.CAP.PostgreSql
public class PostgreSqlMonitoringApi : IMonitoringApi
{
private readonly IOptions<PostgreSqlOptions> _options;
private readonly string _pubName;
private readonly string _recName;

public PostgreSqlMonitoringApi(IOptions<PostgreSqlOptions> options)
public PostgreSqlMonitoringApi(IOptions<PostgreSqlOptions> options,IStorageInitializer initializer)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}

public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
$"SELECT * FROM {_pubName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
@@ -37,19 +41,18 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql =
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
$"SELECT * FROM {_recName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}

public StatisticsDto GetStatistics()
{
var sql = string.Format(@"
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';",
_options.Value.Schema);
var sql = $@"
select count(""Id"") from {_pubName} where ""StatusName"" = N'Succeeded';
select count(""Id"") from {_recName} where ""StatusName"" = N'Succeeded';
select count(""Id"") from {_pubName} where ""StatusName"" = N'Failed';
select count(""Id"") from {_recName} where ""StatusName"" = N'Failed';";

var statistics = UseConnection(connection =>
{
@@ -70,7 +73,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'

public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName;
var where = string.Empty;

if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)";
@@ -82,7 +85,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike '%@Content%'";

var sqlQuery =
$"select * from \"{_options.Value.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
$"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";

return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
@@ -97,34 +100,34 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'

public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
}

public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
}

public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
}

public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded)));
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
}
@@ -132,7 +135,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery =
$"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
$"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)";

var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
@@ -170,7 +173,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
with aggr as (
select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"",
count(""Id"") as ""Count""
from ""{_options.Value.Schema}"".""{tableName}""
from {tableName}
where ""StatusName"" = @statusName
group by to_char(""Added"", 'yyyy-MM-dd-HH')
)


+ 8
- 12
src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs View File

@@ -24,14 +24,14 @@ namespace DotNetCore.CAP.PostgreSql
_logger = logger;
}

public string GetPublishedTableName()
public virtual string GetPublishedTableName()
{
return $"{_options.Value.Schema}.published";
return $"\"{_options.Value.Schema}\".\"published\"";
}

public string GetReceivedTableName()
public virtual string GetReceivedTableName()
{
return $"{_options.Value.Schema}.received";
return $"\"{_options.Value.Schema}\".\"received\"";
}

public async Task InitializeAsync(CancellationToken cancellationToken)
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.PostgreSql
if (cancellationToken.IsCancellationRequested) return;

var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
await using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";

CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}(
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
@@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""StatusName"" VARCHAR(50) NOT NULL
);

CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}(
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
@@ -74,11 +74,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);

ALTER TABLE ""{schema}"".""received"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL;
ALTER TABLE ""{schema}"".""published"" ADD COLUMN IF NOT EXISTS ""Version"" VARCHAR(20) NOT NULL;
";
);";
return batchSql;
}
}

Loading…
Cancel
Save