Browse Source

support PostgreSQL.

master
yangxiaodong 7 years ago
parent
commit
1cd8d6ff40
16 changed files with 813 additions and 2 deletions
  1. +2
    -2
      build/version.props
  2. +18
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs
  3. +49
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs
  4. +52
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
  5. +11
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
  6. +78
    -0
      src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
  7. +26
    -0
      src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
  8. +11
    -0
      src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs
  9. +61
    -0
      src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs
  10. +74
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs
  11. +67
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs
  12. +140
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
  13. +71
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs
  14. +143
    -0
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  15. +5
    -0
      src/DotNetCore.CAP/Models/CapPublishedMessage.cs
  16. +5
    -0
      src/DotNetCore.CAP/Models/CapReceivedMessage.cs

+ 2
- 2
build/version.props View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 18
- 0
src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs View File

@@ -0,0 +1,18 @@
using System;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
public const string DefaultSchema = "cap";

/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;

public Type DbContextType { get; set; }
}
}

+ 49
- 0
src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs View File

@@ -0,0 +1,49 @@
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UsePostgreSql(this CapOptions options, string connectionString)
{
return options.UsePostgreSql(opt =>
{
opt.ConnectionString = connectionString;
});
}

public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));

return options;
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

var efOptions = new EFOptions { DbContextType = typeof(TContext) };
configure(efOptions);

options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));

return options;
}
}
}

+ 52
- 0
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs View File

@@ -0,0 +1,52 @@
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.PostgreSql;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal class PostgreSqlCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<PostgreSqlOptions> _configure;

public PostgreSqlCapOptionsExtension(Action<PostgreSqlOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddScoped<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();

var postgreSqlOptions = new PostgreSqlOptions();
_configure(postgreSqlOptions);

if (postgreSqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
services.AddSingleton(postgreSqlOptions);
}

#if NETSTANDARD1_6
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#else
private ServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#endif

}
}

+ 11
- 0
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs View File

@@ -0,0 +1,11 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
}
}

+ 78
- 0
src/DotNetCore.CAP.PostgreSql/CapPublisher.cs View File

@@ -0,0 +1,78 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Abstractions;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase
{
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly DbContext _dbContext;

public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
{
_options = options;
_logger = logger;

if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}

protected override void PrepareConnectionForEF()
{
_dbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
_dbTranasaction = transaction.GetDbTransaction();
}

protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);

_logger.LogDebug("Message has been persisted in the database. name:" + message.ToString());

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
}

protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogDebug("Message has been persisted in the database. name:" + message.ToString());

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
}

private string PrepareSql()
{
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
}
}

+ 26
- 0
src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj View File

@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|netstandard1.6|AnyCPU'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Npgsql" Version="3.2.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 11
- 0
src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs View File

@@ -0,0 +1,11 @@
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }

public MessageType MessageType { get; set; }
}
}

+ 61
- 0
src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs View File

@@ -0,0 +1,61 @@
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;

private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);

private static readonly string[] Tables =
{
"published","received"
};

public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");

foreach (var table in Tables)
{
var removedCount = 0;
do
{
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE ExpiresAt < @now LIMIT @count;",
new { now = DateTime.Now, count = MaxBatch });
}

if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}

await context.WaitAsync(_waitingInterval);
}
}
}

+ 74
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs View File

@@ -0,0 +1,74 @@
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();

public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}

public int MessageId { get; }

public MessageType MessageType { get; }

public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}

public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}

public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}

private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}

+ 67
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs View File

@@ -0,0 +1,67 @@
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly PostgreSqlOptions _options;
private readonly ILogger _logger;

public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
{
_options = options;
_logger = logger;
}

public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;

var sql = CreateDbTablesScript(_options.Schema);

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}

protected virtual string CreateDbTablesScript(string schema)
{
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";

CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);

CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);

CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);";
return batchSql;
}
}
}

+ 140
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs View File

@@ -0,0 +1,140 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageConnection : IStorageConnection
{
private readonly PostgreSqlOptions _options;

public PostgreSqlStorageConnection(PostgreSqlOptions options)
{
_options = options;
}

public PostgreSqlOptions Options => _options;

public IStorageTransaction CreateTransaction()
{
return new PostgreSqlStorageTransaction(this);
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"Id\"={id}";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
SELECT ""MessageId"",""MessageType"" FROM ""{_options.Schema}"".""queue"" LIMIT 1 FOR UPDATE;
DELETE FROM ""{_options.Schema}"".""queue"" LIMIT 1;";
return FetchNextMessageCoreAsync(sql);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}'";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}

// CapReceviedMessage

public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO \"{_options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"Id\"={id}";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}'";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}

public void Dispose()
{
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
throw;
}

if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}

return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
}
}

+ 71
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs View File

@@ -0,0 +1,71 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageTransaction : IStorageTransaction, IDisposable
{
private readonly string _schema;

private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;

public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;

_dbConnection = new NpgsqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}

public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
}

public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
}

public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}

public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}

+ 143
- 0
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -0,0 +1,143 @@
using System;
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;

namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
{
protected IDbConnection _dbConnection;
protected IDbTransaction _dbTranasaction;

protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; set; }

protected IServiceProvider ServiceProvider { get; }
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();

var content = Serialize(contentObj);

PublishWithTrans(name, content, _dbConnection, _dbTranasaction);
}

public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, _dbConnection, _dbTranasaction);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

protected abstract void PrepareConnectionForEF();

protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);

protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);

#region private methods

private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}

private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

await ExecuteAsync(dbConnection, dbTransaction, message);

PublishQueuer.PulseEvent.Set();
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

Execute(dbConnection, dbTransaction, message);
PublishQueuer.PulseEvent.Set();
}

#endregion private methods
}
}

+ 5
- 0
src/DotNetCore.CAP/Models/CapPublishedMessage.cs View File

@@ -34,5 +34,10 @@ namespace DotNetCore.CAP.Models
public int Retries { get; set; }

public string StatusName { get; set; }

public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}

+ 5
- 0
src/DotNetCore.CAP/Models/CapReceivedMessage.cs View File

@@ -47,5 +47,10 @@ namespace DotNetCore.CAP.Models
Content = Content
};
}

public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}

Loading…
Cancel
Save