diff --git a/samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs b/samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs index 3ba48cc..7b8f33c 100644 --- a/samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs @@ -21,7 +21,7 @@ namespace Sample.Kafka.PostgreSql.Controllers [Route("~/without/transaction")] public async Task WithoutTransaction() { - await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); + await _capBus.PublishAsync("sample.kafka.postgrsql", DateTime.Now); return Ok(); } @@ -36,10 +36,7 @@ namespace Sample.Kafka.PostgreSql.Controllers //your business code connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); - for (int i = 0; i < 5; i++) - { - _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); - } + _capBus.Publish("sample.kafka.postgrsql", DateTime.Now); transaction.Commit(); } @@ -49,8 +46,8 @@ namespace Sample.Kafka.PostgreSql.Controllers } - [CapSubscribe("#.test2")] - public void Test2(int value) + [CapSubscribe("sample.kafka.postgrsql")] + public void Test2(DateTime value) { Console.WriteLine("Subscriber output message: " + value); } diff --git a/samples/Sample.Kafka.PostgreSql/Startup.cs b/samples/Sample.Kafka.PostgreSql/Startup.cs index 5bda514..c47ce69 100644 --- a/samples/Sample.Kafka.PostgreSql/Startup.cs +++ b/samples/Sample.Kafka.PostgreSql/Startup.cs @@ -1,8 +1,7 @@ -using System; -using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; -namespace Sample.Kafka.MySql +namespace Sample.Kafka.PostgreSql { public class Startup { @@ -10,8 +9,8 @@ namespace Sample.Kafka.MySql { services.AddCap(x => { - x.UsePostgreSql("Server=localhost;Database=testcap;UserId=root;Password=123123;"); - x.UseKafka("localhost:9092"); + x.UsePostgreSql(""); + x.UseKafka(""); x.UseDashboard(); }); diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index d4204e3..cd3710b 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -45,7 +45,7 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { - Id = message.DbId, + Id = long.Parse(message.DbId), message.Retries, message.ExpiresAt, StatusName = state.ToString("G") @@ -59,7 +59,7 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { - Id = message.DbId, + Id = long.Parse(message.DbId), message.Retries, message.ExpiresAt, StatusName = state.ToString("G") @@ -85,13 +85,13 @@ namespace DotNetCore.CAP.PostgreSql var po = new { - Id = message.DbId, + Id = long.Parse(message.DbId), Name = name, message.Content, message.Retries, message.Added, message.ExpiresAt, - StatusName = StatusName.Scheduled + StatusName = nameof(StatusName.Scheduled) }; if (dbTransaction == null) @@ -121,7 +121,7 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { - Id = SnowflakeId.Default().NextId().ToString(), + Id = SnowflakeId.Default().NextId(), Group = group, Name = name, Content = content, @@ -150,7 +150,7 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { - Id = mdMessage.DbId, + Id = long.Parse(mdMessage.DbId), Group = group, Name = name, Content = content, @@ -168,7 +168,7 @@ namespace DotNetCore.CAP.PostgreSql await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); return await connection.ExecuteAsync( - $"DELETE FROM {table} WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @count);", + $"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", new { timeout, batchCount }); } diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index ad494c9..f8e8ba0 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql var batchSql = $@" CREATE SCHEMA IF NOT EXISTS ""{schema}""; -CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}( +CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}( ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL, @@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}( ""StatusName"" VARCHAR(50) NOT NULL ); -CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}( +CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}( ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Version"" VARCHAR(20) NOT NULL, ""Name"" VARCHAR(200) NOT NULL,