|
|
@@ -30,64 +30,64 @@ namespace DotNetCore.CAP.EntityFrameworkCore |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public Task PublishAsync(string topic, string content) |
|
|
|
public Task PublishAsync(string name, string content) |
|
|
|
{ |
|
|
|
if (topic == null) throw new ArgumentNullException(nameof(topic)); |
|
|
|
if (name == null) throw new ArgumentNullException(nameof(name)); |
|
|
|
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + |
|
|
|
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); |
|
|
|
|
|
|
|
return Publish(topic, content); |
|
|
|
return Publish(name, content); |
|
|
|
} |
|
|
|
|
|
|
|
public Task PublishAsync<T>(string topic, T contentObj) |
|
|
|
public Task PublishAsync<T>(string name, T contentObj) |
|
|
|
{ |
|
|
|
if (topic == null) throw new ArgumentNullException(nameof(topic)); |
|
|
|
if (name == null) throw new ArgumentNullException(nameof(name)); |
|
|
|
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + |
|
|
|
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); |
|
|
|
|
|
|
|
var content = Helper.ToJson(contentObj); |
|
|
|
return Publish(topic, content); |
|
|
|
return Publish(name, content); |
|
|
|
} |
|
|
|
|
|
|
|
public Task PublishAsync(string topic, string content, IDbConnection dbConnection) |
|
|
|
public Task PublishAsync(string name, string content, IDbConnection dbConnection) |
|
|
|
{ |
|
|
|
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); |
|
|
|
if (topic == null) throw new ArgumentNullException(nameof(topic)); |
|
|
|
if (name == null) throw new ArgumentNullException(nameof(name)); |
|
|
|
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); |
|
|
|
|
|
|
|
var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); |
|
|
|
return PublishWithTrans(topic, content, dbConnection, dbTransaction); |
|
|
|
return PublishWithTrans(name, content, dbConnection, dbTransaction); |
|
|
|
} |
|
|
|
|
|
|
|
public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) |
|
|
|
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) |
|
|
|
{ |
|
|
|
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); |
|
|
|
if (topic == null) throw new ArgumentNullException(nameof(topic)); |
|
|
|
if (name == null) throw new ArgumentNullException(nameof(name)); |
|
|
|
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); |
|
|
|
if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction)); |
|
|
|
|
|
|
|
return PublishWithTrans(topic, content, dbConnection, dbTransaction); |
|
|
|
return PublishWithTrans(name, content, dbConnection, dbTransaction); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task Publish(string topic, string content) |
|
|
|
private async Task Publish(string name, string content) |
|
|
|
{ |
|
|
|
var connection = _dbContext.Database.GetDbConnection(); |
|
|
|
var transaction = _dbContext.Database.CurrentTransaction; |
|
|
|
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); |
|
|
|
var dbTransaction = transaction.GetDbTransaction(); |
|
|
|
await PublishWithTrans(topic, content, connection, dbTransaction); |
|
|
|
await PublishWithTrans(name, content, connection, dbTransaction); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task PublishWithTrans(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) |
|
|
|
private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) |
|
|
|
{ |
|
|
|
var message = new CapPublishedMessage |
|
|
|
{ |
|
|
|
Name = topic, |
|
|
|
Name = name, |
|
|
|
Content = content, |
|
|
|
StatusName = StatusName.Scheduled |
|
|
|
}; |
|
|
|
|
|
|
|
var sql = $"INSERT INTO {_options.Schema}.[Published] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)"; |
|
|
|
var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; |
|
|
|
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); |
|
|
|
|
|
|
|
PublishQueuer.PulseEvent.Set(); |
|
|
|