@@ -7,11 +7,13 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
namespace DotNetCore.CAP.SqlServer
{
{
public class CapPublisher : ICapPublisher
public class CapPublisher : ICapPublisher
{
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;
private readonly DbContext _dbContext;
@@ -21,9 +23,12 @@ namespace DotNetCore.CAP.SqlServer
protected IServiceProvider ServiceProvider { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider, SqlServerOptions options)
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
{
{
ServiceProvider = provider;
ServiceProvider = provider;
_logger = logger;
_options = options;
_options = options;
if (_options.DbContextType != null)
if (_options.DbContextType != null)
@@ -33,57 +38,152 @@ namespace DotNetCore.CAP.SqlServer
}
}
}
}
public void Publish(string name, string content)
{
CheckIsUsingEF(name);
PublishCore(name, content);
}
public Task PublishAsync(string name, string content)
public Task PublishAsync(string name, string content)
{
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
CheckIsUsingEF(name);
return Publish(name, content);
return PublishCoreAsync(name, content);
}
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
var content = Helper.ToJson(contentObj);
PublishCore(name, content);
}
}
public Task PublishAsync<T>(string name, T contentObj)
public Task PublishAsync<T>(string name, T contentObj)
{
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
CheckIsUsingEF(name);
var content = Helper.ToJson(contentObj);
var content = Helper.ToJson(contentObj);
return Publish(name, content);
return PublishCoreAsync(name, content);
}
}
public Task PublishAsync (string name, string content, IDbConnection dbConnection)
public void Publish (string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null )
{
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
CheckIsAdoNet(name);
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;
IsCapOpenedTrans = true;
return PublishWithTrans(name, content, dbConnection, dbTransaction);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
}
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
var content = Helper.ToJson(contentObj);
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));
var content = Helper.ToJson(contentObj);
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}
#region private methods
private void CheckIsUsingEF(string name)
{
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}
return PublishWithTrans(name, content, dbConnection, dbTransaction);
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}
}
private async Task Publish(string name, string content)
private async Task PublishCoreAsync (string name, string content)
{
{
var connection = _dbContext.Database.GetDbConnection();
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTrans(name, content, connection, dbTransaction);
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
}
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}
}
protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
{
var message = new CapPublishedMessage
var message = new CapPublishedMessage
{
{
@@ -91,15 +191,24 @@ namespace DotNetCore.CAP.SqlServer
Content = content,
Content = content,
StatusName = StatusName.Scheduled
StatusName = StatusName.Scheduled
};
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);
var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction);
if (IsCapOpenedTrans)
if (IsCapOpenedTrans)
{
{
dbTransaction.Commit();
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
dbConnection.Dispose();
}
}
PublishQueuer.PulseEvent.Set();
PublishQueuer.PulseEvent.Set();
}
}
private string PrepareSql()
{
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
#endregion private methods
}
}
}
}