diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index 78c2a2e..b5d56c3 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -25,12 +25,13 @@ namespace DotNetCore.CAP.SqlServer return new SqlServerStorageTransaction(this); } - public Task GetPublishedMessageAsync(int id) + public async Task GetPublishedMessageAsync(int id) { var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; + using (var connection = new SqlConnection(_options.ConnectionString)) { - return connection.QueryFirstOrDefaultAsync(sql); + return await connection.QueryFirstOrDefaultAsync(sql); } } @@ -56,7 +57,7 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; // CapReceviedMessage - public Task StoreReceivedMessageAsync(CapReceivedMessage message) + public async Task StoreReceivedMessageAsync(CapReceivedMessage message) { if (message == null) throw new ArgumentNullException(nameof(message)); @@ -66,16 +67,16 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; using (var connection = new SqlConnection(_options.ConnectionString)) { - return connection.ExecuteAsync(sql, message); + await connection.ExecuteAsync(sql, message); } } - public Task GetReceivedMessageAsync(int id) + public async Task GetReceivedMessageAsync(int id) { var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; using (var connection = new SqlConnection(_options.ConnectionString)) { - return connection.QueryFirstOrDefaultAsync(sql); + return await connection.QueryFirstOrDefaultAsync(sql); } } @@ -94,26 +95,30 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; private async Task FetchNextMessageCoreAsync(string sql, object args = null) { - using (var connection = new SqlConnection(_options.ConnectionString)) + //here don't use `using` to dispose + var connection = new SqlConnection(_options.ConnectionString); + await connection.OpenAsync(); + var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); + FetchedMessage fetchedMessage = null; + try + { + fetchedMessage = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); + } + catch (SqlException) { - using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted)) - { - try - { - var fetched = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); - - if (fetched == null) - return null; - - return new SqlServerFetchedMessage(fetched.MessageId, fetched.MessageType, connection, transaction); - } - catch (Exception) - { - transaction.Rollback(); - return null; - } - } + transaction.Dispose(); + throw; } + + if (fetchedMessage == null) + { + transaction.Rollback(); + transaction.Dispose(); + connection.Dispose(); + return null; + } + + return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs index f1421b1..b30c616 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageTransaction.cs @@ -20,6 +20,7 @@ namespace DotNetCore.CAP.SqlServer _schema = options.Schema; _dbConnection = new SqlConnection(options.ConnectionString); + _dbConnection.Open(); _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); } @@ -27,16 +28,16 @@ namespace DotNetCore.CAP.SqlServer { if (message == null) throw new ArgumentNullException(nameof(message)); - var sql = $"UPDATE [{_schema}].[Published] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message); + var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); } public void UpdateMessage(CapReceivedMessage message) { if (message == null) throw new ArgumentNullException(nameof(message)); - var sql = $"UPDATE [{_schema}].[Received] SET [ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message); + var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); } public void EnqueueMessage(CapPublishedMessage message) @@ -44,7 +45,7 @@ namespace DotNetCore.CAP.SqlServer if (message == null) throw new ArgumentNullException(nameof(message)); var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; - _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }); + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction); } public void EnqueueMessage(CapReceivedMessage message) @@ -52,7 +53,7 @@ namespace DotNetCore.CAP.SqlServer if (message == null) throw new ArgumentNullException(nameof(message)); var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; - _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }); + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction); } public Task CommitAsync() diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 0ca1949..f317b85 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -113,7 +113,7 @@ namespace DotNetCore.CAP var messageStore = provider.GetRequiredService(); var receivedMessage = new CapReceivedMessage(messageContext) { - StatusName = StatusName.Enqueued, + StatusName = StatusName.Scheduled, }; messageStore.StoreReceivedMessageAsync(receivedMessage).Wait(); return receivedMessage; diff --git a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs index 10fd2eb..27742ab 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs @@ -24,56 +24,53 @@ namespace DotNetCore.CAP public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) { - using (fetched) + var message = await connection.GetPublishedMessageAsync(fetched.MessageId); + try { - var message = await connection.GetPublishedMessageAsync(fetched.MessageId); - try - { - var sp = Stopwatch.StartNew(); - await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); - if (message.Retries > 0) - { - _logger.JobRetrying(message.Retries); - } - var result = await PublishAsync(message.Name, message.Content); - sp.Stop(); + if (message.Retries > 0) + { + _logger.JobRetrying(message.Retries); + } + var result = await PublishAsync(message.Name, message.Content); + sp.Stop(); - var newState = default(IState); - if (!result.Succeeded) + var newState = default(IState); + if (!result.Succeeded) + { + var shouldRetry = await UpdateJobForRetryAsync(message, connection); + if (shouldRetry) { - var shouldRetry = await UpdateJobForRetryAsync(message, connection); - if (shouldRetry) - { - newState = new ScheduledState(); - _logger.JobFailedWillRetry(result.Exception); - } - else - { - newState = new FailedState(); - _logger.JobFailed(result.Exception); - } + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); } else { - newState = new SucceededState(); + newState = new FailedState(); + _logger.JobFailed(result.Exception); } - await _stateChanger.ChangeStateAsync(message, newState, connection); - - fetched.RemoveFromQueue(); + } + else + { + newState = new SucceededState(); + } + await _stateChanger.ChangeStateAsync(message, newState, connection); - if (result.Succeeded) - { - _logger.JobExecuted(sp.Elapsed.TotalSeconds); - } + fetched.RemoveFromQueue(); - return OperateResult.Success; - } - catch (Exception ex) + if (result.Succeeded) { - _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); - return OperateResult.Failed(ex); + _logger.JobExecuted(sp.Elapsed.TotalSeconds); } + + return OperateResult.Success; + } + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); + return OperateResult.Failed(ex); } } @@ -81,7 +78,7 @@ namespace DotNetCore.CAP { var retryBehavior = RetryBehavior.DefaultRetry; - var now = DateTime.UtcNow; + var now = DateTime.Now; var retries = ++message.Retries; if (retries >= retryBehavior.RetryCount) { diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs index 503f2f3..6ea5095 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs @@ -37,61 +37,58 @@ namespace DotNetCore.CAP public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) { - using (fetched) + var message = await connection.GetReceivedMessageAsync(fetched.MessageId); + try { - var message = await connection.GetReceivedMessageAsync(fetched.MessageId); - try - { - var sp = Stopwatch.StartNew(); - await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); - if (message.Retries > 0) - { - _logger.JobRetrying(message.Retries); - } - var result = await ExecuteSubscribeAsync(message); - sp.Stop(); + if (message.Retries > 0) + { + _logger.JobRetrying(message.Retries); + } + var result = await ExecuteSubscribeAsync(message); + sp.Stop(); - var newState = default(IState); - if (!result.Succeeded) + var newState = default(IState); + if (!result.Succeeded) + { + var shouldRetry = await UpdateJobForRetryAsync(message, connection); + if (shouldRetry) { - var shouldRetry = await UpdateJobForRetryAsync(message, connection); - if (shouldRetry) - { - newState = new ScheduledState(); - _logger.JobFailedWillRetry(result.Exception); - } - else - { - newState = new FailedState(); - _logger.JobFailed(result.Exception); - } + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); } else { - newState = new SucceededState(); - } - await _stateChanger.ChangeStateAsync(message, newState, connection); - - fetched.RemoveFromQueue(); - - if (result.Succeeded) - { - _logger.JobExecuted(sp.Elapsed.TotalSeconds); + newState = new FailedState(); + _logger.JobFailed(result.Exception); } - - return OperateResult.Success; } - catch (SubscriberNotFoundException ex) + else { - _logger.LogError(ex.Message); - return OperateResult.Failed(ex); + newState = new SucceededState(); } - catch (Exception ex) + await _stateChanger.ChangeStateAsync(message, newState, connection); + + fetched.RemoveFromQueue(); + + if (result.Succeeded) { - _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); - return OperateResult.Failed(ex); + _logger.JobExecuted(sp.Elapsed.TotalSeconds); } + + return OperateResult.Success; + } + catch (SubscriberNotFoundException ex) + { + _logger.LogError(ex.Message); + return OperateResult.Failed(ex); + } + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); + return OperateResult.Failed(ex); } } diff --git a/test/DotNetCore.CAP.EntityFrameworkCore.Test/DotNetCore.CAP.EntityFrameworkCore.Test.csproj b/test/DotNetCore.CAP.EntityFrameworkCore.Test/DotNetCore.CAP.EntityFrameworkCore.Test.csproj index 53b28ec..17da1b0 100644 --- a/test/DotNetCore.CAP.EntityFrameworkCore.Test/DotNetCore.CAP.EntityFrameworkCore.Test.csproj +++ b/test/DotNetCore.CAP.EntityFrameworkCore.Test/DotNetCore.CAP.EntityFrameworkCore.Test.csproj @@ -18,7 +18,7 @@ - +