Browse Source

modify CapPublisher to implement ICallbackPublisher

master
yangxiaodong 7 years ago
parent
commit
42c856fc5d
2 changed files with 24 additions and 5 deletions
  1. +9
    -3
      src/DotNetCore.CAP.MySql/CapPublisher.cs
  2. +15
    -2
      src/DotNetCore.CAP.PostgreSql/CapPublisher.cs

+ 9
- 3
src/DotNetCore.CAP.MySql/CapPublisher.cs View File

@@ -7,10 +7,11 @@ using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;


namespace DotNetCore.CAP.MySql namespace DotNetCore.CAP.MySql
{ {
public class CapPublisher : CapPublisherBase
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly MySqlOptions _options; private readonly MySqlOptions _options;
@@ -61,6 +62,13 @@ namespace DotNetCore.CAP.MySql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
} }


public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new MySqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}
#region private methods #region private methods


private string PrepareSql() private string PrepareSql()
@@ -68,8 +76,6 @@ namespace DotNetCore.CAP.MySql
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
} }




#endregion private methods #endregion private methods
} }
} }

+ 15
- 2
src/DotNetCore.CAP.PostgreSql/CapPublisher.cs View File

@@ -7,10 +7,11 @@ using DotNetCore.CAP.Abstractions;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Npgsql;


namespace DotNetCore.CAP.PostgreSql namespace DotNetCore.CAP.PostgreSql
{ {
public class CapPublisher : CapPublisherBase
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly PostgreSqlOptions _options; private readonly PostgreSqlOptions _options;
@@ -61,9 +62,21 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
} }


public async Task PublishAsync(CapPublishedMessage message)
{
using (var conn = new NpgsqlConnection(_options.ConnectionString))
{
await conn.ExecuteAsync(PrepareSql(), message);
}
}

#region private methods

private string PrepareSql() private string PrepareSql()
{ {
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
}

#endregion
} }
} }

Loading…
Cancel
Save