Browse Source

Enable #nullable for kafka transport.

master
Savorboard 3 years ago
parent
commit
17c81c891a
4 changed files with 15 additions and 16 deletions
  1. +2
    -2
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +1
    -2
      src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
  3. +2
    -2
      src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
  4. +10
    -10
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+ 2
- 2
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs View File

@@ -37,11 +37,11 @@ namespace DotNetCore.CAP
/// Initial list of brokers as a CSV list of broker host or host:port. /// Initial list of brokers as a CSV list of broker host or host:port.
/// </para> /// </para>
/// </summary> /// </summary>
public string Servers { get; set; }
public string Servers { get; set; } = default!;


/// <summary> /// <summary>
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/> /// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/>
/// </summary> /// </summary>
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>> CustomHeaders { get; set; }
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }
} }
} }

+ 1
- 2
src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj View File

@@ -2,14 +2,13 @@


<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework> <TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName>
<Nullable>enable</Nullable>
<PackageTags>$(PackageTags);Kafka</PackageTags> <PackageTags>$(PackageTags);Kafka</PackageTags>
</PropertyGroup> </PropertyGroup>


<PropertyGroup> <PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors> <WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn> <NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Kafka.xml</DocumentationFile>
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>


+ 2
- 2
src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs View File

@@ -43,8 +43,8 @@ namespace DotNetCore.CAP.Kafka
var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]> var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]>
{ {
Headers = headers, Headers = headers,
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(),
Value = message.Body
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string? kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(),
Value = message.Body!
}); });


if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted)


+ 10
- 10
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka


private readonly string _groupId; private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions; private readonly KafkaOptions _kafkaOptions;
private IConsumer<string, byte[]> _consumerClient;
private IConsumer<string, byte[]>? _consumerClient;


public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options) public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options)
{ {
@@ -29,11 +29,11 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); _kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
} }


public event EventHandler<TransportMessage> OnMessageReceived;
public event EventHandler<TransportMessage>? OnMessageReceived;


public event EventHandler<LogMessageEventArgs> OnLog;
public event EventHandler<LogMessageEventArgs>? OnLog;


public BrokerAddress BrokerAddress => new BrokerAddress("Kafka", _kafkaOptions.Servers);
public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers);


public ICollection<string> FetchTopics(IEnumerable<string> topicNames) public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{ {
@@ -80,7 +80,7 @@ namespace DotNetCore.CAP.Kafka


Connect(); Connect();


_consumerClient.Subscribe(topics);
_consumerClient!.Subscribe(topics);
} }


public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
@@ -89,11 +89,11 @@ namespace DotNetCore.CAP.Kafka


while (true) while (true)
{ {
var consumerResult = _consumerClient.Consume(cancellationToken);
var consumerResult = _consumerClient!.Consume(cancellationToken);


if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;


var headers = new Dictionary<string, string>(consumerResult.Message.Headers.Count);
var headers = new Dictionary<string, string?>(consumerResult.Message.Headers.Count);
foreach (var header in consumerResult.Message.Headers) foreach (var header in consumerResult.Message.Headers)
{ {
var val = header.GetValueBytes(); var val = header.GetValueBytes();
@@ -119,12 +119,12 @@ namespace DotNetCore.CAP.Kafka


public void Commit(object sender) public void Commit(object sender)
{ {
_consumerClient.Commit((ConsumeResult<string, byte[]>)sender);
_consumerClient!.Commit((ConsumeResult<string, byte[]>)sender);
} }


public void Reject(object sender)
public void Reject(object? sender)
{ {
_consumerClient.Assign(_consumerClient.Assignment);
_consumerClient!.Assign(_consumerClient.Assignment);
} }


public void Dispose() public void Dispose()


Loading…
Cancel
Save