diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs index 0f41f1c..8bc7a19 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs @@ -17,10 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UsePostgreSql(this CapOptions options, Action configure) { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + if (configure == null) throw new ArgumentNullException(nameof(configure)); configure += x => x.Version = options.Version; @@ -38,10 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseEntityFramework(this CapOptions options, Action configure) where TContext : DbContext { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + if (configure == null) throw new ArgumentNullException(nameof(configure)); options.RegisterExtension(new PostgreSqlCapOptionsExtension(x => { diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs index a3abe19..3be4e92 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs @@ -2,8 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using DotNetCore.CAP.Persistence; using DotNetCore.CAP.PostgreSql; -using DotNetCore.CAP.Processor; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -22,12 +22,10 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(provider => (PostgreSqlPublisher)provider.GetService()); - services.AddSingleton(); + services.AddSingleton(); + + services.AddSingleton(); services.AddTransient(); services.Configure(_configure); diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs index ba30375..db55202 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -28,16 +28,14 @@ namespace DotNetCore.CAP public void Configure(PostgreSqlOptions options) { if (options.DbContextType != null) - { using (var scope = _serviceScopeFactory.CreateScope()) { var provider = scope.ServiceProvider; - using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType)) + using (var dbContext = (DbContext) provider.GetRequiredService(options.DbContextType)) { options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; } } - } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj index fa83a1b..84fdeeb 100644 --- a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj +++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj @@ -1,7 +1,7 @@  - netstandard2.0 + netstandard2.1 DotNetCore.CAP.PostgreSql $(PackageTags);PostgreSQL @@ -9,13 +9,14 @@ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml 1701;1702;1705;CS1591 + 8 - - - - + + + + diff --git a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs deleted file mode 100644 index 2631f7f..0000000 --- a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Threading; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Messages; -using Microsoft.EntityFrameworkCore.Storage; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Npgsql; - -namespace DotNetCore.CAP.PostgreSql -{ - public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher - { - private readonly PostgreSqlOptions _options; - - public PostgreSqlPublisher(IServiceProvider provider) : base(provider) - { - _options = provider.GetService>().Value; - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - await PublishAsyncInternal(message); - } - - protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null, - CancellationToken cancel = default(CancellationToken)) - { - if (transaction == null) - { - using (var connection = InitDbConnection()) - { - await connection.ExecuteAsync(PrepareSql(), message); - return; - } - } - - var dbTrans = transaction.DbTransaction as IDbTransaction; - if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans) - { - dbTrans = dbContextTrans.GetDbTransaction(); - } - - var conn = dbTrans?.Connection; - await conn.ExecuteAsync(PrepareSql(), message, dbTrans); - } - - #region private methods - - private string PrepareSql() - { - return - $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - } - - private IDbConnection InitDbConnection() - { - var conn = new NpgsqlConnection(_options.ConnectionString); - conn.Open(); - return conn; - } - - #endregion private methods - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs index e185cbe..012578c 100644 --- a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs @@ -3,6 +3,8 @@ using System.Data; using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.DependencyInjection; @@ -33,6 +35,23 @@ namespace DotNetCore.CAP Flush(); } + public override async Task CommitAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.CommitAsync(cancellationToken); + break; + } + + Flush(); + } + public override void Rollback() { Debug.Assert(DbTransaction != null); @@ -48,6 +67,21 @@ namespace DotNetCore.CAP } } + public override async Task RollbackAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.RollbackAsync(cancellationToken); + break; + } + } + public override void Dispose() { (DbTransaction as IDbTransaction)?.Dispose(); @@ -85,10 +119,7 @@ namespace DotNetCore.CAP public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false) { - if (dbConnection.State == ConnectionState.Closed) - { - dbConnection.Open(); - } + if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); var dbTransaction = dbConnection.BeginTransaction(); publisher.Transaction.Value = publisher.ServiceProvider.GetService(); diff --git a/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs deleted file mode 100644 index 8235b57..0000000 --- a/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Processor; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Npgsql; - -namespace DotNetCore.CAP.PostgreSql -{ - internal class PostgreSqlCollectProcessor : ICollectProcessor - { - private const int MaxBatch = 1000; - - private static readonly string[] Tables = - { - "published", "received" - }; - - private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); - private readonly ILogger _logger; - private readonly PostgreSqlOptions _options; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - - public PostgreSqlCollectProcessor(ILogger logger, - IOptions sqlServerOptions) - { - _logger = logger; - _options = sqlServerOptions.Value; - } - - public async Task ProcessAsync(ProcessingContext context) - { - foreach (var table in Tables) - { - _logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}]."); - - var removedCount = 0; - do - { - using (var connection = new NpgsqlConnection(_options.ConnectionString)) - { - removedCount = await connection.ExecuteAsync( - $"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);", - new { now = DateTime.Now, count = MaxBatch }); - } - - if (removedCount != 0) - { - await context.WaitAsync(_delay); - context.ThrowIfStopping(); - } - } while (removedCount != 0); - } - - await context.WaitAsync(_waitingInterval); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs new file mode 100644 index 0000000..d0cc568 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -0,0 +1,220 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Serialization; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Options; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlDataStorage : IDataStorage + { + private readonly IOptions _capOptions; + private readonly IOptions _options; + + public PostgreSqlDataStorage( + IOptions options, + IOptions capOptions) + { + _capOptions = capOptions; + _options = options; + } + + public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) + { + var sql = + $"UPDATE \"{_options.Value.Schema}\".\"published\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); + } + + public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) + { + var sql = + $"UPDATE \"{_options.Value.Schema}\".\"received\" SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); + } + + public async Task StoreMessageAsync(string name, Message content, object dbTransaction = null, + CancellationToken cancellationToken = default) + { + var sql = + $"INSERT INTO \"{_options.Value.Schema}\".\"published\" (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + var message = new MediumMessage + { + DbId = content.GetId(), + Origin = content, + Content = StringSerializer.Serialize(content), + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + var po = new + { + Id = message.DbId, + Name = name, + message.Content, + message.Retries, + message.Added, + message.ExpiresAt, + StatusName = StatusName.Scheduled + }; + + if (dbTransaction == null) + { + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, po); + } + else + { + var dbTrans = dbTransaction as IDbTransaction; + if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) + dbTrans = dbContextTrans.GetDbTransaction(); + + var conn = dbTrans?.Connection; + await conn.ExecuteAsync(sql, po, dbTrans); + } + + return message; + } + + public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) + { + var sql = + $"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; + + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = SnowflakeId.Default().NextId().ToString(), + Group = group, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = nameof(StatusName.Failed) + }); + } + + public async Task StoreReceivedMessageAsync(string name, string group, Message message) + { + var sql = + $"INSERT INTO \"{_options.Value.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; + + var mdMessage = new MediumMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Origin = message, + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + var content = StringSerializer.Serialize(mdMessage.Origin); + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = mdMessage.DbId, + Group = group, + Name = name, + Content = content, + mdMessage.Retries, + mdMessage.Added, + mdMessage.ExpiresAt, + StatusName = nameof(StatusName.Scheduled) + }); + return mdMessage; + } + + public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, + CancellationToken token = default) + { + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + + return await connection.ExecuteAsync( + $"DELETE FROM \"{_options.Value.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Value.Schema}\".\"{table}\" LIMIT @count);", + new {timeout, batchCount}); + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var sql = + $"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + + var result = new List(); + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); + } + + return result; + } + + public async Task> GetReceivedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var sql = + $"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + + var result = new List(); + + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); + } + + return result; + } + + public IMonitoringApi GetMonitoringApi() + { + return new PostgreSqlMonitoringApi(_options); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs b/src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs index 980dc57..ac5f87d 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Threading; +using System.Threading.Tasks; using DotNetCore.CAP; // ReSharper disable once CheckNamespace @@ -33,6 +35,21 @@ namespace Microsoft.EntityFrameworkCore.Storage _transaction.Rollback(); } + public Task CommitAsync(CancellationToken cancellationToken = default) + { + return _transaction.CommitAsync(cancellationToken); + } + + public Task RollbackAsync(CancellationToken cancellationToken = default) + { + return _transaction.CommitAsync(cancellationToken); + } + public Guid TransactionId { get; } + + public ValueTask DisposeAsync() + { + return new ValueTask(Task.Run(() => _transaction.Dispose())); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index a2c3bcb..8fde3f5 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -5,24 +5,45 @@ using System; using System.Collections.Generic; using System.Data; using System.Linq; +using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Dashboard; -using DotNetCore.CAP.Dashboard.Monitoring; -using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; using Microsoft.Extensions.Options; +using Npgsql; namespace DotNetCore.CAP.PostgreSql { public class PostgreSqlMonitoringApi : IMonitoringApi { - private readonly PostgreSqlOptions _options; - private readonly PostgreSqlStorage _storage; + private readonly IOptions _options; - public PostgreSqlMonitoringApi(IStorage storage, IOptions options) + public PostgreSqlMonitoringApi(IOptions options) { - _options = options.Value ?? throw new ArgumentNullException(nameof(options)); - _storage = storage as PostgreSqlStorage ?? throw new ArgumentNullException(nameof(storage)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task GetPublishedMessageAsync(long id) + { + var sql = + $"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; + + using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task GetReceivedMessageAsync(long id) + { + var sql = + $"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; + using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } } public StatisticsDto GetStatistics() @@ -32,7 +53,7 @@ select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Succeed select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Succeeded'; select count(""Id"") from ""{0}"".""published"" where ""StatusName"" = N'Failed'; select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed';", - _options.Schema); + _options.Value.Schema); var statistics = UseConnection(connection => { @@ -56,28 +77,16 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received"; var where = string.Empty; - if (!string.IsNullOrEmpty(queryDto.StatusName)) - { - where += " and Lower(\"StatusName\") = Lower(@StatusName)"; - } + if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)"; - if (!string.IsNullOrEmpty(queryDto.Name)) - { - where += " and Lower(\"Name\") = Lower(@Name)"; - } + if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Lower(\"Name\") = Lower(@Name)"; - if (!string.IsNullOrEmpty(queryDto.Group)) - { - where += " and Lower(\"Group\") = Lower(@Group)"; - } + if (!string.IsNullOrEmpty(queryDto.Group)) where += " and Lower(\"Group\") = Lower(@Group)"; - if (!string.IsNullOrEmpty(queryDto.Content)) - { - where += " and \"Content\" ILike '%@Content%'"; - } + if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike '%@Content%'"; var sqlQuery = - $"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; + $"select * from \"{_options.Value.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; return UseConnection(conn => conn.Query(sqlQuery, new { @@ -92,42 +101,42 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' public int PublishedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed)); + return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Failed))); } public int PublishedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded)); + return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded))); } public int ReceivedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed)); + return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed))); } public int ReceivedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded)); + return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded))); } public IDictionary HourlySucceededJobs(MessageType type) { var tableName = type == MessageType.Publish ? "published" : "received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded)); + GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded))); } public IDictionary HourlyFailedJobs(MessageType type) { var tableName = type == MessageType.Publish ? "published" : "received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, StatusName.Failed)); + GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed))); } private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) { var sqlQuery = - $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; + $"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; var count = connection.ExecuteScalar(sqlQuery, new {state = statusName}); return count; @@ -135,7 +144,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' private T UseConnection(Func action) { - return _storage.UseConnection(action); + return action(new NpgsqlConnection(_options.Value.ConnectionString)); } private Dictionary GetHourlyTimelineStats(IDbConnection connection, string tableName, @@ -165,7 +174,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' with aggr as ( select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"", count(""Id"") as ""Count"" - from ""{_options.Schema}"".""{tableName}"" + from ""{_options.Value.Schema}"".""{tableName}"" where ""StatusName"" = @statusName group by to_char(""Added"", 'yyyy-MM-dd-HH') ) @@ -178,9 +187,7 @@ select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; foreach (var key in keyMaps.Keys) { if (!valuesMap.ContainsKey(key)) - { valuesMap.Add(key, 0); - } } var result = new Dictionary(); @@ -193,4 +200,10 @@ select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; return result; } } + + internal class TimelineCounter + { + public string Key { get; set; } + public int Count { get; set; } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs deleted file mode 100644 index f2c0b5a..0000000 --- a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.Options; -using Npgsql; - -namespace DotNetCore.CAP.PostgreSql -{ - public class PostgreSqlStorageConnection : IStorageConnection - { - private readonly CapOptions _capOptions; - - public PostgreSqlStorageConnection( - IOptions options, - IOptions capOptions) - { - _capOptions = capOptions.Value; - Options = options.Value; - } - - public PostgreSqlOptions Options { get; } - - public IStorageTransaction CreateTransaction() - { - return new PostgreSqlStorageTransaction(this); - } - - public async Task GetPublishedMessageAsync(long id) - { - var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; - - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return await connection.QueryFirstOrDefaultAsync(sql); - } - } - - public async Task> GetPublishedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); - var sql = - $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; - - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return await connection.QueryAsync(sql); - } - } - - public void StoreReceivedMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = - $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; - - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - connection.Execute(sql, message); - } - } - - public async Task GetReceivedMessageAsync(long id) - { - var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return await connection.QueryFirstOrDefaultAsync(sql); - } - } - - public async Task> GetReceivedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); - var sql = - $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Version\"='{_capOptions.Version}' AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return await connection.QueryAsync(sql); - } - } - - public bool ChangePublishedState(long messageId, string state) - { - var sql = - $"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}"; - - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return connection.Execute(sql) > 0; - } - } - - public bool ChangeReceivedState(long messageId, string state) - { - var sql = - $"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}"; - - using (var connection = new NpgsqlConnection(Options.ConnectionString)) - { - return connection.Execute(sql) > 0; - } - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs similarity index 53% rename from src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs rename to src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index c13d18a..69af3fa 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -1,53 +1,44 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; -using System.Data; using System.Threading; using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.Persistence; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Npgsql; namespace DotNetCore.CAP.PostgreSql { - public class PostgreSqlStorage : IStorage + public class PostgreSqlStorageInitializer : IStorageInitializer { - private readonly IOptions _capOptions; - private readonly IDbConnection _existingConnection = null; private readonly ILogger _logger; private readonly IOptions _options; - public PostgreSqlStorage(ILogger logger, - IOptions capOptions, + public PostgreSqlStorageInitializer( + ILogger logger, IOptions options) { _options = options; _logger = logger; - _capOptions = capOptions; } - public IStorageConnection GetConnection() + public string GetPublishedTableName() { - return new PostgreSqlStorageConnection(_options, _capOptions); + return $"{_options.Value.Schema}.published"; } - public IMonitoringApi GetMonitoringApi() + public string GetReceivedTableName() { - return new PostgreSqlMonitoringApi(this, _options); + return $"{_options.Value.Schema}.received"; } public async Task InitializeAsync(CancellationToken cancellationToken) { - if (cancellationToken.IsCancellationRequested) - { - return; - } + if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(_options.Value.Schema); - using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) { await connection.ExecuteAsync(sql); @@ -56,45 +47,6 @@ namespace DotNetCore.CAP.PostgreSql _logger.LogDebug("Ensuring all create database tables script are applied."); } - internal T UseConnection(Func func) - { - IDbConnection connection = null; - - try - { - connection = CreateAndOpenConnection(); - return func(connection); - } - finally - { - ReleaseConnection(connection); - } - } - - internal IDbConnection CreateAndOpenConnection() - { - var connection = _existingConnection ?? new NpgsqlConnection(_options.Value.ConnectionString); - - if (connection.State == ConnectionState.Closed) - { - connection.Open(); - } - - return connection; - } - - internal bool IsExistingConnection(IDbConnection connection) - { - return connection != null && ReferenceEquals(connection, _existingConnection); - } - - internal void ReleaseConnection(IDbConnection connection) - { - if (connection != null && !IsExistingConnection(connection)) - { - connection.Dispose(); - } - } protected virtual string CreateDbTablesScript(string schema) { diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs deleted file mode 100644 index 44750eb..0000000 --- a/src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Messages; -using Npgsql; - -namespace DotNetCore.CAP.PostgreSql -{ - public class PostgreSqlStorageTransaction : IStorageTransaction - { - private readonly IDbConnection _dbConnection; - - private readonly IDbTransaction _dbTransaction; - private readonly string _schema; - - public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection) - { - var options = connection.Options; - _schema = options.Schema; - - _dbConnection = new NpgsqlConnection(options.ConnectionString); - _dbConnection.Open(); - _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - } - - public void UpdateMessage(CapPublishedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = - $@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; - _dbConnection.Execute(sql, message, _dbTransaction); - } - - public void UpdateMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = - $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; - _dbConnection.Execute(sql, message, _dbTransaction); - } - - public Task CommitAsync() - { - _dbTransaction.Commit(); - return Task.CompletedTask; - } - - public void Dispose() - { - _dbTransaction.Dispose(); - _dbConnection.Dispose(); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs index 169835f..6f86746 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using DotNetCore.CAP.Internal; using DotNetCore.CAP.RabbitMQ; using DotNetCore.CAP.Transport; using Microsoft.Extensions.DependencyInjection; @@ -27,7 +26,6 @@ namespace DotNetCore.CAP services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); - } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index db13dac..ba65bd4 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Text; using System.Threading; using DotNetCore.CAP.Messages;