Browse Source

refactor async method.

undefined
Savorboard 7 years ago
parent
commit
484f6b840c
5 changed files with 28 additions and 26 deletions
  1. +9
    -9
      src/DotNetCore.CAP.MySql/CapPublisher.cs
  2. +6
    -4
      src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
  3. +1
    -2
      src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
  4. +2
    -2
      src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
  5. +10
    -9
      src/DotNetCore.CAP.SqlServer/CapPublisher.cs

+ 9
- 9
src/DotNetCore.CAP.MySql/CapPublisher.cs View File

@@ -25,18 +25,16 @@ namespace DotNetCore.CAP.MySql
_options = options;
_logger = logger;

if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
if (_options.DbContextType == null) return;
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}

public async Task PublishAsync(CapPublishedMessage message)
public Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
return conn.ExecuteAsync(PrepareSql(), message);
}
}

@@ -64,12 +62,14 @@ namespace DotNetCore.CAP.MySql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}

protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Published Message has been persisted in the database. name:" + message);

return Task.CompletedTask;
}

#region private methods


+ 6
- 4
src/DotNetCore.CAP.PostgreSql/CapPublisher.cs View File

@@ -32,11 +32,11 @@ namespace DotNetCore.CAP.PostgreSql
}
}

public async Task PublishAsync(CapPublishedMessage message)
public Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
return conn.ExecuteAsync(PrepareSql(), message);
}
}

@@ -64,12 +64,14 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}

protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Published Message has been persisted in the database. name:" + message);

return Task.CompletedTask;
}

#region private methods


+ 1
- 2
src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs View File

@@ -38,8 +38,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
_maxSize = 0;

IConnection context;
while (_pool.TryDequeue(out context))
while (_pool.TryDequeue(out var context))
context.Dispose();
}



+ 2
- 2
src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs View File

@@ -42,14 +42,14 @@ namespace DotNetCore.CAP.RabbitMQ
null,
body);

_logger.LogDebug($"rabbitmq topic message [{keyName}] has been published.");
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");
}
return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(
$"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
$"RabbitMQ topic message [{keyName}] has been raised an exception of sending. the exception is: {ex.Message}");

return Task.FromResult(OperateResult.Failed(ex,
new OperateError


+ 10
- 9
src/DotNetCore.CAP.SqlServer/CapPublisher.cs View File

@@ -25,18 +25,17 @@ namespace DotNetCore.CAP.SqlServer
_logger = logger;
_options = options;

if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}
if (_options.DbContextType == null) return;

IsUsingEF = true;
_dbContext = (DbContext) ServiceProvider.GetService(_options.DbContextType);
}

public async Task PublishAsync(CapPublishedMessage message)
public Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
return conn.ExecuteAsync(PrepareSql(), message);
}
}

@@ -64,12 +63,14 @@ namespace DotNetCore.CAP.SqlServer
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}

protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Published Message has been persisted in the database. name:" + message);

return Task.CompletedTask;
}

#region private methods


Loading…
Cancel
Save