Browse Source

refactor sql storage.

master
Savorboard 7 years ago
parent
commit
cc3febd22b
9 changed files with 42 additions and 24 deletions
  1. +6
    -2
      src/DotNetCore.CAP.MySql/MySqlStorage.cs
  2. +8
    -6
      src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
  3. +1
    -1
      src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs
  4. +6
    -2
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs
  5. +7
    -6
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
  6. +1
    -1
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs
  7. +6
    -2
      src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs
  8. +6
    -3
      src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
  9. +1
    -1
      src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs

+ 6
- 2
src/DotNetCore.CAP.MySql/MySqlStorage.cs View File

@@ -14,16 +14,20 @@ namespace DotNetCore.CAP.MySql
private readonly IDbConnection _existingConnection = null; private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
private readonly CapOptions _capOptions;


public MySqlStorage(ILogger<MySqlStorage> logger, MySqlOptions options)
public MySqlStorage(ILogger<MySqlStorage> logger,
MySqlOptions options,
CapOptions capOptions)
{ {
_options = options; _options = options;
_capOptions = capOptions;
_logger = logger; _logger = logger;
} }


public IStorageConnection GetConnection() public IStorageConnection GetConnection()
{ {
return new MySqlStorageConnection(_options);
return new MySqlStorageConnection(_options, _capOptions);
} }


public IMonitoringApi GetMonitoringApi() public IMonitoringApi GetMonitoringApi()


+ 8
- 6
src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs View File

@@ -11,12 +11,14 @@ namespace DotNetCore.CAP.MySql
{ {
public class MySqlStorageConnection : IStorageConnection public class MySqlStorageConnection : IStorageConnection
{ {
private readonly CapOptions _capOptions;
private readonly string _prefix; private readonly string _prefix;


private const string DateTimeMaxValue = "9999-12-31 23:59:59"; private const string DateTimeMaxValue = "9999-12-31 23:59:59";


public MySqlStorageConnection(MySqlOptions options)
public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
{ {
_capOptions = capOptions;
Options = options; Options = options;
_prefix = Options.TableNamePrefix; _prefix = Options.TableNamePrefix;
} }
@@ -43,9 +45,9 @@ namespace DotNetCore.CAP.MySql
var sql = $@" var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;"; DELETE FROM `{_prefix}.queue` LIMIT 1;";
// var sql = $@"
//SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
//DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";
// var sql = $@"
//SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
//DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";


return FetchNextMessageCoreAsync(sql); return FetchNextMessageCoreAsync(sql);
} }
@@ -64,7 +66,7 @@ SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";


public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{ {
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';";
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";


using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
@@ -109,7 +111,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";


public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages() public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{ {
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';";
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `StatusName` = '{StatusName.Failed}' LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString)) using (var connection = new MySqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);


+ 1
- 1
src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs View File

@@ -29,7 +29,7 @@ namespace DotNetCore.CAP.MySql
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));


var sql = var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction); _dbConnection.Execute(sql, message, _dbTransaction);
} }




+ 6
- 2
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs View File

@@ -13,17 +13,21 @@ namespace DotNetCore.CAP.PostgreSql
{ {
private readonly IDbConnection _existingConnection = null; private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly PostgreSqlOptions _options; private readonly PostgreSqlOptions _options;


public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger,
CapOptions capOptions,
PostgreSqlOptions options)
{ {
_options = options; _options = options;
_logger = logger; _logger = logger;
_capOptions = capOptions;
} }


public IStorageConnection GetConnection() public IStorageConnection GetConnection()
{ {
return new PostgreSqlStorageConnection(_options);
return new PostgreSqlStorageConnection(_options, _capOptions);
} }


public IMonitoringApi GetMonitoringApi() public IMonitoringApi GetMonitoringApi()


+ 7
- 6
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs View File

@@ -11,8 +11,11 @@ namespace DotNetCore.CAP.PostgreSql
{ {
public class PostgreSqlStorageConnection : IStorageConnection public class PostgreSqlStorageConnection : IStorageConnection
{ {
public PostgreSqlStorageConnection(PostgreSqlOptions options)
private readonly CapOptions _capOptions;

public PostgreSqlStorageConnection(PostgreSqlOptions options,CapOptions capOptions)
{ {
_capOptions = capOptions;
Options = options; Options = options;
} }


@@ -35,9 +38,7 @@ namespace DotNetCore.CAP.PostgreSql


public Task<IFetchedMessage> FetchNextMessageAsync() public Task<IFetchedMessage> FetchNextMessageAsync()
{ {
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{
Options.Schema
}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{Options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql); return FetchNextMessageCoreAsync(sql);
} }


@@ -55,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{ {
var sql = var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";


using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
{ {
@@ -98,7 +99,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages() public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{ {
var sql = var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"StatusName\"='{StatusName.Failed}' LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString)) using (var connection = new NpgsqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);


+ 1
- 1
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs View File

@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.PostgreSql
var sql = var sql =
$@"UPDATE ""{ $@"UPDATE ""{
_schema _schema
}"".""published"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
}"".""published"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction); _dbConnection.Execute(sql, message, _dbTransaction);
} }




+ 6
- 2
src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs View File

@@ -13,17 +13,21 @@ namespace DotNetCore.CAP.SqlServer
{ {
private readonly IDbConnection _existingConnection = null; private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly CapOptions _capOptions;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;


public SqlServerStorage(ILogger<SqlServerStorage> logger, SqlServerOptions options)
public SqlServerStorage(ILogger<SqlServerStorage> logger,
CapOptions capOptions,
SqlServerOptions options)
{ {
_options = options; _options = options;
_logger = logger; _logger = logger;
_capOptions = capOptions;
} }


public IStorageConnection GetConnection() public IStorageConnection GetConnection()
{ {
return new SqlServerStorageConnection(_options);
return new SqlServerStorageConnection(_options, _capOptions);
} }


public IMonitoringApi GetMonitoringApi() public IMonitoringApi GetMonitoringApi()


+ 6
- 3
src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs View File

@@ -11,8 +11,11 @@ namespace DotNetCore.CAP.SqlServer
{ {
public class SqlServerStorageConnection : IStorageConnection public class SqlServerStorageConnection : IStorageConnection
{ {
public SqlServerStorageConnection(SqlServerOptions options)
private readonly CapOptions _capOptions;

public SqlServerStorageConnection(SqlServerOptions options, CapOptions capOptions)
{ {
_capOptions = capOptions;
Options = options; Options = options;
} }


@@ -57,7 +60,7 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{ {
var sql = var sql =
$"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";


using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
@@ -114,7 +117,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages() public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
{ {
var sql = var sql =
$"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(Options.ConnectionString)) using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);


+ 1
- 1
src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs View File

@@ -29,7 +29,7 @@ namespace DotNetCore.CAP.SqlServer
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));


var sql = var sql =
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
$"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction); _dbConnection.Execute(sql, message, _dbTransaction);
} }




Loading…
Cancel
Save