Browse Source

Merge branch 'master' of https://github.com/dotnetcore/CAP

master
Savorboard 4 years ago
parent
commit
168dc34c28
19 changed files with 110 additions and 63 deletions
  1. +1
    -1
      .travis.yml
  2. +1
    -1
      README.md
  3. +1
    -1
      README.zh-cn.md
  4. +1
    -1
      appveyor.yml
  5. +7
    -9
      build/BuildScript.cs
  6. +1
    -1
      build/BuildScript.csproj
  7. +2
    -2
      docs/content/user-guide/zh/transports/azure-service-bus.md
  8. +1
    -1
      samples/Sample.Kafka.InMemory/Startup.cs
  9. +1
    -1
      samples/Sample.RabbitMQ.MySql/AppDbContext.cs
  10. +7
    -6
      samples/Sample.RabbitMQ.MySql/Startup.cs
  11. +5
    -3
      samples/Sample.RabbitMQ.SqlServer/Startup.cs
  12. +3
    -3
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  13. +9
    -5
      src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs
  14. +1
    -1
      src/DotNetCore.CAP/CAP.Options.cs
  15. +30
    -15
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  16. +17
    -10
      src/DotNetCore.CAP/Internal/IMessageSender.Default.cs
  17. +6
    -1
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  18. +3
    -1
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  19. +13
    -0
      src/DotNetCore.CAP/Messages/FailedInfo.cs

+ 1
- 1
.travis.yml View File

@@ -12,5 +12,5 @@ services:

script:
- export PATH="$PATH:$HOME/.dotnet/tools"
- dotnet tool install --global FlubuCore.GlobalTool --version 4.3.7
- dotnet tool install --global FlubuCore.GlobalTool --version 5.1.1
- flubu build tests

+ 1
- 1
README.md View File

@@ -227,7 +227,7 @@ services.AddCap(x =>
CAP v2.1+ provides the dashboard pages, you can easily view the sent and received messages. In addition, you can also view the message status in real time on the dashboard. Use the following command to install the Dashboard in your project.

```
PM> Install-Package DotNetCore.Dashboard
PM> Install-Package DotNetCore.CAP.Dashboard
```

In the distributed environment, the dashboard built-in integrated [Consul](http://consul.io) as a node discovery, while the realization of the gateway agent function, you can also easily view the node or other node data, It's like you are visiting local resources.


+ 1
- 1
README.zh-cn.md View File

@@ -242,7 +242,7 @@ CAP 2.1+ 以上版本中提供了仪表盘(Dashboard)功能,你可以很
使用一下命令安装 Dashboard:

```
PM> Install-Package DotNetCore.Dashboard
PM> Install-Package DotNetCore.CAP.Dashboard
```

在分布式环境中,仪表盘内置集成了 [Consul](http://consul.io) 作为节点的注册发现,同时实现了网关代理功能,你同样可以方便的查看本节点或者其他节点的数据,它就像你访问本地资源一样。


+ 1
- 1
appveyor.yml View File

@@ -7,7 +7,7 @@ environment:
services:
- mysql
before_build:
- ps: dotnet tool install --global FlubuCore.GlobalTool --version 4.3.7
- ps: dotnet tool install --global FlubuCore.GlobalTool --version 5.1.1
build_script:
- ps: flubu
test: off


+ 7
- 9
build/BuildScript.cs View File

@@ -1,5 +1,6 @@
using System.Collections.Generic;
using FlubuCore.Context;
using FlubuCore.Context.Attributes.BuildProperties;
using FlubuCore.IO;
using FlubuCore.Scripting;
using FlubuCore.Scripting.Attributes;
@@ -9,23 +10,20 @@ namespace BuildScript
[Include("./build/BuildVersion.cs")]
public partial class BuildScript : DefaultBuildScript
{
protected string ArtifactsDir => RootDirectory.CombineWith("artifacts");

[FromArg("c|configuration")]
public string Configuration { get; set; }
[BuildConfiguration]
public string Configuration { get; set; } = "Release";

[SolutionFileName] public string SolutionFileName { get; set; } = "CAP.sln";

protected BuildVersion BuildVersion { get; set; }

protected string ArtifactsDir => RootDirectory.CombineWith("artifacts");
protected List<FileFullPath> ProjectFiles { get; set; }

protected List<FileFullPath> TestProjectFiles { get; set; }

protected override void ConfigureBuildProperties(IBuildPropertiesContext context)
{
context.Properties.Set(BuildProps.SolutionFileName, "CAP.sln");
context.Properties.Set(BuildProps.BuildConfiguration, string.IsNullOrEmpty(Configuration) ? "Release" : Configuration);
}

protected override void BeforeBuildExecution(ITaskContext context)
{
BuildVersion = FetchBuildVersion(context);


+ 1
- 1
build/BuildScript.csproj View File

@@ -9,7 +9,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="FlubuCore" Version="4.3.7" />
<PackageReference Include="FlubuCore" Version="5.1.1" />
</ItemGroup>

</Project>

+ 2
- 2
docs/content/user-guide/zh/transports/azure-service-bus.md View File

@@ -39,10 +39,10 @@ public void ConfigureServices(IServiceCollection services)

#### AzureServiceBus Options

CAP 直接对外提供的 Kafka 配置参数如下:
CAP 直接对外提供的 Azure Service Bus 配置参数如下:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
ConnectionString | Endpoint 地址 | string |
TopicPath | Topic entity path | string | cap
ManagementTokenProvider | Token提供 | ITokenProvider | null
ManagementTokenProvider | Token提供 | ITokenProvider | null

+ 1
- 1
samples/Sample.Kafka.InMemory/Startup.cs View File

@@ -10,7 +10,7 @@ namespace Sample.Kafka.InMemory
services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseKafka("192.168.2.120:9093");
x.UseKafka("localhost:9092");
x.UseDashboard();
});



+ 1
- 1
samples/Sample.RabbitMQ.MySql/AppDbContext.cs View File

@@ -26,7 +26,7 @@ namespace Sample.RabbitMQ.MySql
}
public class AppDbContext : DbContext
{
public const string ConnectionString = "Server=192.168.2.120;Database=captest;UserId=root;Password=123123;";
public const string ConnectionString = "Server=localhost;Database=testcap;UserId=root;Password=123123;";

public DbSet<Person> Persons { get; set; }



+ 7
- 6
samples/Sample.RabbitMQ.MySql/Startup.cs View File

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.RabbitMQ.MySql
{
@@ -14,13 +14,14 @@ namespace Sample.RabbitMQ.MySql
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("192.168.2.120");
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, msg) =>
x.FailedThresholdCallback = failed =>
{
Console.WriteLine(
$@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}");
var logger = failed.ServiceProvider.GetService<ILogger<Startup>>();
logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times,
requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
};
});



+ 5
- 3
samples/Sample.RabbitMQ.SqlServer/Startup.cs View File

@@ -2,6 +2,7 @@
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.RabbitMQ.SqlServer
{
@@ -17,10 +18,11 @@ namespace Sample.RabbitMQ.SqlServer
x.UseRabbitMQ("192.168.2.120");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, msg) =>
x.FailedThresholdCallback = failed =>
{
Console.WriteLine(
$@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}");
var logger = failed.ServiceProvider.GetService<ILogger<Startup>>();
logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times,
requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
};
});



