diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 0faa14f..5ef9c30 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -37,11 +37,11 @@ namespace DotNetCore.CAP /// Initial list of brokers as a CSV list of broker host or host:port. /// /// - public string Servers { get; set; } + public string Servers { get; set; } = default!; /// /// If you need to get offset and partition and so on.., you can use this function to write additional header into /// - public Func, List>> CustomHeaders { get; set; } + public Func, List>>? CustomHeaders { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index ad5c623..5564064 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -2,14 +2,13 @@ netstandard2.1 - DotNetCore.CAP.Kafka + enable $(PackageTags);Kafka NU1605;NU1701 NU1701;CS1591 - bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Kafka.xml diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index 5484193..c4af49a 100644 --- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -43,8 +43,8 @@ namespace DotNetCore.CAP.Kafka var result = await producer.ProduceAsync(message.GetName(), new Message { Headers = headers, - Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), - Value = message.Body + Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string? kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), + Value = message.Body! }); if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 29680e4..98901bd 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka private readonly string _groupId; private readonly KafkaOptions _kafkaOptions; - private IConsumer _consumerClient; + private IConsumer? _consumerClient; public KafkaConsumerClient(string groupId, IOptions options) { @@ -29,11 +29,11 @@ namespace DotNetCore.CAP.Kafka _kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); } - public event EventHandler OnMessageReceived; + public event EventHandler? OnMessageReceived; - public event EventHandler OnLog; + public event EventHandler? OnLog; - public BrokerAddress BrokerAddress => new BrokerAddress("Kafka", _kafkaOptions.Servers); + public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers); public ICollection FetchTopics(IEnumerable topicNames) { @@ -80,7 +80,7 @@ namespace DotNetCore.CAP.Kafka Connect(); - _consumerClient.Subscribe(topics); + _consumerClient!.Subscribe(topics); } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) @@ -89,11 +89,11 @@ namespace DotNetCore.CAP.Kafka while (true) { - var consumerResult = _consumerClient.Consume(cancellationToken); + var consumerResult = _consumerClient!.Consume(cancellationToken); if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; - var headers = new Dictionary(consumerResult.Message.Headers.Count); + var headers = new Dictionary(consumerResult.Message.Headers.Count); foreach (var header in consumerResult.Message.Headers) { var val = header.GetValueBytes(); @@ -119,12 +119,12 @@ namespace DotNetCore.CAP.Kafka public void Commit(object sender) { - _consumerClient.Commit((ConsumeResult)sender); + _consumerClient!.Commit((ConsumeResult)sender); } - public void Reject(object sender) + public void Reject(object? sender) { - _consumerClient.Assign(_consumerClient.Assignment); + _consumerClient!.Assign(_consumerClient.Assignment); } public void Dispose()