From 303713f17142b34b263b9c0bf32b3b2dc5fda3f0 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 19 Jul 2017 23:04:02 +0800 Subject: [PATCH] if not user opend transaction, cap will auto commit transaction. --- src/DotNetCore.CAP.SqlServer/CapPublisher.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 3cba0a0..6889c27 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -15,7 +15,10 @@ namespace DotNetCore.CAP.SqlServer private readonly SqlServerOptions _options; private readonly DbContext _dbContext; + protected bool IsCapOpenedTrans { get; set; } + protected bool IsUsingEF { get; } + protected IServiceProvider ServiceProvider { get; } public CapPublisher(IServiceProvider provider, SqlServerOptions options) @@ -56,6 +59,7 @@ namespace DotNetCore.CAP.SqlServer if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; return PublishWithTrans(name, content, dbConnection, dbTransaction); } @@ -73,12 +77,13 @@ namespace DotNetCore.CAP.SqlServer { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); await PublishWithTrans(name, content, connection, dbTransaction); } - private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { @@ -89,7 +94,11 @@ namespace DotNetCore.CAP.SqlServer var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); - + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbConnection.Dispose(); + } PublishQueuer.PulseEvent.Set(); } }