Browse Source

update

master
yangxiaodong 7 years ago
parent
commit
bc36b3d4d5
5 changed files with 29 additions and 23 deletions
  1. +1
    -3
      src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs
  2. +2
    -0
      src/Cap.Consistency/Abstractions/TopicAttribute.cs
  3. +3
    -8
      src/Cap.Consistency/Builder/ConsistencyBuilder.cs
  4. +11
    -12
      src/Cap.Consistency/Consumer/ConsumerHandler.cs
  5. +12
    -0
      src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs

+ 1
- 3
src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs View File

@@ -11,8 +11,6 @@ namespace Cap.Consistency.Abstractions

public TypeInfo ImplTypeInfo { get; set; }

public TopicInfo Topic { get; set; }

public string GroupId { get; set; }
public TopicAttribute Attribute { get; set; }
}
}

+ 2
- 0
src/Cap.Consistency/Abstractions/TopicAttribute.cs View File

@@ -18,6 +18,8 @@ namespace Cap.Consistency.Abstractions
get { return _name; }
}

public string GroupOrExchange { get; set; }

public bool IsOneWay { get; set; }
}
}

+ 3
- 8
src/Cap.Consistency/Builder/ConsistencyBuilder.cs View File

@@ -27,8 +27,6 @@ namespace Cap.Consistency
Services = service;

AddConsumerServices();

AddKafkaServices();
}

/// <summary>
@@ -70,13 +68,10 @@ namespace Cap.Consistency
Services.AddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
Services.AddSingleton<MethodMatcherCache>();

return this;
}

public virtual ConsistencyBuilder AddKafkaServices() {
Services.AddSingleton(typeof(ITopicRoute), typeof(ConsumerHandler<>).MakeGenericType(MessageType));

return AddScoped(typeof(ITopicRoute), typeof(ConsumerHandler<>).MakeGenericType(MessageType));
}
return this;
}


/// <summary>


+ 11
- 12
src/Cap.Consistency/Consumer/ConsumerHandler.cs View File

@@ -5,10 +5,9 @@ using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Routing;
using Microsoft.AspNetCore.Builder;
using Cap.Consistency.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Cap.Consistency.Internal;

namespace Cap.Consistency.Consumer
{
@@ -59,18 +58,20 @@ namespace Cap.Consistency.Consumer

var matchs = _selector.GetCandidatesMethods(context);

var groupingMatchs = matchs.GroupBy(x => x.Value.GroupId);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);

foreach (var matchGroup in groupingMatchs) {
using (var client = _consumerClientFactory.Create(matchGroup.Key, _options.BrokerUrlList)) {
client.MessageReceieved += OnMessageReceieved;
Task.Factory.StartNew(() => {
using (var client = _consumerClientFactory.Create(matchGroup.Key, _options.BrokerUrlList)) {
client.MessageReceieved += OnMessageReceieved;

foreach (var item in matchGroup) {
client.Subscribe(item.Key, item.Value.Topic.Partition);
}
foreach (var item in matchGroup) {
client.Subscribe(item.Key);
}

client.Listening(TimeSpan.Zero);
}
client.Listening(TimeSpan.FromSeconds(1));
}
}, TaskCreationOptions.LongRunning);
}
return Task.CompletedTask;
}
@@ -100,8 +101,6 @@ namespace Cap.Consistency.Consumer
catch (Exception ex) {

_logger.LogError("exception raised when excute method : " + ex.Message);

throw ex;
}
}
}


+ 12
- 0
src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs View File

@@ -0,0 +1,12 @@
using Cap.Consistency;

namespace Cap.Consistency.Infrastructure
{
/// <summary>
/// Represents all the options you can use to configure the system.
/// </summary>
public class ConsistencyOptions
{
public string BrokerUrlList { get; set; } = "localhost:9092";
}
}

Loading…
Cancel
Save