Browse Source

Fix postgresql storage bugs

master
Savorboard 5 years ago
parent
commit
801b374079
4 changed files with 17 additions and 21 deletions
  1. +4
    -7
      samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs
  2. +4
    -5
      samples/Sample.Kafka.PostgreSql/Startup.cs
  3. +7
    -7
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  4. +2
    -2
      src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs

+ 4
- 7
samples/Sample.Kafka.PostgreSql/Controllers/ValuesController.cs View File

@@ -21,7 +21,7 @@ namespace Sample.Kafka.PostgreSql.Controllers
[Route("~/without/transaction")] [Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction() public async Task<IActionResult> WithoutTransaction()
{ {
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
await _capBus.PublishAsync("sample.kafka.postgrsql", DateTime.Now);


return Ok(); return Ok();
} }
@@ -36,10 +36,7 @@ namespace Sample.Kafka.PostgreSql.Controllers
//your business code //your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);


for (int i = 0; i < 5; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
_capBus.Publish("sample.kafka.postgrsql", DateTime.Now);


transaction.Commit(); transaction.Commit();
} }
@@ -49,8 +46,8 @@ namespace Sample.Kafka.PostgreSql.Controllers
} }




[CapSubscribe("#.test2")]
public void Test2(int value)
[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{ {
Console.WriteLine("Subscriber output message: " + value); Console.WriteLine("Subscriber output message: " + value);
} }


+ 4
- 5
samples/Sample.Kafka.PostgreSql/Startup.cs View File

@@ -1,8 +1,7 @@
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;


namespace Sample.Kafka.MySql
namespace Sample.Kafka.PostgreSql
{ {
public class Startup public class Startup
{ {
@@ -10,8 +9,8 @@ namespace Sample.Kafka.MySql
{ {
services.AddCap(x => services.AddCap(x =>
{ {
x.UsePostgreSql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
x.UseKafka("localhost:9092");
x.UsePostgreSql("");
x.UseKafka("");
x.UseDashboard(); x.UseDashboard();
}); });




+ 7
- 7
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs View File

@@ -45,7 +45,7 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = message.DbId,
Id = long.Parse(message.DbId),
message.Retries, message.Retries,
message.ExpiresAt, message.ExpiresAt,
StatusName = state.ToString("G") StatusName = state.ToString("G")
@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = message.DbId,
Id = long.Parse(message.DbId),
message.Retries, message.Retries,
message.ExpiresAt, message.ExpiresAt,
StatusName = state.ToString("G") StatusName = state.ToString("G")
@@ -85,13 +85,13 @@ namespace DotNetCore.CAP.PostgreSql


var po = new var po = new
{ {
Id = message.DbId,
Id = long.Parse(message.DbId),
Name = name, Name = name,
message.Content, message.Content,
message.Retries, message.Retries,
message.Added, message.Added,
message.ExpiresAt, message.ExpiresAt,
StatusName = StatusName.Scheduled
StatusName = nameof(StatusName.Scheduled)
}; };


if (dbTransaction == null) if (dbTransaction == null)
@@ -121,7 +121,7 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = SnowflakeId.Default().NextId().ToString(),
Id = SnowflakeId.Default().NextId(),
Group = group, Group = group,
Name = name, Name = name,
Content = content, Content = content,
@@ -150,7 +150,7 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new await connection.ExecuteAsync(sql, new
{ {
Id = mdMessage.DbId,
Id = long.Parse(mdMessage.DbId),
Group = group, Group = group,
Name = name, Name = name,
Content = content, Content = content,
@@ -168,7 +168,7 @@ namespace DotNetCore.CAP.PostgreSql
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);


return await connection.ExecuteAsync( return await connection.ExecuteAsync(
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @count);",
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);",
new { timeout, batchCount }); new { timeout, batchCount });
} }




+ 2
- 2
src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs View File

@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql
var batchSql = $@" var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}""; CREATE SCHEMA IF NOT EXISTS ""{schema}"";


CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}(
CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}(
""Id"" BIGINT PRIMARY KEY NOT NULL, ""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL, ""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL, ""Name"" VARCHAR(200) NOT NULL,
@@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}(
""StatusName"" VARCHAR(50) NOT NULL ""StatusName"" VARCHAR(50) NOT NULL
); );


CREATE TABLE IF NOT EXISTS {GetReceivedTableName()}(
CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}(
""Id"" BIGINT PRIMARY KEY NOT NULL, ""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL, ""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL, ""Name"" VARCHAR(200) NOT NULL,


Loading…
Cancel
Save