@@ -30,7 +30,7 @@ select count(Id) from `{0}.received` where StatusName = N'Succeeded'; | |||
select count(Id) from `{0}.published` where StatusName = N'Failed'; | |||
select count(Id) from `{0}.received` where StatusName = N'Failed'; | |||
select count(Id) from `{0}.published` where StatusName in (N'Processing',N'Scheduled',N'Enqueued'); | |||
select count(Id) from `{0}.received` where StatusName = N'Processing';", _prefix); | |||
select count(Id) from `{0}.received` where StatusName in (N'Processing',N'Scheduled',N'Enqueued');", _prefix); | |||
var statistics = UseConnection(connection => | |||
{ | |||
@@ -36,7 +36,7 @@ namespace DotNetCore.CAP.MySql | |||
{ | |||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||
var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; | |||
var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; | |||
_dbConnection.Execute(sql, message, _dbTransaction); | |||
} | |||
@@ -29,7 +29,7 @@ select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Succeeded'; | |||
select count(Id) from ""{0}"".""published"" where ""StatusName"" = N'Failed'; | |||
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Failed'; | |||
select count(Id) from ""{0}"".""published"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued'); | |||
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Processing';", | |||
select count(Id) from ""{0}"".""received"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');", | |||
_options.Schema); | |||
var statistics = UseConnection(connection => | |||
@@ -36,7 +36,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
{ | |||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||
var sql = $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; | |||
var sql = $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""Content""= @Content,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;"; | |||
_dbConnection.Execute(sql, message, _dbTransaction); | |||
} | |||
@@ -30,7 +30,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeede | |||
select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed'; | |||
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; | |||
select count(Id) from [{0}].Published with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued'); | |||
select count(Id) from [{0}].Received with (nolock) where StatusName = N'Processing';", | |||
select count(Id) from [{0}].Received with (nolock) where StatusName in (N'Processing',N'Scheduled',N'Enqueued');", | |||
_options.Schema); | |||
var statistics = UseConnection(connection => | |||
@@ -36,7 +36,7 @@ namespace DotNetCore.CAP.SqlServer | |||
{ | |||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||
var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; | |||
var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; | |||
_dbConnection.Execute(sql, message, _dbTransaction); | |||
} | |||
@@ -1,7 +1,9 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Diagnostics; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Abstractions; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Models; | |||
using DotNetCore.CAP.Processor; | |||
@@ -13,10 +15,10 @@ namespace DotNetCore.CAP | |||
public class SubscibeQueueExecutor : IQueueExecutor | |||
{ | |||
private readonly IConsumerInvokerFactory _consumerInvokerFactory; | |||
private readonly IStateChanger _stateChanger; | |||
private readonly ILogger _logger; | |||
private readonly CapOptions _options; | |||
private readonly MethodMatcherCache _selector; | |||
private readonly IStateChanger _stateChanger; | |||
public SubscibeQueueExecutor( | |||
IStateChanger stateChanger, | |||
@@ -41,16 +43,14 @@ namespace DotNetCore.CAP | |||
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); | |||
if (message.Retries > 0) | |||
{ | |||
_logger.JobRetrying(message.Retries); | |||
} | |||
var result = await ExecuteSubscribeAsync(message); | |||
sp.Stop(); | |||
IState newState; | |||
if (!result.Succeeded) | |||
{ | |||
var shouldRetry = await UpdateMessageForRetryAsync(message, connection); | |||
var shouldRetry = await UpdateMessageForRetryAsync(message, connection, result.Exception?.Message); | |||
if (shouldRetry) | |||
{ | |||
newState = new ScheduledState(); | |||
@@ -71,17 +71,10 @@ namespace DotNetCore.CAP | |||
fetched.RemoveFromQueue(); | |||
if (result.Succeeded) | |||
{ | |||
_logger.JobExecuted(sp.Elapsed.TotalSeconds); | |||
} | |||
return OperateResult.Success; | |||
} | |||
catch (SubscriberNotFoundException ex) | |||
{ | |||
_logger.LogError(ex.Message); | |||
return OperateResult.Failed(ex); | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); | |||
@@ -96,9 +89,7 @@ namespace DotNetCore.CAP | |||
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); | |||
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) | |||
{ | |||
throw new SubscriberNotFoundException(receivedMessage.Name + " has not been found."); | |||
} | |||
// If there are multiple consumers in the same group, we will take the first | |||
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; | |||
@@ -115,23 +106,31 @@ namespace DotNetCore.CAP | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", ex); | |||
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", | |||
ex); | |||
return OperateResult.Failed(ex); | |||
} | |||
} | |||
private async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection) | |||
private static async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, | |||
IStorageConnection connection, string exceptionMessage) | |||
{ | |||
var retryBehavior = RetryBehavior.DefaultRetry; | |||
var retries = ++message.Retries; | |||
if (retries >= retryBehavior.RetryCount) | |||
{ | |||
return false; | |||
} | |||
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); | |||
message.ExpiresAt = due; | |||
var exceptions = new List<KeyValuePair<string, string>> | |||
{ | |||
new KeyValuePair<string, string>("ExceptionMessage", exceptionMessage) | |||
}; | |||
message.Content = Helper.AddJsonProperty(message.Content, exceptions); | |||
using (var transaction = connection.CreateTransaction()) | |||
{ | |||
transaction.UpdateMessage(message); | |||
@@ -1,8 +1,10 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.ComponentModel; | |||
using System.Globalization; | |||
using System.Reflection; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
namespace DotNetCore.CAP.Infrastructure | |||
{ | |||
@@ -91,6 +93,16 @@ namespace DotNetCore.CAP.Infrastructure | |||
return !CanConvertFromString(type); | |||
} | |||
public static string AddJsonProperty(string json, IList<KeyValuePair<string, string>> properties) | |||
{ | |||
var jObj = JObject.Parse(json); | |||
foreach (var property in properties) | |||
{ | |||
jObj.Add(new JProperty(property.Key, property.Value)); | |||
} | |||
return jObj.ToString(); | |||
} | |||
private static bool CanConvertFromString(Type destinationType) | |||
{ | |||
destinationType = Nullable.GetUnderlyingType(destinationType) ?? destinationType; | |||