diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs index edc1729..dc08d52 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs @@ -24,9 +24,12 @@ namespace DotNetCore.CAP services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); - services.AddScoped(); - services.AddScoped(); + + services.AddScoped(); + services.AddScoped(); + services.AddTransient(); + services.AddTransient(); AddSingletonPostgreSqlOptions(services); } diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs index 9f52154..8932f3d 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +// ReSharper disable once CheckNamespace namespace DotNetCore.CAP { public class PostgreSqlOptions : EFOptions diff --git a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs deleted file mode 100644 index e299dc4..0000000 --- a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Models; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Storage; -using Microsoft.Extensions.Logging; -using Npgsql; - -namespace DotNetCore.CAP.PostgreSql -{ - public class CapPublisher : CapPublisherBase, ICallbackPublisher - { - private readonly DbContext _dbContext; - private readonly PostgreSqlOptions _options; - - public CapPublisher(ILogger logger, IDispatcher dispatcher, - IServiceProvider provider, PostgreSqlOptions options) - : base(logger, dispatcher) - { - ServiceProvider = provider; - _options = options; - - if (_options.DbContextType != null) - { - IsUsingEF = true; - _dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType); - } - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - using (var conn = new NpgsqlConnection(_options.ConnectionString)) - { - var id = await conn.ExecuteScalarAsync(PrepareSql(), message); - message.Id = id; - Enqueue(message); - } - } - - protected override void PrepareConnectionForEF() - { - DbConnection = _dbContext.Database.GetDbConnection(); - var dbContextTransaction = _dbContext.Database.CurrentTransaction; - var dbTrans = dbContextTransaction?.GetDbTransaction(); - //DbTransaction is dispose in original - if (dbTrans?.Connection == null) - { - IsCapOpenedTrans = true; - dbContextTransaction?.Dispose(); - dbContextTransaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); - dbTrans = dbContextTransaction.GetDbTransaction(); - } - - DbTransaction = dbTrans; - } - - protected override int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) - { - return dbConnection.ExecuteScalar(PrepareSql(), message, dbTransaction); - } - - protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message) - { - return dbConnection.ExecuteScalarAsync(PrepareSql(), message, dbTransaction); - } - - #region private methods - - private string PrepareSql() - { - return - $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; - } - - #endregion private methods - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs new file mode 100644 index 0000000..7222efd --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs @@ -0,0 +1,93 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Data; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Models; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlPublisher : CapPublisherBase, ICallbackPublisher + { + private readonly DbContext _dbContext; + private readonly PostgreSqlOptions _options; + private readonly bool _isUsingEF; + + private NpgsqlConnection _connection; + + public PostgreSqlPublisher(IServiceProvider provider, PostgreSqlOptions options): base(provider) + { + _options = options; + + if (_options.DbContextType == null) + { + return; + } + + _isUsingEF = true; + _dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); + } + + public async Task PublishCallbackAsync(CapPublishedMessage message) + { + await PublishAsyncInternal(message); + } + + protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, + CancellationToken cancel = default(CancellationToken)) + { + var dbTrans = transaction.DbTransaction as IDbTransaction; + if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans) + { + dbTrans = dbContextTrans.GetDbTransaction(); + } + var conn = dbTrans?.Connection; + return conn.ExecuteAsync(PrepareSql(), message, dbTrans); + } + + protected override object GetDbTransaction() + { + if (_isUsingEF) + { + var dbContextTransaction = _dbContext.Database.CurrentTransaction; + if (dbContextTransaction == null) + { + return InitDbConnection(); + } + + return dbContextTransaction; + } + + return InitDbConnection(); + } + + #region private methods + + private string PrepareSql() + { + return + $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Id\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + } + + private IDbTransaction InitDbConnection() + { + _connection = new NpgsqlConnection(_options.ConnectionString); + _connection.Open(); + return _connection.BeginTransaction(IsolationLevel.ReadCommitted); + } + #endregion private methods + + public void Dispose() + { + _dbContext?.Dispose(); + _connection?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs new file mode 100644 index 0000000..7599e39 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs @@ -0,0 +1,72 @@ +using System.Data; +using System.Diagnostics; +using Microsoft.EntityFrameworkCore.Storage; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class PostgreSqlCapTransaction : CapTransactionBase + { + public PostgreSqlCapTransaction(IDispatcher dispatcher) : base(dispatcher) { } + + public override void Commit() + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + dbContextTransaction.Commit(); + break; + } + + Flush(); + } + + public override void Rollback() + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + dbContextTransaction.Rollback(); + break; + } + } + + public override void Dispose() + { + (DbTransaction as IDbTransaction)?.Dispose(); + } + } + + public static class CapTransactionExtensions + { + public static ICapTransaction Begin(this ICapTransaction transaction, + IDbTransaction dbTransaction, bool autoCommit = false) + { + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + + public static ICapTransaction Begin(this ICapTransaction transaction, + IDbContextTransaction dbTransaction, bool autoCommit = false) + { + + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + } + +} diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs similarity index 100% rename from src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs rename to src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs similarity index 96% rename from src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs rename to src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs index 34c8038..baa08e7 100644 --- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs @@ -100,10 +100,8 @@ namespace DotNetCore.CAP.PostgreSql var batchSql = $@" CREATE SCHEMA IF NOT EXISTS ""{schema}""; -DROP TABLE IF EXISTS ""{schema}"".""queue""; - CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( - ""Id"" SERIAL PRIMARY KEY NOT NULL, + ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Name"" VARCHAR(200) NOT NULL, ""Group"" VARCHAR(200) NULL, ""Content"" TEXT NULL, @@ -114,7 +112,7 @@ CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( ); CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( - ""Id"" SERIAL PRIMARY KEY NOT NULL, + ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Name"" VARCHAR(200) NOT NULL, ""Content"" TEXT NULL, ""Retries"" INT NOT NULL, diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs similarity index 89% rename from src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs rename to src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs index 3d8fba7..c5c8146 100644 --- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs @@ -28,7 +28,7 @@ namespace DotNetCore.CAP.PostgreSql return new PostgreSqlStorageTransaction(this); } - public async Task GetPublishedMessageAsync(int id) + public async Task GetPublishedMessageAsync(long id) { var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; @@ -50,7 +50,7 @@ namespace DotNetCore.CAP.PostgreSql } } - public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + public void StoreReceivedMessage(CapReceivedMessage message) { if (message == null) { @@ -58,15 +58,15 @@ namespace DotNetCore.CAP.PostgreSql } var sql = - $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; + $"INSERT INTO \"{Options.Schema}\".\"received\"(\"Id\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) { - return await connection.ExecuteScalarAsync(sql, message); + connection.Execute(sql, message); } } - public async Task GetReceivedMessageAsync(int id) + public async Task GetReceivedMessageAsync(long id) { var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) @@ -90,7 +90,7 @@ namespace DotNetCore.CAP.PostgreSql { } - public bool ChangePublishedState(int messageId, string state) + public bool ChangePublishedState(long messageId, string state) { var sql = $"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}"; @@ -101,7 +101,7 @@ namespace DotNetCore.CAP.PostgreSql } } - public bool ChangeReceivedState(int messageId, string state) + public bool ChangeReceivedState(long messageId, string state) { var sql = $"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}"; diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs b/src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs similarity index 64% rename from src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs rename to src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs index d64c46a..64dd30c 100644 --- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageTransaction.PostgreSql.cs @@ -35,9 +35,7 @@ namespace DotNetCore.CAP.PostgreSql } var sql = - $@"UPDATE ""{ - _schema - }"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; + $@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; _dbConnection.Execute(sql, message, _dbTransaction); } @@ -66,29 +64,5 @@ namespace DotNetCore.CAP.PostgreSql _dbTransaction.Dispose(); _dbConnection.Dispose(); } - - public void EnqueueMessage(CapPublishedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);"; - _dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish}, - _dbTransaction); - } - - public void EnqueueMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);"; - _dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe}, - _dbTransaction); - } } } \ No newline at end of file diff --git a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs index b1cb25c..98734ba 100644 --- a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageConnectionTest.cs @@ -22,17 +22,18 @@ namespace DotNetCore.CAP.PostgreSql.Test [Fact] public async Task GetPublishedMessageAsync_Test() { - var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";"; + var sql = @"INSERT INTO ""cap"".""published""(""Id"",""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Id,@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var insertedId = SnowflakeId.Default().NextId(); var publishMessage = new CapPublishedMessage { + Id = insertedId, Name = "PostgreSqlStorageConnectionTest", Content = "", StatusName = StatusName.Scheduled }; - var insertedId = default(int); using (var connection = ConnectionUtil.CreateConnection()) { - insertedId = connection.QueryFirst(sql, publishMessage); + await connection.ExecuteAsync(sql, publishMessage); } var message = await _storage.GetPublishedMessageAsync(insertedId); Assert.NotNull(message); @@ -41,10 +42,11 @@ namespace DotNetCore.CAP.PostgreSql.Test } [Fact] - public async Task StoreReceivedMessageAsync_Test() + public void StoreReceivedMessageAsync_Test() { var receivedMessage = new CapReceivedMessage { + Id = SnowflakeId.Default().NextId(), Name = "PostgreSqlStorageConnectionTest", Content = "", Group = "mygroup", @@ -54,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql.Test Exception exception = null; try { - await _storage.StoreReceivedMessageAsync(receivedMessage); + _storage.StoreReceivedMessage(receivedMessage); } catch (Exception ex) { @@ -67,19 +69,21 @@ namespace DotNetCore.CAP.PostgreSql.Test public async Task GetReceivedMessageAsync_Test() { var sql = $@" - INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") - VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";"; + INSERT INTO ""cap"".""received""(""Id"",""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") + VALUES(@Id,@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + var insertedId = SnowflakeId.Default().NextId(); var receivedMessage = new CapReceivedMessage { + Id= insertedId, Name = "PostgreSqlStorageConnectionTest", Content = "", Group = "mygroup", StatusName = StatusName.Scheduled }; - var insertedId = default(int); + using (var connection = ConnectionUtil.CreateConnection()) { - insertedId = connection.QueryFirst(sql, receivedMessage); + await connection.ExecuteAsync(sql, receivedMessage); } var message = await _storage.GetReceivedMessageAsync(insertedId); diff --git a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageTest.cs b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageTest.cs index ec0bc2b..4bf1d77 100644 --- a/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageTest.cs +++ b/test/DotNetCore.CAP.PostgreSql.Test/PostgreSqlStorageTest.cs @@ -6,13 +6,11 @@ namespace DotNetCore.CAP.PostgreSql.Test [Collection("postgresql")] public class SqlServerStorageTest : DatabaseTestHost { - private readonly string _dbName; private readonly string _masterDbConnectionString; private readonly string _dbConnectionString; public SqlServerStorageTest() { - _dbName = ConnectionUtil.GetDatabaseName(); _masterDbConnectionString = ConnectionUtil.GetMasterConnectionString(); _dbConnectionString = ConnectionUtil.GetConnectionString(); }