瀏覽代碼

Use dot prefix characters ‘xxx’ [xxx.*] as the stream name for NATS Jetstream. #1047

master
savorboard 3 年之前
父節點
當前提交
432cfd95ab
共有 2 個文件被更改,包括 12 次插入19 次删除
  1. +1
    -1
      src/DotNetCore.CAP.NATS/ITransport.NATS.cs
  2. +11
    -18
      src/DotNetCore.CAP.NATS/NATSConsumerClient.cs

+ 1
- 1
src/DotNetCore.CAP.NATS/ITransport.NATS.cs 查看文件

@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.NATS

var js = connection.CreateJetStreamContext(_jetStreamOptions);

var builder = PublishOptions.Builder().WithExpectedStream(Helper.Normalized(message.GetName())).WithMessageId(message.GetId());
var builder = PublishOptions.Builder().WithMessageId(message.GetId());

var resp = await js.PublishAsync(msg, builder.Build());



+ 11
- 18
src/DotNetCore.CAP.NATS/NATSConsumerClient.cs 查看文件

@@ -39,20 +39,21 @@ namespace DotNetCore.CAP.NATS
{
var jsm = _consumerClient.CreateJetStreamManagementContext();

foreach (var topic in topicNames)
var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

foreach (var subjectStream in streamGroup)
{
var norTopic = Helper.Normalized(topic);
try
{
jsm.GetStreamInfo(norTopic); // this throws if the stream does not exist
jsm.GetStreamInfo(subjectStream.Key); // this throws if the stream does not exist
}
catch (NATSJetStreamException)
{
var builder = StreamConfiguration.Builder()
.WithName(norTopic)
.WithName(subjectStream.Key)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(topic);
.WithSubjects(subjectStream.ToList());

_natsOptions.StreamOptions?.Invoke(builder);

@@ -81,28 +82,20 @@ namespace DotNetCore.CAP.NATS

var js = _consumerClient.CreateJetStreamContext();

foreach (var topic in topics)
foreach (var subject in topics)
{
var streamName = _natsOptions.NormalizeStreamName(subject);
var pso = PushSubscribeOptions.Builder()
.WithStream(Helper.Normalized(topic))
.WithStream(streamName)
.WithConfiguration(ConsumerConfiguration.Builder().WithDeliverPolicy(DeliverPolicy.New).Build())
.WithDeliverGroup(_groupId)
.Build();

js.PushSubscribeAsync(topic, Subscription_MessageHandler, false, pso);
js.PushSubscribeAsync(subject, Helper.Normalized(_groupId), Subscription_MessageHandler, false, pso);
}
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
//Connect();

//foreach (var subscription in _asyncSubscriptions)
//{
// subscription.MessageHandler += Subscription_MessageHandler;
// subscription.Start();
//}

while (true)
{
cancellationToken.ThrowIfCancellationRequested();
@@ -110,7 +103,7 @@ namespace DotNetCore.CAP.NATS
}
// ReSharper disable once FunctionNeverReturns
}
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
{
var headers = new Dictionary<string, string>();


Loading…
取消
儲存