diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs index 13a8700..e90bdad 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs @@ -30,64 +30,64 @@ namespace DotNetCore.CAP.EntityFrameworkCore } } - public Task PublishAsync(string topic, string content) + public Task PublishAsync(string name, string content) { - if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (name == null) throw new ArgumentNullException(nameof(name)); if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); - return Publish(topic, content); + return Publish(name, content); } - public Task PublishAsync(string topic, T contentObj) + public Task PublishAsync(string name, T contentObj) { - if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (name == null) throw new ArgumentNullException(nameof(name)); if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); var content = Helper.ToJson(contentObj); - return Publish(topic, content); + return Publish(name, content); } - public Task PublishAsync(string topic, string content, IDbConnection dbConnection) + public Task PublishAsync(string name, string content, IDbConnection dbConnection) { if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (name == null) throw new ArgumentNullException(nameof(name)); if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - return PublishWithTrans(topic, content, dbConnection, dbTransaction); + return PublishWithTrans(name, content, dbConnection, dbTransaction); } - public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (name == null) throw new ArgumentNullException(nameof(name)); if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction)); - return PublishWithTrans(topic, content, dbConnection, dbTransaction); + return PublishWithTrans(name, content, dbConnection, dbTransaction); } - private async Task Publish(string topic, string content) + private async Task Publish(string name, string content) { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTrans(topic, content, connection, dbTransaction); + await PublishWithTrans(name, content, connection, dbTransaction); } - private async Task PublishWithTrans(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { - Name = topic, + Name = name, Content = content, StatusName = StatusName.Scheduled }; - var sql = $"INSERT INTO {_options.Schema}.[Published] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)"; + var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); PublishQueuer.PulseEvent.Set();