@@ -1,7 +1,7 @@ | |||
| |||
Microsoft Visual Studio Solution File, Format Version 12.00 | |||
# Visual Studio Version 16 | |||
VisualStudioVersion = 16.0.29025.244 | |||
# Visual Studio Version 17 | |||
VisualStudioVersion = 17.0.31919.166 | |||
MinimumVisualStudioVersion = 10.0.40219.1 | |||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}" | |||
EndProject | |||
@@ -26,7 +26,6 @@ | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/> | |||
<PackageReference Include="JetBrains.Annotations" Version="2021.3.0" PrivateAssets="All" /> | |||
</ItemGroup> | |||
</Project> |
@@ -209,7 +209,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
/// </summary> | |||
/// <param name="arn">Source ARN</param> | |||
/// <returns>Group prefix or null if group not present</returns> | |||
private static string GetArnGroupPrefix(string arn) | |||
private static string? GetArnGroupPrefix(string arn) | |||
{ | |||
const char separator = '-'; | |||
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) | |||
@@ -235,7 +235,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
/// </summary> | |||
/// <param name="arn">Source ARN</param> | |||
/// <returns>Group name or null if group not present</returns> | |||
private static string GetGroupName(string arn) | |||
private static string? GetGroupName(string arn) | |||
{ | |||
const char separator = ':'; | |||
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) | |||
@@ -27,8 +27,8 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private readonly string _groupId; | |||
private readonly AmazonSQSOptions _amazonSQSOptions; | |||
private IAmazonSimpleNotificationService _snsClient; | |||
private IAmazonSQS _sqsClient; | |||
private IAmazonSimpleNotificationService? _snsClient; | |||
private IAmazonSQS? _sqsClient; | |||
private string _queueUrl = string.Empty; | |||
public AmazonSQSConsumerClient(string groupId, IOptions<AmazonSQSOptions> options) | |||
@@ -37,9 +37,9 @@ namespace DotNetCore.CAP.AmazonSQS | |||
_amazonSQSOptions = options.Value; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl); | |||
@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws()); | |||
var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult(); | |||
var createTopicResponse = _snsClient!.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult(); | |||
topicArns.Add(createTopicResponse.TopicArn); | |||
} | |||
@@ -92,13 +92,13 @@ namespace DotNetCore.CAP.AmazonSQS | |||
while (true) | |||
{ | |||
var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult(); | |||
var response = _sqsClient!.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult(); | |||
if (response.Messages.Count == 1) | |||
{ | |||
var messageObj = JsonSerializer.Deserialize<SQSReceivedMessage>(response.Messages[0].Body); | |||
var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); | |||
var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); | |||
var body = messageObj.Message; | |||
var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); | |||
@@ -119,7 +119,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
try | |||
{ | |||
_ = _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender).GetAwaiter().GetResult(); | |||
_ = _sqsClient!.DeleteMessageAsync(_queueUrl, (string)sender).GetAwaiter().GetResult(); | |||
} | |||
catch (InvalidIdFormatException ex) | |||
{ | |||
@@ -127,12 +127,12 @@ namespace DotNetCore.CAP.AmazonSQS | |||
} | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
try | |||
{ | |||
// Visible again in 3 seconds | |||
_ = _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3).GetAwaiter().GetResult(); | |||
_ = _sqsClient!.ChangeMessageVisibilityAsync(_queueUrl, (string)sender!, 3).GetAwaiter().GetResult(); | |||
} | |||
catch (MessageNotInflightException ex) | |||
{ | |||
@@ -237,7 +237,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
Connect(initSNS: false, initSQS: true); | |||
var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var queueAttributes = await _sqsClient!.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var sqsQueueArn = queueAttributes["QueueArn"]; | |||
@@ -263,12 +263,12 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private async Task SubscribeToTopics(IEnumerable<string> topics) | |||
{ | |||
var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var queueAttributes = await _sqsClient!.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var sqsQueueArn = queueAttributes["QueueArn"]; | |||
foreach (var topicArn in topics) | |||
{ | |||
await _snsClient.SubscribeAsync(new SubscribeRequest | |||
await _snsClient!.SubscribeAsync(new SubscribeRequest | |||
{ | |||
TopicArn = topicArn, | |||
Protocol = "sqs", | |||
@@ -10,19 +10,19 @@ namespace DotNetCore.CAP | |||
// ReSharper disable once InconsistentNaming | |||
public class AmazonSQSOptions | |||
{ | |||
public RegionEndpoint Region { get; set; } | |||
public RegionEndpoint Region { get; set; } = default!; | |||
public AWSCredentials Credentials { get; set; } | |||
public AWSCredentials? Credentials { get; set; } | |||
/// <summary> | |||
/// Overrides Service Url deduced from AWS Region. To use in local development environments like localstack. | |||
/// </summary> | |||
public string SNSServiceUrl { get; set; } | |||
public string? SNSServiceUrl { get; set; } | |||
/// <summary> | |||
/// Overrides Service Url deduced from AWS Region. To use in local development environments like localstack. | |||
/// </summary> | |||
public string SQSServiceUrl { get; set; } | |||
public string? SQSServiceUrl { get; set; } | |||
} | |||
} |
@@ -2,12 +2,12 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.AmazonSQS</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);AmazonSQS;SQS</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.AmazonSQS.xml</DocumentationFile> | |||
<GenerateDocumentationFile>true</GenerateDocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
@@ -22,8 +22,8 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private readonly ILogger _logger; | |||
private readonly IOptions<AmazonSQSOptions> _sqsOptions; | |||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); | |||
private IAmazonSimpleNotificationService _snsClient; | |||
private IDictionary<string, string> _topicArnMaps; | |||
private IAmazonSimpleNotificationService? _snsClient; | |||
private IDictionary<string, string>? _topicArnMaps; | |||
public AmazonSQSTransport(ILogger<AmazonSQSTransport> logger, IOptions<AmazonSQSOptions> sqsOptions) | |||
{ | |||
@@ -41,7 +41,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
if (TryGetOrCreateTopicArn(message.GetName().NormalizeForAws(), out var arn)) | |||
{ | |||
string bodyJson = null; | |||
string? bodyJson = null; | |||
if (message.Body != null) | |||
{ | |||
bodyJson = Encoding.UTF8.GetString(message.Body); | |||
@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
MessageAttributes = attributes | |||
}; | |||
await _snsClient.PublishAsync(request); | |||
await _snsClient!.PublishAsync(request); | |||
_logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published."); | |||
return OperateResult.Success; | |||
@@ -117,7 +117,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
_topicArnMaps = new Dictionary<string, string>(); | |||
string nextToken = null; | |||
string? nextToken = null; | |||
do | |||
{ | |||
var topics = nextToken == null | |||
@@ -143,15 +143,15 @@ namespace DotNetCore.CAP.AmazonSQS | |||
} | |||
} | |||
private bool TryGetOrCreateTopicArn(string topicName, out string topicArn) | |||
private bool TryGetOrCreateTopicArn(string topicName,[System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out string? topicArn) | |||
{ | |||
topicArn = null; | |||
if (_topicArnMaps.TryGetValue(topicName, out topicArn)) | |||
if (_topicArnMaps!.TryGetValue(topicName, out topicArn)) | |||
{ | |||
return true; | |||
} | |||
var response = _snsClient.CreateTopicAsync(topicName).GetAwaiter().GetResult(); | |||
var response = _snsClient!.CreateTopicAsync(topicName).GetAwaiter().GetResult(); | |||
if (string.IsNullOrEmpty(response.TopicArn)) | |||
{ | |||
@@ -4,15 +4,15 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
class SQSReceivedMessage | |||
{ | |||
public string Message { get; set; } | |||
public string? Message { get; set; } | |||
public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; } | |||
public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; } = default!; | |||
} | |||
class SQSReceivedMessageAttributes | |||
{ | |||
public string Type { get; set; } | |||
public string? Type { get; set; } | |||
public string Value { get; set; } | |||
public string? Value { get; set; } | |||
} | |||
} |
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
private readonly string _subscriptionName; | |||
private readonly AzureServiceBusOptions _asbOptions; | |||
private SubscriptionClient _consumerClient; | |||
private SubscriptionClient? _consumerClient; | |||
public AzureServiceBusConsumerClient( | |||
ILogger logger, | |||
@@ -36,11 +36,11 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
_asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("AzureServiceBus", _asbOptions.ConnectionString); | |||
public BrokerAddress BrokerAddress => new ("AzureServiceBus", _asbOptions.ConnectionString); | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
ConnectAsync().GetAwaiter().GetResult(); | |||
var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name); | |||
var allRuleNames = _consumerClient!.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name); | |||
foreach (var newRule in topics.Except(allRuleNames)) | |||
{ | |||
@@ -80,7 +80,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
if (_asbOptions.EnableSessions) | |||
{ | |||
_consumerClient.RegisterSessionHandler(OnConsumerReceivedWithSession, | |||
_consumerClient!.RegisterSessionHandler(OnConsumerReceivedWithSession, | |||
new SessionHandlerOptions(OnExceptionReceived) | |||
{ | |||
AutoComplete = false, | |||
@@ -89,7 +89,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
} | |||
else | |||
{ | |||
_consumerClient.RegisterMessageHandler(OnConsumerReceived, | |||
_consumerClient!.RegisterMessageHandler(OnConsumerReceived, | |||
new MessageHandlerOptions(OnExceptionReceived) | |||
{ | |||
AutoComplete = false, | |||
@@ -111,15 +111,15 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
var commitInput = (AzureServiceBusConsumerCommitInput) sender; | |||
if (_asbOptions.EnableSessions) | |||
{ | |||
commitInput.Session.CompleteAsync(commitInput.LockToken); | |||
commitInput.Session?.CompleteAsync(commitInput.LockToken); | |||
} | |||
else | |||
{ | |||
_consumerClient.CompleteAsync(commitInput.LockToken); | |||
_consumerClient!.CompleteAsync(commitInput.LockToken); | |||
} | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
// ignore | |||
} | |||
@@ -4,13 +4,13 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
{ | |||
public class AzureServiceBusConsumerCommitInput | |||
{ | |||
public AzureServiceBusConsumerCommitInput(string lockToken, IMessageSession session = null) | |||
public AzureServiceBusConsumerCommitInput(string lockToken, IMessageSession? session = null) | |||
{ | |||
LockToken = lockToken; | |||
Session = session; | |||
} | |||
public IMessageSession Session { get; set; } | |||
public IMessageSession? Session { get; set; } | |||
public string LockToken { get; set; } | |||
} | |||
} |
@@ -20,7 +20,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Azure Service Bus Namespace connection string. Must not contain topic information. | |||
/// </summary> | |||
public string ConnectionString { get; set; } | |||
public string ConnectionString { get; set; } = default!; | |||
/// <summary> | |||
/// Whether Service Bus sessions are enabled. If enabled, all messages must contain a | |||
@@ -36,6 +36,6 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Represents the Azure Active Directory token provider for Azure Managed Service Identity integration. | |||
/// </summary> | |||
public ITokenProvider ManagementTokenProvider { get; set; } | |||
public ITokenProvider? ManagementTokenProvider { get; set; } | |||
} | |||
} |
@@ -2,7 +2,7 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.AzureServiceBus</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);AzureServiceBus</PackageTags> | |||
</PropertyGroup> | |||
@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
private readonly ILogger _logger; | |||
private readonly IOptions<AzureServiceBusOptions> _asbOptions; | |||
private ITopicClient _topicClient; | |||
private ITopicClient? _topicClient; | |||
public AzureServiceBusTransport( | |||
ILogger<AzureServiceBusTransport> logger, | |||
@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.AzureServiceBus | |||
message.UserProperties.Add(header.Key, header.Value); | |||
} | |||
await _topicClient.SendAsync(message); | |||
await _topicClient!.SendAsync(message); | |||
_logger.LogDebug($"Azure Service Bus message [{transportMessage.GetName()}] has been published."); | |||
@@ -2,7 +2,7 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.InMemoryStorage</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);InMemory</PackageTags> | |||
</PropertyGroup> | |||
@@ -26,9 +26,9 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
_serializer = serializer; | |||
} | |||
public static Dictionary<string, MemoryMessage> PublishedMessages { get; } = new Dictionary<string, MemoryMessage>(); | |||
public static Dictionary<string, MemoryMessage> PublishedMessages { get; } = new(); | |||
public static Dictionary<string, MemoryMessage> ReceivedMessages { get; } = new Dictionary<string, MemoryMessage>(); | |||
public static Dictionary<string, MemoryMessage> ReceivedMessages { get; } = new(); | |||
public Task ChangePublishStateAsync(MediumMessage message, StatusName state) | |||
{ | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
return Task.CompletedTask; | |||
} | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var message = new MediumMessage | |||
{ | |||
@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
{ | |||
DbId = id, | |||
Group = group, | |||
Origin = null, | |||
Origin = null!, | |||
Name = name, | |||
Content = content, | |||
Retries = _capOptions.Value.FailedRetryCount, | |||
@@ -186,7 +186,7 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
foreach (var message in result) | |||
{ | |||
message.Origin = _serializer.Deserialize(message.Content); | |||
message.Origin = _serializer.Deserialize(message.Content)!; | |||
} | |||
return Task.FromResult(result); | |||
@@ -208,7 +208,7 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
foreach (var message in result) | |||
{ | |||
message.Origin = _serializer.Deserialize(message.Content); | |||
message.Origin = _serializer.Deserialize(message.Content)!; | |||
} | |||
return Task.FromResult(result); | |||
@@ -15,14 +15,14 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
{ | |||
internal class InMemoryMonitoringApi : IMonitoringApi | |||
{ | |||
public Task<MediumMessage> GetPublishedMessageAsync(long id) | |||
public Task<MediumMessage?> GetPublishedMessageAsync(long id) | |||
{ | |||
return Task.FromResult((MediumMessage)InMemoryStorage.PublishedMessages.Values.FirstOrDefault(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); | |||
return Task.FromResult<MediumMessage?>(InMemoryStorage.PublishedMessages.Values.FirstOrDefault(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); | |||
} | |||
public Task<MediumMessage> GetReceivedMessageAsync(long id) | |||
public Task<MediumMessage?> GetReceivedMessageAsync(long id) | |||
{ | |||
return Task.FromResult((MediumMessage)InMemoryStorage.ReceivedMessages.Values.FirstOrDefault(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); | |||
return Task.FromResult<MediumMessage?>(InMemoryStorage.ReceivedMessages.Values.FirstOrDefault(x => x.DbId == id.ToString(CultureInfo.InvariantCulture))); | |||
} | |||
public StatisticsDto GetStatistics() | |||
@@ -8,10 +8,10 @@ namespace DotNetCore.CAP.InMemoryStorage | |||
{ | |||
internal class MemoryMessage : MediumMessage | |||
{ | |||
public string Name { get; set; } | |||
public string Name { get; set; } = default!; | |||
public StatusName StatusName { get; set; } | |||
public string Group { get; set; } | |||
public string Group { get; set; } = default!; | |||
} | |||
} |
@@ -37,11 +37,11 @@ namespace DotNetCore.CAP | |||
/// Initial list of brokers as a CSV list of broker host or host:port. | |||
/// </para> | |||
/// </summary> | |||
public string Servers { get; set; } | |||
public string Servers { get; set; } = default!; | |||
/// <summary> | |||
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/> | |||
/// </summary> | |||
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>> CustomHeaders { get; set; } | |||
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; } | |||
} | |||
} |
@@ -2,14 +2,13 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);Kafka</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors> | |||
<NoWarn>NU1701;CS1591</NoWarn> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Kafka.xml</DocumentationFile> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
@@ -43,8 +43,8 @@ namespace DotNetCore.CAP.Kafka | |||
var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]> | |||
{ | |||
Headers = headers, | |||
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), | |||
Value = message.Body | |||
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string? kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), | |||
Value = message.Body! | |||
}); | |||
if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) | |||
@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka | |||
private readonly string _groupId; | |||
private readonly KafkaOptions _kafkaOptions; | |||
private IConsumer<string, byte[]> _consumerClient; | |||
private IConsumer<string, byte[]>? _consumerClient; | |||
public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options) | |||
{ | |||
@@ -29,11 +29,11 @@ namespace DotNetCore.CAP.Kafka | |||
_kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("Kafka", _kafkaOptions.Servers); | |||
public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers); | |||
public ICollection<string> FetchTopics(IEnumerable<string> topicNames) | |||
{ | |||
@@ -80,7 +80,7 @@ namespace DotNetCore.CAP.Kafka | |||
Connect(); | |||
_consumerClient.Subscribe(topics); | |||
_consumerClient!.Subscribe(topics); | |||
} | |||
public void Listening(TimeSpan timeout, CancellationToken cancellationToken) | |||
@@ -89,11 +89,11 @@ namespace DotNetCore.CAP.Kafka | |||
while (true) | |||
{ | |||
var consumerResult = _consumerClient.Consume(cancellationToken); | |||
var consumerResult = _consumerClient!.Consume(cancellationToken); | |||
if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; | |||
var headers = new Dictionary<string, string>(consumerResult.Message.Headers.Count); | |||
var headers = new Dictionary<string, string?>(consumerResult.Message.Headers.Count); | |||
foreach (var header in consumerResult.Message.Headers) | |||
{ | |||
var val = header.GetValueBytes(); | |||
@@ -106,7 +106,7 @@ namespace DotNetCore.CAP.Kafka | |||
var customHeaders = _kafkaOptions.CustomHeaders(consumerResult); | |||
foreach (var customHeader in customHeaders) | |||
{ | |||
headers.Add(customHeader.Key, customHeader.Value); | |||
headers[customHeader.Key] = customHeader.Value; | |||
} | |||
} | |||
@@ -119,12 +119,12 @@ namespace DotNetCore.CAP.Kafka | |||
public void Commit(object sender) | |||
{ | |||
_consumerClient.Commit((ConsumeResult<string, byte[]>)sender); | |||
_consumerClient!.Commit((ConsumeResult<string, byte[]>)sender); | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
_consumerClient.Assign(_consumerClient.Assignment); | |||
_consumerClient!.Assign(_consumerClient.Assignment); | |||
} | |||
public void Dispose() | |||
@@ -177,6 +177,6 @@ namespace DotNetCore.CAP.Kafka | |||
Reason = $"An error occurred during connect kafka --> {e.Reason}" | |||
}; | |||
OnLog?.Invoke(null, logArgs); | |||
} | |||
} | |||
} | |||
} |
@@ -32,7 +32,7 @@ namespace DotNetCore.CAP.MongoDB | |||
//Try to add IMongoClient if does not exists | |||
services.TryAddSingleton<IMongoClient>(x => | |||
{ | |||
var options = x.GetService<IOptions<MongoDBOptions>>().Value; | |||
var options = x.GetRequiredService<IOptions<MongoDBOptions>>().Value; | |||
return new MongoClient(options.DatabaseConnection); | |||
}); | |||
} | |||
@@ -30,6 +30,6 @@ namespace DotNetCore.CAP.MongoDB | |||
/// </summary> | |||
public string PublishedCollection { get; set; } = "cap.published"; | |||
internal string Version { get; set; } | |||
internal string Version { get; set; } = default!; | |||
} | |||
} |
@@ -2,15 +2,10 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.MongoDB</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);MongoDB</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.MongoDB.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
@@ -19,7 +19,7 @@ namespace MongoDB.Driver | |||
public CapMongoDbClientSessionHandle(ICapTransaction transaction) | |||
{ | |||
_transaction = transaction; | |||
_sessionHandle = (IClientSessionHandle)_transaction.DbTransaction; | |||
_sessionHandle = (IClientSessionHandle)_transaction.DbTransaction!; | |||
} | |||
public void Dispose() | |||
@@ -59,7 +59,7 @@ namespace MongoDB.Driver | |||
return Task.CompletedTask; | |||
} | |||
public void StartTransaction(TransactionOptions transactionOptions = null) | |||
public void StartTransaction(TransactionOptions? transactionOptions = null) | |||
{ | |||
_sessionHandle.StartTransaction(transactionOptions); | |||
} | |||
@@ -78,12 +78,12 @@ namespace MongoDB.Driver | |||
return _sessionHandle.Fork(); | |||
} | |||
public TResult WithTransaction<TResult>(Func<IClientSessionHandle, CancellationToken, TResult> callback, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default) | |||
public TResult WithTransaction<TResult>(Func<IClientSessionHandle, CancellationToken, TResult> callback, TransactionOptions? transactionOptions = null, CancellationToken cancellationToken = default) | |||
{ | |||
return _sessionHandle.WithTransaction(callback, transactionOptions, cancellationToken); | |||
} | |||
public Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default) | |||
public Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions? transactionOptions = null, CancellationToken cancellationToken = default) | |||
{ | |||
return _sessionHandle.WithTransactionAsync(callbackAsync, transactionOptions, cancellationToken); | |||
} | |||
@@ -63,7 +63,7 @@ namespace DotNetCore.CAP.MongoDB | |||
await collection.UpdateOneAsync(x => x.Id == long.Parse(message.DbId), updateDef); | |||
} | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var insertOptions = new InsertOneOptions { BypassDocumentValidation = false }; | |||
@@ -188,7 +188,7 @@ namespace DotNetCore.CAP.MongoDB | |||
return queryResult.Select(x => new MediumMessage | |||
{ | |||
DbId = x.Id.ToString(), | |||
Origin = _serializer.Deserialize(x.Content), | |||
Origin = _serializer.Deserialize(x.Content)!, | |||
Retries = x.Retries, | |||
Added = x.Added | |||
}).ToList(); | |||
@@ -209,7 +209,7 @@ namespace DotNetCore.CAP.MongoDB | |||
return queryResult.Select(x => new MediumMessage | |||
{ | |||
DbId = x.Id.ToString(), | |||
Origin = _serializer.Deserialize(x.Content), | |||
Origin = _serializer.Deserialize(x.Content)!, | |||
Retries = x.Retries, | |||
Added = x.Added | |||
}).ToList(); | |||
@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.MongoDB | |||
_database = mongoClient.GetDatabase(_options.DatabaseName); | |||
} | |||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) | |||
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) | |||
{ | |||
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection); | |||
var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); | |||
@@ -41,7 +41,7 @@ namespace DotNetCore.CAP.MongoDB | |||
}; | |||
} | |||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) | |||
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) | |||
{ | |||
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection); | |||
var message = await collection.Find(x => x.Id == id).FirstOrDefaultAsync(); | |||
@@ -9,13 +9,13 @@ namespace DotNetCore.CAP.MongoDB | |||
{ | |||
public long Id { get; set; } | |||
public string Version { get; set; } | |||
public string Version { get; set; } = default!; | |||
public string Group { get; set; } | |||
public string Group { get; set; } = default!; | |||
public string Name { get; set; } | |||
public string Name { get; set; } = default!; | |||
public string Content { get; set; } | |||
public string Content { get; set; } = default!; | |||
public DateTime Added { get; set; } | |||
@@ -23,18 +23,18 @@ namespace DotNetCore.CAP.MongoDB | |||
public int Retries { get; set; } | |||
public string StatusName { get; set; } | |||
public string StatusName { get; set; } = default!; | |||
} | |||
internal class PublishedMessage | |||
{ | |||
public long Id { get; set; } | |||
public string Version { get; set; } | |||
public string Version { get; set; } = default!; | |||
public string Name { get; set; } | |||
public string Name { get; set; } = default!; | |||
public string Content { get; set; } | |||
public string Content { get; set; } = default!; | |||
public DateTime Added { get; set; } | |||
@@ -42,6 +42,6 @@ namespace DotNetCore.CAP.MongoDB | |||
public int Retries { get; set; } | |||
public string StatusName { get; set; } | |||
public string StatusName { get; set; } = default!; | |||
} | |||
} |
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// EF db context type. | |||
/// </summary> | |||
internal Type DbContextType { get; set; } | |||
internal Type? DbContextType { get; set; } | |||
/// <summary> | |||
/// Data version | |||
@@ -13,7 +13,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Gets or sets the database's connection string that will be used to store database entities. | |||
/// </summary> | |||
public string ConnectionString { get; set; } | |||
public string ConnectionString { get; set; } = default!; | |||
} | |||
internal class ConfigureMySqlOptions : IConfigureOptions<MySqlOptions> | |||
@@ -2,15 +2,9 @@ | |||
<PropertyGroup> | |||
<TargetFrameworks>net6.0;netstandard2.1</TargetFrameworks> | |||
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);MySQL</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.MySql.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' "> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.0" /> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql | |||
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) => | |||
await ChangeMessageStateAsync(_recName, message, state); | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var sql = $"INSERT INTO `{_pubName}`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)" + | |||
$" VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||
@@ -68,7 +68,7 @@ namespace DotNetCore.CAP.MySql | |||
new MySqlParameter("@Content", message.Content), | |||
new MySqlParameter("@Retries", message.Retries), | |||
new MySqlParameter("@Added", message.Added), | |||
new MySqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value), | |||
new MySqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? message.ExpiresAt.Value : DBNull.Value), | |||
new MySqlParameter("@StatusName", nameof(StatusName.Scheduled)), | |||
}; | |||
@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.MySql | |||
dbTrans = dbContextTrans.GetDbTransaction(); | |||
} | |||
var conn = dbTrans?.Connection; | |||
var conn = dbTrans!.Connection!; | |||
conn.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
} | |||
@@ -128,7 +128,7 @@ namespace DotNetCore.CAP.MySql | |||
new MySqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)), | |||
new MySqlParameter("@Retries", mdMessage.Retries), | |||
new MySqlParameter("@Added", mdMessage.Added), | |||
new MySqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new MySqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new MySqlParameter("@StatusName", nameof(StatusName.Scheduled)) | |||
}; | |||
@@ -198,7 +198,7 @@ namespace DotNetCore.CAP.MySql | |||
messages.Add(new MediumMessage | |||
{ | |||
DbId = reader.GetInt64(0).ToString(), | |||
Origin = _serializer.Deserialize(reader.GetString(1)), | |||
Origin = _serializer.Deserialize(reader.GetString(1))!, | |||
Retries = reader.GetInt32(2), | |||
Added = reader.GetDateTime(3) | |||
}); | |||
@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.MySql | |||
{ | |||
internal static class DbConnectionExtensions | |||
{ | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction? transaction = null, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.MySql | |||
return command.ExecuteNonQuery(); | |||
} | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc, | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T>? readerFunc, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.MySql | |||
var reader = command.ExecuteReader(); | |||
T result = default; | |||
T result = default!; | |||
if (readerFunc != null) | |||
{ | |||
result = readerFunc(reader); | |||
@@ -77,14 +77,14 @@ namespace DotNetCore.CAP.MySql | |||
var objValue = command.ExecuteScalar(); | |||
T result = default; | |||
T result = default!; | |||
if (objValue != null) | |||
{ | |||
var returnType = typeof(T); | |||
var converter = TypeDescriptor.GetConverter(returnType); | |||
if (converter.CanConvertFrom(objValue.GetType())) | |||
{ | |||
result = (T)converter.ConvertFrom(objValue); | |||
result = (T)converter.ConvertFrom(objValue)!; | |||
} | |||
else | |||
{ | |||
@@ -19,7 +19,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
public CapEFDbTransaction(ICapTransaction transaction) | |||
{ | |||
_transaction = transaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
TransactionId = dbContextTransaction.TransactionId; | |||
} | |||
@@ -60,7 +60,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
{ | |||
get | |||
{ | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
return dbContextTransaction.GetDbTransaction(); | |||
} | |||
} | |||
@@ -247,18 +247,18 @@ WHERE `Key` >= @minKey | |||
return result; | |||
} | |||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
private async Task<MediumMessage> GetMessageAsync(string tableName, long id) | |||
private async Task<MediumMessage?> GetMessageAsync(string tableName, long id) | |||
{ | |||
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; | |||
await using var connection = new MySqlConnection(_options.ConnectionString); | |||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||
{ | |||
MediumMessage message = null; | |||
MediumMessage? message = null; | |||
while (reader.Read()) | |||
{ | |||
@@ -27,9 +27,9 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Used to setup all NATs client options | |||
/// </summary> | |||
public Options Options { get; set; } | |||
public Options? Options { get; set; } | |||
public Action<StreamConfiguration.StreamConfigurationBuilder> StreamOptions { get; set; } | |||
public Action<StreamConfiguration.StreamConfigurationBuilder>? StreamOptions { get; set; } | |||
public Func<string, string> NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; | |||
} |
@@ -3,7 +3,6 @@ | |||
using System; | |||
using DotNetCore.CAP; | |||
using JetBrains.Annotations; | |||
// ReSharper disable once CheckNamespace | |||
namespace Microsoft.Extensions.DependencyInjection | |||
@@ -15,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection | |||
/// </summary> | |||
/// <param name="options">CAP configuration options</param> | |||
/// <param name="bootstrapServers">NATS bootstrap server urls.</param> | |||
public static CapOptions UseNATS(this CapOptions options, [CanBeNull] string bootstrapServers = null) | |||
public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null) | |||
{ | |||
return options.UseNATS(opt => | |||
{ | |||
@@ -2,16 +2,10 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.NATS</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);NATS</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors> | |||
<NoWarn>NU1701;CS1591</NoWarn> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.NATS.xml</DocumentationFile> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="NATS.Client" Version="0.14.1" /> | |||
</ItemGroup> | |||
@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.NATS | |||
private readonly string _groupId; | |||
private readonly NATSOptions _natsOptions; | |||
private IConnection _consumerClient; | |||
private IConnection? _consumerClient; | |||
public NATSConsumerClient(string groupId, IOptions<NATSOptions> options) | |||
{ | |||
@@ -29,15 +29,17 @@ namespace DotNetCore.CAP.NATS | |||
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers); | |||
public BrokerAddress BrokerAddress => new ("NATS", _natsOptions.Servers); | |||
public ICollection<string> FetchTopics(IEnumerable<string> topicNames) | |||
{ | |||
var jsm = _consumerClient.CreateJetStreamManagementContext(); | |||
Connect(); | |||
var jsm = _consumerClient!.CreateJetStreamManagementContext(); | |||
var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); | |||
@@ -80,7 +82,7 @@ namespace DotNetCore.CAP.NATS | |||
Connect(); | |||
var js = _consumerClient.CreateJetStreamContext(); | |||
var js = _consumerClient!.CreateJetStreamContext(); | |||
foreach (var subject in topics) | |||
{ | |||
@@ -106,7 +108,7 @@ namespace DotNetCore.CAP.NATS | |||
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) | |||
{ | |||
var headers = new Dictionary<string, string>(); | |||
var headers = new Dictionary<string, string?>(); | |||
foreach (string h in e.Message.Header.Keys) | |||
{ | |||
@@ -126,7 +128,7 @@ namespace DotNetCore.CAP.NATS | |||
} | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
if (sender is Msg msg) | |||
{ | |||
@@ -153,7 +155,7 @@ namespace DotNetCore.CAP.NATS | |||
if (_consumerClient == null) | |||
{ | |||
var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); | |||
opts.Url = _natsOptions.Servers ?? opts.Url; | |||
opts.Url ??= _natsOptions.Servers; | |||
opts.ClosedEventHandler = ConnectedEventHandler; | |||
opts.DisconnectedEventHandler = ConnectedEventHandler; | |||
opts.AsyncErrorEventHandler = AsyncErrorEventHandler; | |||
@@ -16,11 +16,11 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public string Schema { get; set; } = DefaultSchema; | |||
internal Type DbContextType { get; set; } | |||
internal Type? DbContextType { get; set; } | |||
/// <summary> | |||
/// Data version | |||
/// </summary> | |||
internal string Version { get; set; } | |||
internal string Version { get; set; } = default!; | |||
} | |||
} |
@@ -13,7 +13,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Gets or sets the database's connection string that will be used to store database entities. | |||
/// </summary> | |||
public string ConnectionString { get; set; } | |||
public string ConnectionString { get; set; } = default!; | |||
} | |||
internal class ConfigurePostgreSqlOptions : IConfigureOptions<PostgreSqlOptions> | |||
@@ -2,15 +2,10 @@ | |||
<PropertyGroup> | |||
<TargetFrameworks>net6.0;netstandard2.1</TargetFrameworks> | |||
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);PostgreSQL</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.PostgreSql.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' "> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.0" /> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.0" /> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) => | |||
await ChangeMessageStateAsync(_recName, message, state); | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var sql = | |||
$"INSERT INTO {_pubName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + | |||
@@ -69,7 +69,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
new NpgsqlParameter("@Content", message.Content), | |||
new NpgsqlParameter("@Retries", message.Retries), | |||
new NpgsqlParameter("@Added", message.Added), | |||
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value), | |||
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? message.ExpiresAt.Value : DBNull.Value), | |||
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled)) | |||
}; | |||
@@ -84,7 +84,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) | |||
dbTrans = dbContextTrans.GetDbTransaction(); | |||
var conn = dbTrans?.Connection; | |||
var conn = dbTrans?.Connection!; | |||
conn.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
} | |||
@@ -127,7 +127,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
new NpgsqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)), | |||
new NpgsqlParameter("@Retries", mdMessage.Retries), | |||
new NpgsqlParameter("@Added", mdMessage.Added), | |||
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled)) | |||
}; | |||
@@ -199,7 +199,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
messages.Add(new MediumMessage | |||
{ | |||
DbId = reader.GetInt64(0).ToString(), | |||
Origin = _serializer.Deserialize(reader.GetString(1)), | |||
Origin = _serializer.Deserialize(reader.GetString(1))!, | |||
Retries = reader.GetInt32(2), | |||
Added = reader.GetDateTime(3) | |||
}); | |||
@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
{ | |||
internal static class DbConnectionExtensions | |||
{ | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction? transaction = null, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
return command.ExecuteNonQuery(); | |||
} | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc, | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T>? readerFunc, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
var reader = command.ExecuteReader(); | |||
T result = default; | |||
T result = default!; | |||
if (readerFunc != null) | |||
{ | |||
result = readerFunc(reader); | |||
@@ -77,14 +77,14 @@ namespace DotNetCore.CAP.PostgreSql | |||
var objValue = command.ExecuteScalar(); | |||
T result = default; | |||
T result = default!; | |||
if (objValue != null) | |||
{ | |||
var returnType = typeof(T); | |||
var converter = TypeDescriptor.GetConverter(returnType); | |||
if (converter.CanConvertFrom(objValue.GetType())) | |||
{ | |||
result = (T)converter.ConvertFrom(objValue); | |||
result = (T)converter.ConvertFrom(objValue)!; | |||
} | |||
else | |||
{ | |||
@@ -18,7 +18,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
public CapEFDbTransaction(ICapTransaction transaction) | |||
{ | |||
_transaction = transaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
TransactionId = dbContextTransaction.TransactionId; | |||
} | |||
@@ -58,7 +58,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
{ | |||
get | |||
{ | |||
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction!; | |||
return dbContextTransaction.GetDbTransaction(); | |||
} | |||
} | |||
@@ -27,9 +27,9 @@ namespace DotNetCore.CAP.PostgreSql | |||
_recName = initializer.GetReceivedTableName(); | |||
} | |||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
public StatisticsDto GetStatistics() | |||
{ | |||
@@ -120,7 +120,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
Content = reader.GetString(index++), | |||
Retries = reader.GetInt32(index++), | |||
Added = reader.GetDateTime(index++), | |||
ExpiresAt = reader.IsDBNull(index++) ? (DateTime?)null : reader.GetDateTime(index - 1), | |||
ExpiresAt = reader.IsDBNull(index++) ? null : reader.GetDateTime(index - 1), | |||
StatusName = reader.GetString(index) | |||
}); | |||
} | |||
@@ -242,14 +242,14 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK | |||
return result; | |||
} | |||
private async Task<MediumMessage> GetMessageAsync(string tableName, long id) | |||
private async Task<MediumMessage?> GetMessageAsync(string tableName, long id) | |||
{ | |||
var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; | |||
await using var connection = new NpgsqlConnection(_options.ConnectionString); | |||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||
{ | |||
MediumMessage message = null; | |||
MediumMessage? message = null; | |||
while (reader.Read()) | |||
{ | |||
@@ -11,9 +11,11 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public class PulsarOptions | |||
{ | |||
public string ServiceUrl { get; set; } | |||
public string ServiceUrl { get; set; } = default!; | |||
public TlsOptions TlsOptions { get; set; } | |||
public bool EnableClientLog { get; set; } = false; | |||
public TlsOptions? TlsOptions { get; set; } | |||
} | |||
} | |||
@@ -2,14 +2,9 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.Pulsar</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);Pulsar</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors> | |||
<NoWarn>NU1701;CS1591</NoWarn> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.Pulsar.xml</DocumentationFile> | |||
<NoWarn>CS0067</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
@@ -13,12 +13,14 @@ namespace DotNetCore.CAP.Pulsar | |||
{ | |||
public class ConnectionFactory : IConnectionFactory, IAsyncDisposable | |||
{ | |||
private PulsarClient _client; | |||
private readonly ILogger<ConnectionFactory> _logger; | |||
private PulsarClient? _client; | |||
private readonly PulsarOptions _options; | |||
private readonly ConcurrentDictionary<string, Task<IProducer<byte[]>>> _topicProducers; | |||
public ConnectionFactory(ILogger<ConnectionFactory> logger, IOptions<PulsarOptions> options) | |||
{ | |||
_logger = logger; | |||
_options = options.Value; | |||
_topicProducers = new ConcurrentDictionary<string, Task<IProducer<byte[]>>>(); | |||
@@ -22,7 +22,7 @@ namespace DotNetCore.CAP.Pulsar | |||
_connectionFactory = connectionFactory; | |||
} | |||
public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _connectionFactory.ServersAddress); | |||
public BrokerAddress BrokerAddress => new ("Pulsar", _connectionFactory.ServersAddress); | |||
public async Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
@@ -30,9 +30,9 @@ namespace DotNetCore.CAP.Pulsar | |||
try | |||
{ | |||
var headerDic = new Dictionary<string, string>(message.Headers); | |||
var headerDic = new Dictionary<string, string?>(message.Headers); | |||
headerDic.TryGetValue(PulsarHeaders.PulsarKey, out var key); | |||
var pulsarMessage = producer.NewMessage(message.Body, key, headerDic); | |||
var pulsarMessage = producer.NewMessage(message.Body!, key, headerDic); | |||
var result = await producer.SendAsync(pulsarMessage); | |||
if (result.Type != null) | |||
@@ -7,6 +7,8 @@ using System.Reflection; | |||
using System.Threading; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Logging.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
using Pulsar.Client.Api; | |||
using Pulsar.Client.Common; | |||
@@ -15,23 +17,23 @@ namespace DotNetCore.CAP.Pulsar | |||
{ | |||
internal sealed class PulsarConsumerClient : IConsumerClient | |||
{ | |||
private static PulsarClient _client; | |||
private readonly PulsarClient _client; | |||
private readonly string _groupId; | |||
private readonly PulsarOptions _pulsarOptions; | |||
private IConsumer<byte[]> _consumerClient; | |||
private IConsumer<byte[]>? _consumerClient; | |||
public PulsarConsumerClient(PulsarClient client,string groupId, IOptions<PulsarOptions> options) | |||
public PulsarConsumerClient(PulsarClient client, string groupId, IOptions<PulsarOptions> options) | |||
{ | |||
_client = client; | |||
_client = client; | |||
_groupId = groupId; | |||
_pulsarOptions = options.Value; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _pulsarOptions.ServiceUrl); | |||
public BrokerAddress BrokerAddress => new ("Pulsar", _pulsarOptions.ServiceUrl); | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
@@ -41,22 +43,22 @@ namespace DotNetCore.CAP.Pulsar | |||
} | |||
var serviceName = Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); | |||
_consumerClient = _client.NewConsumer() | |||
.Topics(topics) | |||
.SubscriptionName(_groupId) | |||
.ConsumerName(serviceName) | |||
.SubscriptionType(SubscriptionType.Shared) | |||
.SubscribeAsync().Result; | |||
.SubscribeAsync().GetAwaiter().GetResult(); | |||
} | |||
public void Listening(TimeSpan timeout, CancellationToken cancellationToken) | |||
{ | |||
while (!cancellationToken.IsCancellationRequested) | |||
{ | |||
var consumerResult = _consumerClient.ReceiveAsync().Result; | |||
var consumerResult = _consumerClient!.ReceiveAsync(cancellationToken).Result; | |||
var headers = new Dictionary<string, string>(consumerResult.Properties.Count); | |||
var headers = new Dictionary<string, string?>(consumerResult.Properties.Count); | |||
foreach (var header in consumerResult.Properties) | |||
{ | |||
headers.Add(header.Key, header.Value); | |||
@@ -72,12 +74,15 @@ namespace DotNetCore.CAP.Pulsar | |||
public void Commit(object sender) | |||
{ | |||
_consumerClient.AcknowledgeAsync((MessageId)sender); | |||
_consumerClient!.AcknowledgeAsync((MessageId)sender); | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
_consumerClient.NegativeAcknowledge((MessageId)sender); | |||
if(sender is MessageId id) | |||
{ | |||
_consumerClient!.NegativeAcknowledge(id); | |||
} | |||
} | |||
public void Dispose() | |||
@@ -85,14 +90,6 @@ namespace DotNetCore.CAP.Pulsar | |||
_consumerClient?.DisposeAsync(); | |||
} | |||
private void ConsumerClient_OnConsumeError(IConsumer<byte[]> consumer, Exception e) | |||
{ | |||
var logArgs = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.ServerConnError, | |||
Reason = $"An error occurred during connect pulsar --> {e.Message}" | |||
}; | |||
OnLog?.Invoke(null, logArgs); | |||
} | |||
} | |||
} | |||
} |
@@ -2,7 +2,9 @@ | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using Pulsar.Client.Api; | |||
namespace DotNetCore.CAP.Pulsar | |||
{ | |||
@@ -11,10 +13,15 @@ namespace DotNetCore.CAP.Pulsar | |||
private readonly IConnectionFactory _connection; | |||
private readonly IOptions<PulsarOptions> _pulsarOptions; | |||
public PulsarConsumerClientFactory(IConnectionFactory connection, IOptions<PulsarOptions> pulsarOptions) | |||
public PulsarConsumerClientFactory(IConnectionFactory connection, ILoggerFactory loggerFactory, IOptions<PulsarOptions> pulsarOptions) | |||
{ | |||
_connection = connection; | |||
_pulsarOptions = pulsarOptions; | |||
if (_pulsarOptions.Value.EnableClientLog) | |||
{ | |||
PulsarClient.Logger = loggerFactory.CreateLogger<PulsarClient>(); | |||
} | |||
} | |||
public IConsumerClient Create(string groupId) | |||
@@ -22,7 +29,7 @@ namespace DotNetCore.CAP.Pulsar | |||
try | |||
{ | |||
var client = _connection.RentClient(); | |||
var consumerClient = new PulsarConsumerClient(client,groupId, _pulsarOptions); | |||
var consumerClient = new PulsarConsumerClient(client, groupId, _pulsarOptions); | |||
return consumerClient; | |||
} | |||
catch (System.Exception e) | |||
@@ -74,28 +74,29 @@ namespace DotNetCore.CAP | |||
/// Optional queue arguments, also known as "x-arguments" because of their field name in the AMQP 0-9-1 protocol, | |||
/// is a map (dictionary) of arbitrary key/value pairs that can be provided by clients when a queue is declared. | |||
/// </summary> | |||
public QueueArgumentsOptions QueueArguments { get; set; } = new QueueArgumentsOptions(); | |||
public QueueArgumentsOptions QueueArguments { get; set; } = new (); | |||
/// <summary> | |||
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader"/>. | |||
/// </summary> | |||
public Func<BasicDeliverEventArgs, List<KeyValuePair<string, string>>> CustomHeaders { get; set; } | |||
public Func<BasicDeliverEventArgs, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; } | |||
/// <summary> | |||
/// RabbitMQ native connection factory options | |||
/// </summary> | |||
public Action<ConnectionFactory> ConnectionFactoryOptions { get; set; } | |||
public Action<ConnectionFactory>? ConnectionFactoryOptions { get; set; } | |||
public class QueueArgumentsOptions | |||
{ | |||
/// <summary> | |||
/// Gets or sets queue mode by supplying the 'x-queue-mode' declaration argument with a string specifying the desired mode. | |||
/// </summary> | |||
public string QueueMode { get; set; } | |||
public string QueueMode { get; set; } = default!; | |||
/// <summary> | |||
/// Gets or sets queue message automatic deletion time (in milliseconds) "x-message-ttl", Default 864000000 ms (10 days). | |||
/// </summary> | |||
// ReSharper disable once InconsistentNaming | |||
public int MessageTTL { get; set; } = 864000000; | |||
} | |||
} |
@@ -2,14 +2,9 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);RabbitMQ</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.RabbitMQ.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" /> | |||
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private readonly Func<IConnection> _connectionActivator; | |||
private readonly ILogger<ConnectionChannelPool> _logger; | |||
private readonly ConcurrentQueue<IModel> _pool; | |||
private IConnection _connection; | |||
private IConnection? _connection; | |||
private static readonly object SLock = new object(); | |||
private int _count; | |||
@@ -27,11 +27,11 @@ namespace DotNetCore.CAP.RabbitMQ | |||
_exchange = _connectionChannelPool.Exchange; | |||
} | |||
public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _connectionChannelPool.HostAddress); | |||
public BrokerAddress BrokerAddress => new ("RabbitMQ", _connectionChannelPool.HostAddress); | |||
public Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
IModel channel = null; | |||
IModel? channel = null; | |||
try | |||
{ | |||
channel = _connectionChannelPool.Rent(); | |||
@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
var props = channel.CreateBasicProperties(); | |||
props.DeliveryMode = 2; | |||
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value); | |||
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object?)x.Value); | |||
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); | |||
@@ -22,9 +22,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private readonly string _exchangeName; | |||
private readonly string _queueName; | |||
private readonly RabbitMQOptions _rabbitMQOptions; | |||
private IModel _channel; | |||
private IConnection _connection; | |||
private IModel? _channel; | |||
private IConnection? _connection; | |||
public RabbitMQConsumerClient(string queueName, | |||
IConnectionChannelPool connectionChannelPool, | |||
@@ -36,11 +35,11 @@ namespace DotNetCore.CAP.RabbitMQ | |||
_exchangeName = connectionChannelPool.Exchange; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", _rabbitMQOptions.HostName); | |||
public BrokerAddress BrokerAddress => new("RabbitMQ", _rabbitMQOptions.HostName); | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
@@ -81,17 +80,17 @@ namespace DotNetCore.CAP.RabbitMQ | |||
public void Commit(object sender) | |||
{ | |||
if (_channel.IsOpen) | |||
if (_channel!.IsOpen) | |||
{ | |||
_channel.BasicAck((ulong)sender, false); | |||
} | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
if (_channel.IsOpen) | |||
if (_channel!.IsOpen && sender is ulong val) | |||
{ | |||
_channel.BasicReject((ulong)sender, true); | |||
_channel.BasicReject(val, true); | |||
} | |||
} | |||
@@ -175,7 +174,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | |||
{ | |||
var headers = new Dictionary<string, string>(); | |||
var headers = new Dictionary<string, string?>(); | |||
if (e.BasicProperties.Headers != null) | |||
{ | |||
@@ -15,12 +15,22 @@ namespace Microsoft.Extensions.DependencyInjection | |||
return options.UseRedis(_ => { }); | |||
} | |||
/// <summary> | |||
/// Use redis streams as the message transport. | |||
/// </summary> | |||
/// <param name="options">The <see cref="CapOptions"/>.</param> | |||
/// <param name="connection">The StackExchange.Redis <see cref="ConfigurationOptions"/> comma-delimited configuration string.</param> | |||
public static CapOptions UseRedis(this CapOptions options, string connection) | |||
{ | |||
return options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection)); | |||
} | |||
/// <summary> | |||
/// Use redis streams as the message transport. | |||
/// </summary> | |||
/// <param name="options">The <see cref="CapOptions"/>.</param> | |||
/// <param name="configure">The CAP redis client options.</param> | |||
/// <exception cref="ArgumentNullException"><paramref name="configure"/> is <c>null</c>.</exception> | |||
public static CapOptions UseRedis(this CapOptions options, Action<CapRedisOptions> configure) | |||
{ | |||
if (configure is null) throw new ArgumentNullException(nameof(configure)); | |||
@@ -9,11 +9,11 @@ namespace DotNetCore.CAP | |||
public class CapRedisOptions | |||
{ | |||
/// <summary> | |||
/// Gets or sets the options of redis connections | |||
/// Gets or sets the native options of StackExchange.Redis | |||
/// </summary> | |||
public ConfigurationOptions Configuration { get; set; } | |||
public ConfigurationOptions? Configuration { get; set; } | |||
internal string Endpoint { get; set; } | |||
internal string Endpoint { get; set; } = default!; | |||
/// <summary> | |||
/// Gets or sets the count of entries consumed from stream | |||
@@ -2,7 +2,7 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.RedisStreams</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);RedisStreams</PackageTags> | |||
</PropertyGroup> | |||
@@ -16,7 +16,7 @@ | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -14,11 +14,10 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
internal class RedisConnectionPool : IRedisConnectionPool, IDisposable | |||
{ | |||
private readonly ConcurrentBag<AsyncLazyRedisConnection> _connections = | |||
new ConcurrentBag<AsyncLazyRedisConnection>(); | |||
private readonly ConcurrentBag<AsyncLazyRedisConnection> _connections = new(); | |||
private readonly ILoggerFactory _loggerFactory; | |||
private readonly SemaphoreSlim _poolLock = new SemaphoreSlim(1); | |||
private readonly SemaphoreSlim _poolLock = new(1); | |||
private readonly CapRedisOptions _redisOptions; | |||
private bool _isDisposed; | |||
private bool _poolAlreadyConfigured; | |||
@@ -30,13 +29,11 @@ namespace DotNetCore.CAP.RedisStreams | |||
Init().GetAwaiter().GetResult(); | |||
} | |||
private AsyncLazyRedisConnection QuietConnection | |||
private AsyncLazyRedisConnection? QuietConnection | |||
{ | |||
get | |||
{ | |||
if (_poolAlreadyConfigured) | |||
return _connections.OrderBy(async c => (await c).ConnectionCapacity).First(); | |||
return null; | |||
return _poolAlreadyConfigured ? _connections.OrderBy(async c => (await c).ConnectionCapacity).First() : null; | |||
} | |||
} | |||
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
var redisLogger = new RedisLogger(logger); | |||
ConnectionMultiplexer connection = null; | |||
ConnectionMultiplexer? connection = null; | |||
while (attemp <= 5) | |||
{ | |||
@@ -18,9 +18,9 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
private readonly string _groupId; | |||
private readonly ILogger<RedisConsumerClient> _logger; | |||
private readonly IOptions<CapRedisOptions> _options; | |||
private readonly IOptions<CapRedisOptions> _options; | |||
private readonly IRedisStreamManager _redis; | |||
private string[] _topics; | |||
private string[] _topics = default!; | |||
public RedisConsumerClient(string groupId, | |||
IRedisStreamManager redis, | |||
@@ -34,11 +34,11 @@ namespace DotNetCore.CAP.RedisStreams | |||
_logger = logger; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("redis", _options.Value.Endpoint); | |||
public BrokerAddress BrokerAddress => new("redis", _options.Value.Endpoint); | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
@@ -59,16 +59,17 @@ namespace DotNetCore.CAP.RedisStreams | |||
cancellationToken.ThrowIfCancellationRequested(); | |||
cancellationToken.WaitHandle.WaitOne(timeout); | |||
} | |||
// ReSharper disable once FunctionNeverReturns | |||
} | |||
public void Commit(object sender) | |||
{ | |||
var (stream, group, id) = ((string stream, string group, string id)) sender; | |||
var (stream, group, id) = ((string stream, string group, string id))sender; | |||
_redis.Ack(stream, group, id).GetAwaiter().GetResult(); | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
// ignore | |||
} | |||
@@ -94,32 +95,39 @@ namespace DotNetCore.CAP.RedisStreams | |||
private async Task ConsumeMessages(IAsyncEnumerable<IEnumerable<RedisStream>> streamsSet, RedisValue position) | |||
{ | |||
await foreach (var set in streamsSet) | |||
foreach (var stream in set) | |||
foreach (var entry in stream.Entries) | |||
{ | |||
if (entry.IsNull) | |||
return; | |||
try | |||
foreach (var stream in set) | |||
{ | |||
var message = RedisMessage.Create(entry, _groupId); | |||
OnMessageReceived?.Invoke((stream.Key.ToString(), _groupId, entry.Id.ToString()), message); | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError(ex.Message, ex); | |||
var logArgs = new LogMessageEventArgs | |||
foreach (var entry in stream.Entries) | |||
{ | |||
LogType = MqLogType.ConsumeError, | |||
Reason = ex.ToString() | |||
}; | |||
OnLog?.Invoke(entry, logArgs); | |||
} | |||
finally | |||
{ | |||
var positionName = position == StreamPosition.Beginning | |||
? nameof(StreamPosition.Beginning) | |||
: nameof(StreamPosition.NewMessages); | |||
_logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); | |||
if (entry.IsNull) | |||
{ | |||
return; | |||
} | |||
try | |||
{ | |||
var message = RedisMessage.Create(entry, _groupId); | |||
OnMessageReceived?.Invoke((stream.Key.ToString(), _groupId, entry.Id.ToString()), message); | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError(ex.Message, ex); | |||
var logArgs = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.ConsumeError, | |||
Reason = ex.ToString() | |||
}; | |||
OnLog?.Invoke(entry, logArgs); | |||
} | |||
finally | |||
{ | |||
var positionName = position == StreamPosition.Beginning | |||
? nameof(StreamPosition.Beginning) | |||
: nameof(StreamPosition.NewMessages); | |||
_logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
private readonly IRedisConnectionPool _connectionsPool; | |||
private readonly ILogger<RedisStreamManager> _logger; | |||
private readonly CapRedisOptions _options; | |||
private IConnectionMultiplexer _redis; | |||
private IConnectionMultiplexer? _redis; | |||
public RedisStreamManager(IRedisConnectionPool connectionsPool, IOptions<CapRedisOptions> options, | |||
ILogger<RedisStreamManager> logger) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored | |||
var database = _redis.GetDatabase(); | |||
var database = _redis!.GetDatabase(); | |||
var streamExist = await database.KeyExistsAsync(stream); | |||
if (!streamExist) | |||
{ | |||
@@ -53,7 +53,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
//The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored | |||
await _redis.GetDatabase().StreamAddAsync(stream, message); | |||
await _redis!.GetDatabase().StreamAddAsync(stream, message); | |||
} | |||
public async IAsyncEnumerable<IEnumerable<RedisStream>> PollStreamsLatestMessagesAsync(string[] streams, | |||
@@ -98,7 +98,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
await ConnectAsync(); | |||
await _redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); | |||
await _redis!.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); | |||
} | |||
private async Task<IEnumerable<RedisStream>> TryReadConsumerGroup(string consumerGroup, | |||
@@ -112,7 +112,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
await ConnectAsync(); | |||
var database = _redis.GetDatabase(); | |||
var database = _redis!.GetDatabase(); | |||
await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, _logger) | |||
.WithCancellation(token)) | |||
@@ -11,7 +11,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
{ | |||
internal static class RedisStreamManagerExtensions | |||
{ | |||
public static async IAsyncEnumerable<StreamPosition> TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger = null) | |||
public static async IAsyncEnumerable<StreamPosition> TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger) | |||
{ | |||
foreach (var position in positions) | |||
{ | |||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, | |||
StreamPosition.NewMessages)) | |||
{ | |||
logger.LogInformation( | |||
logger!.LogInformation( | |||
$"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); | |||
created = true; | |||
} | |||
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, | |||
StreamPosition.NewMessages)) | |||
{ | |||
logger.LogInformation( | |||
logger!.LogInformation( | |||
$"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); | |||
created = true; | |||
} | |||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.RedisStreams | |||
_logger = logger; | |||
} | |||
public BrokerAddress BrokerAddress => new BrokerAddress("redis", _options.Endpoint); | |||
public BrokerAddress BrokerAddress => new ("redis", _options.Endpoint); | |||
public async Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
@@ -23,16 +23,15 @@ namespace DotNetCore.CAP.RedisStreams | |||
}; | |||
} | |||
public static TransportMessage Create(StreamEntry streamEntry, string groupId = null) | |||
public static TransportMessage Create(StreamEntry streamEntry, string? groupId = null) | |||
{ | |||
if (streamEntry.IsNull) | |||
return null; | |||
var headersRaw = streamEntry[Headers]; | |||
if (headersRaw.IsNullOrEmpty) | |||
{ | |||
throw new ArgumentException($"Redis stream entry with id {streamEntry.Id} missing cap headers"); | |||
var headers = JsonSerializer.Deserialize<IDictionary<string, string>>(headersRaw); | |||
} | |||
var headers = JsonSerializer.Deserialize<IDictionary<string, string?>>(headersRaw)!; | |||
var bodyRaw = streamEntry[Body]; | |||
@@ -43,8 +42,12 @@ namespace DotNetCore.CAP.RedisStreams | |||
return new TransportMessage(headers, body); | |||
} | |||
private static string ToJson(object obj) | |||
private static RedisValue ToJson(object? obj) | |||
{ | |||
if (obj == null) | |||
{ | |||
return RedisValue.Null; | |||
} | |||
return JsonSerializer.Serialize(obj, new JsonSerializerOptions(JsonSerializerDefaults.Web)); | |||
} | |||
} |
@@ -19,14 +19,14 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// EF DbContext | |||
/// </summary> | |||
internal Type DbContextType { get; set; } | |||
internal Type? DbContextType { get; set; } | |||
internal bool IsSqlServer2008 { get; set; } | |||
/// <summary> | |||
/// Data version | |||
/// </summary> | |||
internal string Version { get; set; } | |||
internal string Version { get; set; } = default!; | |||
public EFOptions UseSqlServer2008() | |||
{ | |||
@@ -13,7 +13,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Gets or sets the database's connection string that will be used to store database entities. | |||
/// </summary> | |||
public string ConnectionString { get; set; } | |||
public string ConnectionString { get; set; } = default!; | |||
} | |||
@@ -4,6 +4,7 @@ | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Diagnostics.CodeAnalysis; | |||
using System.Reflection; | |||
using DotNetCore.CAP.Persistence; | |||
using DotNetCore.CAP.Transport; | |||
@@ -11,7 +12,7 @@ using Microsoft.Data.SqlClient; | |||
namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
{ | |||
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>> | |||
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object?>> | |||
{ | |||
public const string SqlAfterCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitAfter"; | |||
public const string SqlErrorCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitError"; | |||
@@ -36,41 +37,48 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
{ | |||
} | |||
public void OnNext(KeyValuePair<string, object> evt) | |||
public void OnNext(KeyValuePair<string, object?> evt) | |||
{ | |||
if (evt.Key == SqlAfterCommitTransactionMicrosoft) | |||
switch (evt.Key) | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (_bufferList.TryRemove(transactionKey, out var msgList)) | |||
case SqlAfterCommitTransactionMicrosoft: | |||
{ | |||
foreach (var message in msgList) | |||
if (!TryGetSqlConnection(evt, out SqlConnection? sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (_bufferList.TryRemove(transactionKey, out var msgList)) | |||
{ | |||
_dispatcher.EnqueueToPublish(message); | |||
foreach (var message in msgList) | |||
{ | |||
_dispatcher.EnqueueToPublish(message); | |||
} | |||
} | |||
break; | |||
} | |||
} | |||
else if (evt.Key == SqlErrorCommitTransactionMicrosoft || evt.Key == SqlAfterRollbackTransactionMicrosoft || evt.Key == SqlBeforeCloseConnectionMicrosoft) | |||
{ | |||
if (!_bufferList.IsEmpty) | |||
case SqlErrorCommitTransactionMicrosoft or SqlAfterRollbackTransactionMicrosoft or SqlBeforeCloseConnectionMicrosoft: | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (!_bufferList.IsEmpty) | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection? sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
_bufferList.TryRemove(transactionKey, out _); | |||
} | |||
_bufferList.TryRemove(transactionKey, out _); | |||
break; | |||
} | |||
} | |||
} | |||
private static bool TryGetSqlConnection(KeyValuePair<string, object> evt, out SqlConnection sqlConnection) | |||
private static bool TryGetSqlConnection(KeyValuePair<string, object?> evt, [NotNullWhen(true)] out SqlConnection? sqlConnection) | |||
{ | |||
sqlConnection = GetProperty(evt.Value, "Connection") as SqlConnection; | |||
return sqlConnection != null; | |||
} | |||
private static object GetProperty(object _this, string propertyName) | |||
private static object? GetProperty(object? @this, string propertyName) | |||
{ | |||
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this); | |||
return @this?.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(@this); | |||
} | |||
} | |||
} |
@@ -34,7 +34,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
public void OnNext(DiagnosticListener listener) | |||
{ | |||
if (listener.Name == DiagnosticListenerName) | |||
{ | |||
listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList)); | |||
} | |||
} | |||
} | |||
} |
@@ -2,14 +2,8 @@ | |||
<PropertyGroup> | |||
<TargetFrameworks>net6.0;netstandard2.1</TargetFrameworks> | |||
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);SQL Server</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.SqlServer.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' "> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP | |||
if (dbTransaction == null) throw new ArgumentNullException(nameof(DbTransaction)); | |||
} | |||
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId; | |||
var transactionKey = ((SqlConnection)dbTransaction.Connection!).ClientConnectionId; | |||
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) | |||
{ | |||
list.Add(msg); | |||
@@ -166,7 +166,7 @@ namespace DotNetCore.CAP | |||
var dbTransaction = dbConnection.BeginTransaction(); | |||
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<SqlServerCapTransaction>(publisher.ServiceProvider); | |||
var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit); | |||
return (IDbTransaction)capTransaction.DbTransaction; | |||
return (IDbTransaction)capTransaction.DbTransaction!; | |||
} | |||
/// <summary> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.SqlServer | |||
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) => | |||
await ChangeMessageStateAsync(_recName, message, state); | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var sql = $"INSERT INTO {_pubName} ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + | |||
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||
@@ -84,7 +84,7 @@ namespace DotNetCore.CAP.SqlServer | |||
dbTrans = dbContextTrans.GetDbTransaction(); | |||
var conn = dbTrans?.Connection; | |||
conn.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
conn!.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
} | |||
return message; | |||
@@ -126,7 +126,7 @@ namespace DotNetCore.CAP.SqlServer | |||
new SqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)), | |||
new SqlParameter("@Retries", mdMessage.Retries), | |||
new SqlParameter("@Added", mdMessage.Added), | |||
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new SqlParameter("@StatusName", nameof(StatusName.Scheduled)) | |||
}; | |||
@@ -200,7 +200,7 @@ namespace DotNetCore.CAP.SqlServer | |||
messages.Add(new MediumMessage | |||
{ | |||
DbId = reader.GetInt64(0).ToString(), | |||
Origin = _serializer.Deserialize(reader.GetString(1)), | |||
Origin = _serializer.Deserialize(reader.GetString(1))!, | |||
Retries = reader.GetInt32(2), | |||
Added = reader.GetDateTime(3) | |||
}); | |||
@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.SqlServer | |||
{ | |||
internal static class DbConnectionExtensions | |||
{ | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction? transaction = null, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.SqlServer | |||
return command.ExecuteNonQuery(); | |||
} | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc, | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T>? readerFunc, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.SqlServer | |||
var reader = command.ExecuteReader(); | |||
T result = default; | |||
T result = default!; | |||
if (readerFunc != null) | |||
{ | |||
result = readerFunc(reader); | |||
@@ -77,14 +77,14 @@ namespace DotNetCore.CAP.SqlServer | |||
var objValue = command.ExecuteScalar(); | |||
T result = default; | |||
T result = default!; | |||
if (objValue != null) | |||
{ | |||
var returnType = typeof(T); | |||
var converter = TypeDescriptor.GetConverter(returnType); | |||
if (converter.CanConvertFrom(objValue.GetType())) | |||
{ | |||
result = (T)converter.ConvertFrom(objValue); | |||
result = (T)converter.ConvertFrom(objValue)!; | |||
} | |||
else | |||
{ | |||
@@ -18,7 +18,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
public CapEFDbTransaction(ICapTransaction transaction) | |||
{ | |||
_transaction = transaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
TransactionId = dbContextTransaction.TransactionId; | |||
} | |||
@@ -58,7 +58,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
{ | |||
get | |||
{ | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
return dbContextTransaction.GetDbTransaction(); | |||
} | |||
} | |||
@@ -131,7 +131,7 @@ SELECT | |||
Content = reader.GetString(index++), | |||
Retries = reader.GetInt32(index++), | |||
Added = reader.GetDateTime(index++), | |||
ExpiresAt = reader.IsDBNull(index++) ? (DateTime?)null : reader.GetDateTime(index - 1), | |||
ExpiresAt = reader.IsDBNull(index++) ? null : reader.GetDateTime(index - 1), | |||
StatusName = reader.GetString(index) | |||
}); | |||
} | |||
@@ -162,9 +162,9 @@ SELECT | |||
return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded)); | |||
} | |||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
private int GetNumberOfMessage(string tableName, string statusName) | |||
{ | |||
@@ -254,14 +254,14 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < | |||
return result; | |||
} | |||
private async Task<MediumMessage> GetMessageAsync(string tableName, long id) | |||
private async Task<MediumMessage?> GetMessageAsync(string tableName, long id) | |||
{ | |||
var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; | |||
await using var connection = new SqlConnection(_options.ConnectionString); | |||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||
{ | |||
MediumMessage message = null; | |||
MediumMessage? message = null; | |||
while (reader.Read()) | |||
{ | |||
@@ -29,9 +29,9 @@ namespace DotNetCore.CAP | |||
} | |||
public class CapHeader : ReadOnlyDictionary<string, string> | |||
public class CapHeader : ReadOnlyDictionary<string, string?> | |||
{ | |||
public CapHeader(IDictionary<string, string> dictionary) : base(dictionary) | |||
public CapHeader(IDictionary<string, string?> dictionary) : base(dictionary) | |||
{ | |||
} | |||
@@ -6,7 +6,6 @@ using System.Linq; | |||
using System.Reflection; | |||
using DotNetCore.CAP.Filter; | |||
using DotNetCore.CAP.Internal; | |||
using JetBrains.Annotations; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
// ReSharper disable UnusedMember.Global | |||
@@ -78,7 +77,7 @@ namespace DotNetCore.CAP | |||
/// Registers subscribers from the specified types. | |||
/// </summary> | |||
/// <param name="handlerAssemblyMarkerTypes"></param> | |||
public CapBuilder AddSubscriberAssembly([NotNull] params Type[] handlerAssemblyMarkerTypes) | |||
public CapBuilder AddSubscriberAssembly(params Type[] handlerAssemblyMarkerTypes) | |||
{ | |||
if (handlerAssemblyMarkerTypes == null) throw new ArgumentNullException(nameof(handlerAssemblyMarkerTypes)); | |||
@@ -40,12 +40,12 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Subscriber group prefix. | |||
/// </summary> | |||
public string GroupNamePrefix { get; set; } | |||
public string? GroupNamePrefix { get; set; } | |||
/// <summary> | |||
/// Topic prefix. | |||
/// </summary> | |||
public string TopicNamePrefix { get; set; } | |||
public string? TopicNamePrefix { get; set; } | |||
/// <summary> | |||
/// The default version of the message, configured to isolate data in the same instance. The length must not exceed 20 | |||
@@ -67,7 +67,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times. | |||
/// </summary> | |||
public Action<FailedInfo> FailedThresholdCallback { get; set; } | |||
public Action<FailedInfo>? FailedThresholdCallback { get; set; } | |||
/// <summary> | |||
/// The number of message retries, the retry will stop when the threshold is reached. | |||
@@ -116,6 +116,6 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Configure JSON serialization settings | |||
/// </summary> | |||
public JsonSerializerOptions JsonSerializerOptions { get; } = new JsonSerializerOptions(); | |||
public JsonSerializerOptions JsonSerializerOptions { get; } = new (); | |||
} | |||
} |
@@ -8,27 +8,27 @@ namespace DotNetCore.CAP.Diagnostics | |||
{ | |||
public long? OperationTimestamp { get; set; } | |||
public string Operation { get; set; } | |||
public string Operation { get; set; } = default!; | |||
public Message Message { get; set; } | |||
public Message Message { get; set; } = default!; | |||
public long? ElapsedTimeMs { get; set; } | |||
public Exception Exception { get; set; } | |||
public Exception? Exception { get; set; } | |||
} | |||
public class CapEventDataPubSend | |||
{ | |||
public long? OperationTimestamp { get; set; } | |||
public string Operation { get; set; } | |||
public string Operation { get; set; } = default!; | |||
public TransportMessage TransportMessage { get; set; } | |||
public TransportMessage TransportMessage { get; set; } = default!; | |||
public BrokerAddress BrokerAddress { get; set; } | |||
public long? ElapsedTimeMs { get; set; } | |||
public Exception Exception { get; set; } | |||
public Exception? Exception { get; set; } | |||
} | |||
} |
@@ -5,7 +5,6 @@ using System; | |||
using System.Reflection; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using JetBrains.Annotations; | |||
namespace DotNetCore.CAP.Diagnostics | |||
{ | |||
@@ -13,30 +12,29 @@ namespace DotNetCore.CAP.Diagnostics | |||
{ | |||
public long? OperationTimestamp { get; set; } | |||
public string Operation { get; set; } | |||
public string Operation { get; set; } = default!; | |||
public TransportMessage TransportMessage { get; set; } | |||
public TransportMessage TransportMessage { get; set; } = default!; | |||
public BrokerAddress BrokerAddress { get; set; } | |||
public long? ElapsedTimeMs { get; set; } | |||
public Exception Exception { get; set; } | |||
public Exception? Exception { get; set; } | |||
} | |||
public class CapEventDataSubExecute | |||
{ | |||
public long? OperationTimestamp { get; set; } | |||
public string Operation { get; set; } | |||
public string Operation { get; set; } = default!; | |||
public Message Message { get; set; } | |||
public Message Message { get; set; } = default!; | |||
[CanBeNull] | |||
public MethodInfo MethodInfo { get; set; } | |||
public MethodInfo? MethodInfo { get; set; } | |||
public long? ElapsedTimeMs { get; set; } | |||
public Exception Exception { get; set; } | |||
public Exception? Exception { get; set; } | |||
} | |||
} |
@@ -2,6 +2,7 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<Nullable>enable</Nullable> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
@@ -5,7 +5,6 @@ using System; | |||
using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using JetBrains.Annotations; | |||
namespace DotNetCore.CAP | |||
{ | |||
@@ -28,7 +27,7 @@ namespace DotNetCore.CAP | |||
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param> | |||
/// <param name="callbackName">callback subscriber name</param> | |||
/// <param name="cancellationToken"></param> | |||
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, string callbackName = null, CancellationToken cancellationToken = default); | |||
Task PublishAsync<T>(string name, T? contentObj, string? callbackName = null, CancellationToken cancellationToken = default); | |||
/// <summary> | |||
/// Asynchronous publish an object message with custom headers | |||
@@ -38,7 +37,7 @@ namespace DotNetCore.CAP | |||
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param> | |||
/// <param name="headers">message additional headers.</param> | |||
/// <param name="cancellationToken"></param> | |||
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers, CancellationToken cancellationToken = default); | |||
Task PublishAsync<T>(string name, T? contentObj, IDictionary<string, string?> headers, CancellationToken cancellationToken = default); | |||
/// <summary> | |||
/// Publish an object message. | |||
@@ -46,7 +45,7 @@ namespace DotNetCore.CAP | |||
/// <param name="name">the topic name or exchange router key.</param> | |||
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param> | |||
/// <param name="callbackName">callback subscriber name</param> | |||
void Publish<T>(string name, [CanBeNull] T contentObj, string callbackName = null); | |||
void Publish<T>(string name, T? contentObj, string? callbackName = null); | |||
/// <summary> | |||
/// Publish an object message. | |||
@@ -54,6 +53,6 @@ namespace DotNetCore.CAP | |||
/// <param name="name">the topic name or exchange router key.</param> | |||
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param> | |||
/// <param name="headers">message additional headers.</param> | |||
void Publish<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers); | |||
void Publish<T>(string name, T? contentObj, IDictionary<string, string?> headers); | |||
} | |||
} |
@@ -23,7 +23,7 @@ namespace DotNetCore.CAP | |||
public bool AutoCommit { get; set; } | |||
public object DbTransaction { get; set; } | |||
public object? DbTransaction { get; set; } | |||
protected internal virtual void AddToSent(MediumMessage msg) | |||
{ | |||
@@ -20,7 +20,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Database transaction object, can be converted to a specific database transaction object or IDBTransaction when used | |||
/// </summary> | |||
object DbTransaction { get; set; } | |||
object? DbTransaction { get; set; } | |||
/// <summary> | |||
/// Submit the transaction context of the CAP, we will send the message to the message queue at the time of submission | |||
@@ -5,17 +5,17 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
public class ConsumerExecutedResult | |||
{ | |||
public ConsumerExecutedResult(object result, string msgId, string callbackName) | |||
public ConsumerExecutedResult(object? result, string msgId, string? callbackName) | |||
{ | |||
Result = result; | |||
MessageId = msgId; | |||
CallbackName = callbackName; | |||
} | |||
public object Result { get; set; } | |||
public object? Result { get; set; } | |||
public string MessageId { get; set; } | |||
public string CallbackName { get; set; } | |||
public string? CallbackName { get; set; } | |||
} | |||
} |
@@ -12,21 +12,21 @@ namespace DotNetCore.CAP.Internal | |||
/// </summary> | |||
public class ConsumerExecutorDescriptor | |||
{ | |||
public TypeInfo ServiceTypeInfo { get; set; } | |||
public TypeInfo? ServiceTypeInfo { get; set; } | |||
public MethodInfo MethodInfo { get; set; } | |||
public MethodInfo MethodInfo { get; set; } = default!; | |||
public TypeInfo ImplTypeInfo { get; set; } | |||
public TypeInfo ImplTypeInfo { get; set; } = default!; | |||
public TopicAttribute Attribute { get; set; } | |||
public TopicAttribute Attribute { get; set; } = default!; | |||
public TopicAttribute ClassAttribute { get; set; } | |||
public TopicAttribute? ClassAttribute { get; set; } | |||
public IList<ParameterDescriptor> Parameters { get; set; } | |||
public IList<ParameterDescriptor> Parameters { get; set; } = new List<ParameterDescriptor>(); | |||
public string TopicNamePrefix { get; set; } | |||
public string? TopicNamePrefix { get; set; } | |||
private string _topicName; | |||
private string? _topicName; | |||
/// <summary> | |||
/// Topic name based on both <see cref="Attribute"/> and <see cref="ClassAttribute"/>. | |||
/// </summary> | |||
@@ -58,7 +58,7 @@ namespace DotNetCore.CAP.Internal | |||
public class ConsumerExecutorDescriptorComparer : IEqualityComparer<ConsumerExecutorDescriptor> | |||
{ | |||
public bool Equals(ConsumerExecutorDescriptor x, ConsumerExecutorDescriptor y) | |||
public bool Equals(ConsumerExecutorDescriptor? x, ConsumerExecutorDescriptor? y) | |||
{ | |||
//Check whether the compared objects reference the same data. | |||
if (ReferenceEquals(x, y)) | |||
@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.Internal | |||
x.Attribute.Group.Equals(y.Attribute.Group, StringComparison.OrdinalIgnoreCase); | |||
} | |||
public int GetHashCode(ConsumerExecutorDescriptor obj) | |||
public int GetHashCode(ConsumerExecutorDescriptor? obj) | |||
{ | |||
//Check whether the object is null | |||
if (obj is null) return 0; | |||
@@ -95,9 +95,9 @@ namespace DotNetCore.CAP.Internal | |||
public class ParameterDescriptor | |||
{ | |||
public string Name { get; set; } | |||
public string Name { get; set; } = default!; | |||
public Type ParameterType { get; set; } | |||
public Type ParameterType { get; set; } = default!; | |||
public bool IsFromCap { get; set; } | |||
} |
@@ -19,6 +19,6 @@ namespace DotNetCore.CAP.Filter | |||
public bool ExceptionHandled { get; set; } | |||
public object Result { get; set; } | |||
public object? Result { get; set; } | |||
} | |||
} |
@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.Filter | |||
{ | |||
public class ExecutedContext : FilterContext | |||
{ | |||
public ExecutedContext(ConsumerContext context, object result) : base(context) | |||
public ExecutedContext(ConsumerContext context, object? result) : base(context) | |||
{ | |||
Result = result; | |||
} | |||
public object Result { get; set; } | |||
public object? Result { get; set; } | |||
} | |||
} |
@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.Filter | |||
{ | |||
public class ExecutingContext : FilterContext | |||
{ | |||
public ExecutingContext(ConsumerContext context, object[] arguments) : base(context) | |||
public ExecutingContext(ConsumerContext context, object?[] arguments) : base(context) | |||
{ | |||
Arguments = arguments; | |||
} | |||
public object[] Arguments { get; set; } | |||
public object?[] Arguments { get; set; } | |||
} | |||
} |
@@ -10,15 +10,6 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
public static class Helper | |||
{ | |||
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Local) | |||
.AddHours(TimeZoneInfo.Local.BaseUtcOffset.Hours); | |||
public static long ToTimestamp(DateTime value) | |||
{ | |||
var elapsedTime = value - Epoch; | |||
return (long)elapsedTime.TotalSeconds; | |||
} | |||
public static bool IsController(TypeInfo typeInfo) | |||
{ | |||
if (!typeInfo.IsClass) | |||
@@ -19,8 +19,9 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly ILogger<Bootstrapper> _logger; | |||
private IEnumerable<IProcessingServer> _processors; | |||
private CancellationTokenSource _cts = new CancellationTokenSource(); | |||
private readonly CancellationTokenSource _cts = new (); | |||
private bool _disposed; | |||
private IEnumerable<IProcessingServer> _processors = default!; | |||
public Bootstrapper(IServiceProvider serviceProvider, ILogger<Bootstrapper> logger) | |||
{ | |||
@@ -93,9 +94,13 @@ namespace DotNetCore.CAP.Internal | |||
public override void Dispose() | |||
{ | |||
_cts?.Cancel(); | |||
_cts?.Dispose(); | |||
_cts = null; | |||
if (_disposed) | |||
{ | |||
return; | |||
} | |||
_cts.Cancel(); | |||
_cts.Dispose(); | |||
_disposed = true; | |||
} | |||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |||
@@ -105,7 +110,7 @@ namespace DotNetCore.CAP.Internal | |||
public override async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
_cts?.Cancel(); | |||
_cts.Cancel(); | |||
await base.StopAsync(cancellationToken); | |||
} | |||
@@ -22,8 +22,7 @@ namespace DotNetCore.CAP.Internal | |||
private readonly CapOptions _capOptions; | |||
// ReSharper disable once InconsistentNaming | |||
protected static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
protected static readonly DiagnosticListener s_diagnosticListener = new(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
public CapPublisher(IServiceProvider service) | |||
{ | |||
@@ -38,20 +37,20 @@ namespace DotNetCore.CAP.Internal | |||
public AsyncLocal<ICapTransaction> Transaction { get; } | |||
public Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default) | |||
public Task PublishAsync<T>(string name, T? value, IDictionary<string, string?> headers, CancellationToken cancellationToken = default) | |||
{ | |||
return Task.Run(() => Publish(name, value, headers), cancellationToken); | |||
} | |||
public Task PublishAsync<T>(string name, T value, string callbackName = null, | |||
public Task PublishAsync<T>(string name, T? value, string? callbackName = null, | |||
CancellationToken cancellationToken = default) | |||
{ | |||
return Task.Run(() => Publish(name, value, callbackName), cancellationToken); | |||
} | |||
public void Publish<T>(string name, T value, string callbackName = null) | |||
public void Publish<T>(string name, T? value, string? callbackName = null) | |||
{ | |||
var header = new Dictionary<string, string> | |||
var header = new Dictionary<string, string?> | |||
{ | |||
{Headers.CallbackName, callbackName} | |||
}; | |||
@@ -59,7 +58,7 @@ namespace DotNetCore.CAP.Internal | |||
Publish(name, value, header); | |||
} | |||
public void Publish<T>(string name, T value, IDictionary<string, string> headers) | |||
public void Publish<T>(string name, T? value, IDictionary<string, string?> headers) | |||
{ | |||
if (string.IsNullOrEmpty(name)) | |||
{ | |||
@@ -71,14 +70,12 @@ namespace DotNetCore.CAP.Internal | |||
name = $"{_capOptions.TopicNamePrefix}.{name}"; | |||
} | |||
headers ??= new Dictionary<string, string>(); | |||
if (!headers.ContainsKey(Headers.MessageId)) | |||
{ | |||
var messageId = SnowflakeId.Default().NextId().ToString(); | |||
headers.Add(Headers.MessageId, messageId); | |||
} | |||
if (!headers.ContainsKey(Headers.CorrelationId)) | |||
{ | |||
headers.Add(Headers.CorrelationId, headers[Headers.MessageId]); | |||
@@ -25,15 +25,15 @@ namespace DotNetCore.CAP.Internal | |||
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); | |||
private readonly CapOptions _options; | |||
private IConsumerClientFactory _consumerClientFactory; | |||
private IDispatcher _dispatcher; | |||
private ISerializer _serializer; | |||
private IDataStorage _storage; | |||
private IConsumerClientFactory _consumerClientFactory = default!; | |||
private IDispatcher _dispatcher = default!; | |||
private ISerializer _serializer = default!; | |||
private IDataStorage _storage = default!; | |||
private MethodMatcherCache _selector; | |||
private CancellationTokenSource _cts; | |||
private MethodMatcherCache _selector = default!; | |||
private CancellationTokenSource _cts = new(); | |||
private BrokerAddress _serverAddress; | |||
private Task _compositeTask; | |||
private Task? _compositeTask; | |||
private bool _disposed; | |||
private bool _isHealthy = true; | |||
@@ -47,7 +47,6 @@ namespace DotNetCore.CAP.Internal | |||
_logger = logger; | |||
_serviceProvider = serviceProvider; | |||
_options = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value; | |||
_cts = new CancellationTokenSource(); | |||
} | |||
public bool IsHealthy() | |||
@@ -57,13 +56,13 @@ namespace DotNetCore.CAP.Internal | |||
public void Start(CancellationToken stoppingToken) | |||
{ | |||
_selector = _serviceProvider.GetService<MethodMatcherCache>(); | |||
_dispatcher = _serviceProvider.GetService<IDispatcher>(); | |||
_serializer = _serviceProvider.GetService<ISerializer>(); | |||
_storage = _serviceProvider.GetService<IDataStorage>(); | |||
_consumerClientFactory = _serviceProvider.GetService<IConsumerClientFactory>(); | |||
_selector = _serviceProvider.GetRequiredService<MethodMatcherCache>(); | |||
_dispatcher = _serviceProvider.GetRequiredService<IDispatcher>(); | |||
_serializer = _serviceProvider.GetRequiredService<ISerializer>(); | |||
_storage = _serviceProvider.GetRequiredService<IDataStorage>(); | |||
_consumerClientFactory = _serviceProvider.GetRequiredService<IConsumerClientFactory>(); | |||
stoppingToken.Register(() => _cts?.Cancel()); | |||
stoppingToken.Register(Dispose); | |||
Execute(); | |||
} | |||
@@ -131,7 +130,7 @@ namespace DotNetCore.CAP.Internal | |||
if (!IsHealthy() || force) | |||
{ | |||
Pulse(); | |||
_cts = new CancellationTokenSource(); | |||
_isHealthy = true; | |||
@@ -166,9 +165,8 @@ namespace DotNetCore.CAP.Internal | |||
public void Pulse() | |||
{ | |||
_cts?.Cancel(); | |||
_cts?.Dispose(); | |||
_cts = null; | |||
_cts.Cancel(); | |||
_cts.Dispose(); | |||
} | |||
private void RegisterMessageProcessor(IConsumerClient client) | |||
@@ -184,7 +182,7 @@ namespace DotNetCore.CAP.Internal | |||
tracingTimestamp = TracingBefore(transportMessage, _serverAddress); | |||
var name = transportMessage.GetName(); | |||
var group = transportMessage.GetGroup(); | |||
var group = transportMessage.GetGroup()!; | |||
Message message; | |||
@@ -201,21 +199,36 @@ namespace DotNetCore.CAP.Internal | |||
throw ex; | |||
} | |||
var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; | |||
var type = executor!.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; | |||
message = _serializer.DeserializeAsync(transportMessage, type).GetAwaiter().GetResult(); | |||
message.RemoveException(); | |||
} | |||
catch (Exception e) | |||
{ | |||
transportMessage.Headers[Headers.Exception] = e.GetType().Name + "-->" + e.Message; | |||
string? dataUri; | |||
if (transportMessage.Headers.TryGetValue(Headers.Type, out var val)) | |||
{ | |||
var dataUri = $"data:{val};base64," + Convert.ToBase64String(transportMessage.Body); | |||
if (transportMessage.Body != null) | |||
{ | |||
dataUri = $"data:{val};base64," + Convert.ToBase64String(transportMessage.Body); | |||
} | |||
else | |||
{ | |||
dataUri = null; | |||
} | |||
message = new Message(transportMessage.Headers, dataUri); | |||
} | |||
else | |||
{ | |||
var dataUri = "data:UnknownType;base64," + Convert.ToBase64String(transportMessage.Body); | |||
if (transportMessage.Body != null) | |||
{ | |||
dataUri = "data:UnknownType;base64," + Convert.ToBase64String(transportMessage.Body); | |||
} | |||
else | |||
{ | |||
dataUri = null; | |||
} | |||
message = new Message(transportMessage.Headers, dataUri); | |||
} | |||
} | |||
@@ -255,7 +268,7 @@ namespace DotNetCore.CAP.Internal | |||
TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | |||
_dispatcher.EnqueueToExecute(mediumMessage, executor); | |||
_dispatcher.EnqueueToExecute(mediumMessage, executor!); | |||
} | |||
} | |||
catch (Exception e) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Internal | |||
public ConsumerServiceSelector(IServiceProvider serviceProvider) | |||
{ | |||
_serviceProvider = serviceProvider; | |||
_capOptions = serviceProvider.GetService<IOptions<CapOptions>>().Value; | |||
_capOptions = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value; | |||
_cacheList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>(); | |||
} | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.Internal | |||
return executorDescriptorList; | |||
} | |||
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
public ConsumerExecutorDescriptor? SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
{ | |||
if (executeDescriptor.Count == 0) | |||
{ | |||
@@ -107,7 +107,7 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | |||
var types = Assembly.GetEntryAssembly().ExportedTypes; | |||
var types = Assembly.GetEntryAssembly()!.ExportedTypes; | |||
foreach (var type in types) | |||
{ | |||
var typeInfo = type.GetTypeInfo(); | |||
@@ -120,7 +120,7 @@ namespace DotNetCore.CAP.Internal | |||
return executorDescriptorList; | |||
} | |||
protected IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null) | |||
protected IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo? serviceTypeInfo = null) | |||
{ | |||
var topicClassAttribute = typeInfo.GetCustomAttribute<TopicAttribute>(true); | |||
@@ -169,9 +169,9 @@ namespace DotNetCore.CAP.Internal | |||
TopicAttribute attr, | |||
MethodInfo methodInfo, | |||
TypeInfo implType, | |||
TypeInfo serviceTypeInfo, | |||
TypeInfo? serviceTypeInfo, | |||
IList<ParameterDescriptor> parameters, | |||
TopicAttribute classAttr = null) | |||
TopicAttribute? classAttr = null) | |||
{ | |||
var descriptor = new ConsumerExecutorDescriptor | |||
{ | |||
@@ -187,7 +187,7 @@ namespace DotNetCore.CAP.Internal | |||
return descriptor; | |||
} | |||
private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
private ConsumerExecutorDescriptor? MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
{ | |||
if (key == null) | |||
{ | |||
@@ -197,7 +197,7 @@ namespace DotNetCore.CAP.Internal | |||
return executeDescriptor.FirstOrDefault(x => x.TopicName.Equals(key, StringComparison.InvariantCultureIgnoreCase)); | |||
} | |||
private ConsumerExecutorDescriptor MatchWildcardUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
private ConsumerExecutorDescriptor? MatchWildcardUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||
{ | |||
var group = executeDescriptor.First().Attribute.Group; | |||
if (!_cacheList.TryGetValue(group, out var tmpList)) | |||
@@ -223,9 +223,9 @@ namespace DotNetCore.CAP.Internal | |||
private class RegexExecuteDescriptor<T> | |||
{ | |||
public string Name { get; set; } | |||
public string Name { get; set; } = default!; | |||
public T Descriptor { get; set; } | |||
public T Descriptor { get; set; } = default!; | |||
} | |||
} | |||
} |
@@ -22,6 +22,6 @@ namespace DotNetCore.CAP.Internal | |||
/// </summary> | |||
/// <param name="key">topic or exchange router key.</param> | |||
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param> | |||
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates); | |||
ConsumerExecutorDescriptor? SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates); | |||
} | |||
} |
@@ -26,8 +26,7 @@ namespace DotNetCore.CAP.Internal | |||
private readonly IOptions<CapOptions> _options; | |||
// ReSharper disable once InconsistentNaming | |||
protected static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
protected static readonly DiagnosticListener s_diagnosticListener = new(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
public MessageSender( | |||
ILogger<MessageSender> logger, | |||
@@ -36,10 +35,10 @@ namespace DotNetCore.CAP.Internal | |||
_logger = logger; | |||
_serviceProvider = serviceProvider; | |||
_options = serviceProvider.GetService<IOptions<CapOptions>>(); | |||
_dataStorage = serviceProvider.GetService<IDataStorage>(); | |||
_serializer = serviceProvider.GetService<ISerializer>(); | |||
_transport = serviceProvider.GetService<ITransport>(); | |||
_options = serviceProvider.GetRequiredService<IOptions<CapOptions>>(); | |||
_dataStorage = serviceProvider.GetRequiredService<IDataStorage>(); | |||
_serializer = serviceProvider.GetRequiredService<ISerializer>(); | |||
_transport = serviceProvider.GetRequiredService<ITransport>(); | |||
} | |||
public async Task<OperateResult> SendAsync(MediumMessage message) | |||
@@ -80,9 +79,9 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
TracingError(tracingTimestamp, transportMsg, _transport.BrokerAddress, result); | |||
var needRetry = await SetFailedState(message, result.Exception); | |||
var needRetry = await SetFailedState(message, result.Exception!); | |||
return (needRetry, OperateResult.Failed(result.Exception)); | |||
return (needRetry, OperateResult.Failed(result.Exception!)); | |||
} | |||
} | |||
@@ -3,12 +3,11 @@ | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Persistence; | |||
using JetBrains.Annotations; | |||
namespace DotNetCore.CAP.Internal | |||
{ | |||
public interface IMessageSender | |||
{ | |||
Task<OperateResult> SendAsync([NotNull] MediumMessage message); | |||
Task<OperateResult> SendAsync(MediumMessage message); | |||
} | |||
} |
@@ -25,8 +25,7 @@ namespace DotNetCore.CAP.Internal | |||
// diagnostics listener | |||
// ReSharper disable once InconsistentNaming | |||
private static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
private static readonly DiagnosticListener s_diagnosticListener = new (CapDiagnosticListenerNames.DiagnosticListenerName); | |||
public SubscribeDispatcher( | |||
ILogger<SubscribeDispatcher> logger, | |||
@@ -37,8 +36,8 @@ namespace DotNetCore.CAP.Internal | |||
_logger = logger; | |||
_options = options.Value; | |||
_dataStorage = _provider.GetService<IDataStorage>(); | |||
Invoker = _provider.GetService<ISubscribeInvoker>(); | |||
_dataStorage = _provider.GetRequiredService<IDataStorage>(); | |||
Invoker = _provider.GetRequiredService<ISubscribeInvoker>(); | |||
} | |||
private ISubscribeInvoker Invoker { get; } | |||
@@ -46,7 +45,7 @@ namespace DotNetCore.CAP.Internal | |||
public Task<OperateResult> DispatchAsync(MediumMessage message, CancellationToken cancellationToken) | |||
{ | |||
var selector = _provider.GetRequiredService<MethodMatcherCache>(); | |||
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor)) | |||
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup()!, out var executor)) | |||
{ | |||
var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." + | |||
$"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63"; | |||
@@ -186,7 +185,7 @@ namespace DotNetCore.CAP.Internal | |||
if (!string.IsNullOrEmpty(ret.CallbackName)) | |||
{ | |||
var header = new Dictionary<string, string>() | |||
var header = new Dictionary<string, string?>() | |||
{ | |||
[Headers.CorrelationId] = message.Origin.GetId(), | |||
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString() | |||
@@ -249,7 +248,7 @@ namespace DotNetCore.CAP.Internal | |||
} | |||
} | |||
private void TracingError(long? tracingTimestamp, Message message, MethodInfo method, Exception ex) | |||
private void TracingError(long? tracingTimestamp, Message message, MethodInfo? method, Exception ex) | |||
{ | |||
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorSubscriberInvoke)) | |||
{ | |||