+ 3
- 3
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClient : IConsumerClient
{
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
@@ -100,7 +100,7 @@ namespace DotNetCore.CAP.Kafka
return;
}

_connectionLock.Wait();
ConnectionLock.Wait();

try
{
@@ -117,7 +117,7 @@ namespace DotNetCore.CAP.Kafka
}
finally
{
_connectionLock.Release();
ConnectionLock.Release();
}
}



+ 9
- 5
src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs View File

@@ -32,9 +32,10 @@ namespace DotNetCore.CAP.RabbitMQ

public Task<OperateResult> SendAsync(TransportMessage message)
{
var channel = _connectionChannelPool.Rent();
IModel channel = null;
try
{
channel = _connectionChannelPool.Rent();
var props = new BasicProperties
{
DeliveryMode = 2,
@@ -62,12 +63,15 @@ namespace DotNetCore.CAP.RabbitMQ
}
finally
{
var returned = _connectionChannelPool.Return(channel);
if (!returned)
if (channel != null)
{
channel.Dispose();
var returned = _connectionChannelPool.Return(channel);
if (!returned)
{
channel.Dispose();
}
}
}
}
}
}
}

+ 1
- 1
src/DotNetCore.CAP/CAP.Options.cs View File

@@ -53,7 +53,7 @@ namespace DotNetCore.CAP
/// <summary>
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times.
/// </summary>
public Action<MessageType, Message> FailedThresholdCallback { get; set; }
public Action<FailedInfo> FailedThresholdCallback { get; set; }

/// <summary>
/// The number of message retries, the retry will stop when the threshold is reached.


+ 30
- 15
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -12,6 +12,7 @@ using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

