Browse Source

if not user opend transaction, cap will auto commit transaction.

master
Savorboard 7 years ago
parent
commit
303713f171
1 changed files with 11 additions and 2 deletions
  1. +11
    -2
      src/DotNetCore.CAP.SqlServer/CapPublisher.cs

+ 11
- 2
src/DotNetCore.CAP.SqlServer/CapPublisher.cs View File

@@ -15,7 +15,10 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
private readonly DbContext _dbContext; private readonly DbContext _dbContext;


protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; } protected bool IsUsingEF { get; }

protected IServiceProvider ServiceProvider { get; } protected IServiceProvider ServiceProvider { get; }


public CapPublisher(IServiceProvider provider, SqlServerOptions options) public CapPublisher(IServiceProvider provider, SqlServerOptions options)
@@ -56,6 +59,7 @@ namespace DotNetCore.CAP.SqlServer
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));


var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;
return PublishWithTrans(name, content, dbConnection, dbTransaction); return PublishWithTrans(name, content, dbConnection, dbTransaction);
} }


@@ -73,12 +77,13 @@ namespace DotNetCore.CAP.SqlServer
{ {
var connection = _dbContext.Database.GetDbConnection(); var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction; var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction(); var dbTransaction = transaction.GetDbTransaction();
await PublishWithTrans(name, content, connection, dbTransaction); await PublishWithTrans(name, content, connection, dbTransaction);
} }


private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{ {
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
@@ -89,7 +94,11 @@ namespace DotNetCore.CAP.SqlServer


var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); PublishQueuer.PulseEvent.Set();
} }
} }

Loading…
Cancel
Save