@@ -19,14 +20,16 @@ namespace DotNetCore.CAP.Internal
{
internal class ConsumerRegister : IConsumerRegister
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;

private readonly IConsumerClientFactory _consumerClientFactory;
private readonly IDispatcher _dispatcher;
private readonly ISerializer _serializer;
private readonly IDataStorage _storage;
private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;

private CancellationTokenSource _cts;
private BrokerAddress _serverAddress;
@@ -39,21 +42,17 @@ namespace DotNetCore.CAP.Internal
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public ConsumerRegister(ILogger<ConsumerRegister> logger,
IOptions<CapOptions> options,
MethodMatcherCache selector,
IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher,
ISerializer serializer,
IDataStorage storage)
public ConsumerRegister(ILogger<ConsumerRegister> logger, IServiceProvider serviceProvider)
{
_options = options.Value;
_selector = selector;
_logger = logger;
_consumerClientFactory = consumerClientFactory;
_dispatcher = dispatcher;
_serializer = serializer;
_storage = storage;
_serviceProvider = serviceProvider;

_options = serviceProvider.GetService<IOptions<CapOptions>>().Value;
_selector = serviceProvider.GetService<MethodMatcherCache>();
_consumerClientFactory = serviceProvider.GetService<IConsumerClientFactory>();
_dispatcher = serviceProvider.GetService<IDispatcher>();
_serializer = serviceProvider.GetService<ISerializer>();
_storage = serviceProvider.GetService<IDataStorage>();
_cts = new CancellationTokenSource();
}

@@ -202,6 +201,22 @@ namespace DotNetCore.CAP.Internal

client.Commit(sender);

try
{
_options.FailedThresholdCallback?.Invoke(new FailedInfo
{
ServiceProvider = _serviceProvider,
MessageType = MessageType.Subscribe,
Message = message
});

_logger.ConsumerExecutedAfterThreshold(message.GetId(), _options.FailedRetryCount);
}
catch (Exception e)
{
_logger.ExecutedThresholdCallbackFailed(e);
}

TracingAfter(tracingTimestamp, transportMessage, _serverAddress);
}
else


+ 17
- 10
src/DotNetCore.CAP/Internal/IMessageSender.Default.cs View File

@@ -9,6 +9,7 @@ using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

@@ -16,10 +17,12 @@ namespace DotNetCore.CAP.Internal
{
internal class MessageSender : IMessageSender
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;

private readonly IDataStorage _dataStorage;
private readonly ISerializer _serializer;
private readonly ITransport _transport;
private readonly ILogger _logger;
private readonly IOptions<CapOptions> _options;

// ReSharper disable once InconsistentNaming
@@ -28,16 +31,15 @@ namespace DotNetCore.CAP.Internal

public MessageSender(
ILogger<MessageSender> logger,
IOptions<CapOptions> options,
IDataStorage dataStorage,
ISerializer serializer,
ITransport transport)
IServiceProvider serviceProvider)
{
_options = options;
_dataStorage = dataStorage;
_serializer = serializer;
_transport = transport;
_logger = logger;
_serviceProvider = serviceProvider;

_options = serviceProvider.GetService<IOptions<CapOptions>>();
_dataStorage = serviceProvider.GetService<IDataStorage>();
_serializer = serviceProvider.GetService<ISerializer>();
_transport = serviceProvider.GetService<ITransport>();
}

public async Task<OperateResult> SendAsync(MediumMessage message)
@@ -111,7 +113,12 @@ namespace DotNetCore.CAP.Internal
{
try
{
_options.Value.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Origin);
_options.Value.FailedThresholdCallback?.Invoke(new FailedInfo
{
ServiceProvider = _serviceProvider,
MessageType = MessageType.Publish,
Message = message.Origin
});

_logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount);
}


+ 6
- 1
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs View File

@@ -145,7 +145,12 @@ namespace DotNetCore.CAP.Internal
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Origin);
_options.FailedThresholdCallback?.Invoke(new FailedInfo
{
ServiceProvider = _provider,
MessageType = MessageType.Subscribe,
Message = message.Origin
});

_logger.ConsumerExecutedAfterThreshold(message.DbId, _options.FailedRetryCount);
}


+ 3
- 1
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs View File

@@ -3,6 +3,7 @@

using System;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -60,7 +61,8 @@ namespace DotNetCore.CAP.Internal
}
else
{
executeParameters[i] = message.Value;
var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType);
executeParameters[i] = converter.ConvertFrom(message.Value);
}
}
}


+ 13
- 0
src/DotNetCore.CAP/Messages/FailedInfo.cs View File

@@ -0,0 +1,13 @@
using System;

namespace DotNetCore.CAP.Messages
{
public class FailedInfo
{
public IServiceProvider ServiceProvider { get; set; }

public MessageType MessageType { get; set; }

public Message Message { get; set; }
}
}

Loading…
Cancel
Save