Ver código fonte

Merge pull request #15 from dotnetcore/develop

Release 1.0.1
master
Savorboard 7 anos atrás
committed by GitHub
pai
commit
3b5333a7f0
21 arquivos alterados com 434 adições e 105 exclusões
  1. +1
    -1
      CODE_OF_CONDUCT.md
  2. +45
    -20
      README.md
  3. +10
    -13
      README.zh-cn.md
  4. +1
    -1
      build/version.cake
  5. +1
    -1
      build/version.props
  6. +0
    -1
      samples/Sample.Kafka/Controllers/ValuesController.cs
  7. +1
    -1
      samples/Sample.Kafka/Sample.Kafka.csproj
  8. +1
    -1
      samples/Sample.Kafka/Startup.cs
  9. +2
    -2
      src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs
  10. +21
    -21
      src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs
  11. +143
    -25
      src/DotNetCore.CAP.SqlServer/CapPublisher.cs
  12. +20
    -0
      src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
  13. +8
    -3
      src/DotNetCore.CAP/CAP.Options.cs
  14. +1
    -0
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  15. +51
    -9
      src/DotNetCore.CAP/ICapPublisher.cs
  16. +10
    -0
      src/DotNetCore.CAP/IStorageConnection.cs
  17. +1
    -0
      src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
  18. +86
    -0
      src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs
  19. +5
    -5
      src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs
  20. +1
    -1
      src/DotNetCore.CAP/Processor/RetryBehavior.cs
  21. +25
    -0
      test/DotNetCore.CAP.Test/CAP.BuilderTest.cs

+ 1
- 1
CODE_OF_CONDUCT.md Ver arquivo

@@ -34,7 +34,7 @@ This Code of Conduct applies both within project spaces and in public spaces whe

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at m.r992@hotmail.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at yangxiaodong1214@126.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.

Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership.



+ 45
- 20
README.md Ver arquivo

@@ -1,12 +1,9 @@
<p align="right">
  <a href="https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md">中文</a>
</p>

# CAP

[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/develop/README.zh-cn.md)
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)

CAP is a .Net Standard library to achieve eventually consistent in distributed architectures system like SOA,MicroService. It is lightweight,easy to use and efficiently.
@@ -17,7 +14,7 @@ CAP is a library that used in an ASP.NET Core project, Of Course you can ues it

You can think of CAP as an EventBus because it has all the features of EventBus, and CAP provides a easier way to handle the publishing and subscribing than EventBus.

CAP has the function of Message Presistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions.
CAP has the function of Message Persistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions.

This is a diagram of the CAP working in the ASP.NET Core MicroService architecture:

@@ -27,26 +24,30 @@ This is a diagram of the CAP working in the ASP.NET Core MicroService architectu

## Getting Started

### NuGet (Coming soon)
### NuGet

You can run the following command to install the CAP in your project.

```
PM> Install-Package DotNetCore.CAP
```

If your Message Queue is using Kafka, you can:

```
PM> Install-Package DotNetCore.CAP.Kafka -Pre
PM> Install-Package DotNetCore.CAP.Kafka
```

or RabbitMQ:
If your Message Queue is using RabbitMQ, you can

```
PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre
PM> Install-Package DotNetCore.CAP.RabbitMQ
```

CAP provides EntityFramework as default database store extension :
CAP provides EntityFramework as default database store extension (The MySQL version is under development)

```
PM> Install-Package DotNetCore.CAP.EntityFrameworkCore -Pre
PM> Install-Package DotNetCore.CAP.SqlServer
```

### Configuration
@@ -58,11 +59,23 @@ public void ConfigureServices(IServiceCollection services)
{
......

services.AddDbContext<AppDbContext>();
services.AddDbContext<AppDbContext>();

services.AddCap()
.AddEntityFrameworkStores<AppDbContext>()
.AddKafka(x => x.Servers = "localhost:9092");
services.AddCap(x =>
{
// If your SqlServer is using EF for data operations, you need to add the following configuration:
// Notice: You don't need to config x.UseSqlServer(""") again!
x.UseEntityFramework<AppDbContext>();
// If you are using Dapper,you need to add the config:
x.UseSqlServer("Your ConnectionStrings");

// If your Message Queue is using RabbitMQ you need to add the config:
x.UseRabbitMQ("localhost");

// If your Message Queue is using Kafka you need to add the config:
x.UseKafka("localhost");
});
}

public void Configure(IApplicationBuilder app)
@@ -92,11 +105,23 @@ public class PublishController : Controller
[Route("~/checkAccount")]
public async Task<IActionResult> PublishMessage()
{
//Specifies the message header and content to be sent
// Specifies the message header and content to be sent
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });

return Ok();
}

[Route("~/checkAccountWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });

trans.Commit();
}
return Ok();
}
}

```


+ 10
- 13
README.zh-cn.md Ver arquivo

@@ -1,12 +1,9 @@
<p align="right">
<a href="https://github.com/dotnetcore/CAP/blob/master/README.md">English</a>
</p>

# CAP                       
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
# CAP                       [English](https://github.com/dotnetcore/CAP/blob/develop/README.md)
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)

CAP 是一个在分布式系统(SOA、MicroService)中实现最终一致性的库,它具有轻量级、易使用、高性能等特点。
@@ -32,25 +29,25 @@ CAP 具有消息持久化的功能,当你的服务进行重启或者宕机时
你可以运行以下下命令在你的项目中安装 CAP。

```
PM> Install-Package DotNetCore.CAP -Pre
PM> Install-Package DotNetCore.CAP
```

如果你的消息队列使用的是 Kafka 的话,你可以:

```
PM> Install-Package DotNetCore.CAP.Kafka -Pre
PM> Install-Package DotNetCore.CAP.Kafka
```

如果你的消息队列使用的是 RabbitMQ 的话,你可以:

```
PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre
PM> Install-Package DotNetCore.CAP.RabbitMQ
```

CAP 默认提供了 Sql Server 的扩展作为数据库存储(MySql的正在开发中):

```
PM> Install-Package DotNetCore.CAP.SqlServer -Pre
PM> Install-Package DotNetCore.CAP.SqlServer
```

### Configuration
@@ -174,7 +171,7 @@ namespace xxx.Service

public class SubscriberService: ISubscriberService, ICapSubscribe
{
[KafkaTopic("xxx.services.account.check")]
[CapSubscribe("xxx.services.account.check")]
public void CheckReceivedMessage(Person person)
{


+ 1
- 1
build/version.cake Ver arquivo

@@ -77,7 +77,7 @@ public class BuildParameters
var suffix = versionQuality;
if (!IsTagged)
{
suffix += (IsCI ? "ci-" : "dv-") + Util.CreateStamp();
suffix += (IsCI ? "preview-" : "dv-") + Util.CreateStamp();
}
suffix = string.IsNullOrWhiteSpace(suffix) ? null : suffix;



+ 1
- 1
build/version.props Ver arquivo

@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionPatch>1</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 0
- 1
samples/Sample.Kafka/Controllers/ValuesController.cs Ver arquivo

@@ -1,7 +1,6 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;



+ 1
- 1
samples/Sample.Kafka/Sample.Kafka.csproj Ver arquivo

@@ -24,7 +24,7 @@
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>


+ 1
- 1
samples/Sample.Kafka/Startup.cs Ver arquivo

@@ -29,7 +29,7 @@ namespace Sample.Kafka
{
x.UseEntityFramework<AppDbContext>();
//x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
x.UseRabbitMQ(o => { o.HostName = "192.168.2.206"; o.UserName = "admin"; o.Password = "123123"; });
x.UseKafka("localhost:9092");
});

// Add framework services.


+ 2
- 2
src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs Ver arquivo

@@ -8,13 +8,13 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static CapOptions UseKafka(this CapOptions options, string bootstrapServers)
{
return options.UseRabbitMQ(opt =>
return options.UseKafka(opt =>
{
opt.Servers = bootstrapServers;
});
}

public static CapOptions UseRabbitMQ(this CapOptions options, Action<KafkaOptions> configure)
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));



+ 21
- 21
src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs Ver arquivo

@@ -6,35 +6,35 @@ namespace DotNetCore.CAP
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string name)
: this(name, 0)
: base(name)
{
}

/// <summary>
/// Not support
/// </summary>
public CapSubscribeAttribute(string name, int partition)
: this(name, partition, 0)
{
}
///// <summary>
///// Not support
///// </summary>
//public CapSubscribeAttribute(string name, int partition)
// : this(name, partition, 0)
//{
//}

/// <summary>
/// Not support
/// </summary>
public CapSubscribeAttribute(string name, int partition, long offset)
: base(name)
{
Offset = offset;
Partition = partition;
}
///// <summary>
///// Not support
///// </summary>
//public CapSubscribeAttribute(string name, int partition, long offset)
// : base(name)
//{
// Offset = offset;
// Partition = partition;
//}

public int Partition { get; }
//public int Partition { get; }

public long Offset { get; }
//public long Offset { get; }

public bool IsPartition => Partition == 0;
//public bool IsPartition => Partition == 0;

public bool IsOffset => Offset == 0;
//public bool IsOffset => Offset == 0;

public override string ToString()
{


+ 143
- 25
src/DotNetCore.CAP.SqlServer/CapPublisher.cs Ver arquivo

@@ -7,20 +7,28 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : ICapPublisher
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;

protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; }

protected IServiceProvider ServiceProvider { get; }

public CapPublisher(IServiceProvider provider, SqlServerOptions options)
public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
{
ServiceProvider = provider;
_logger = logger;
_options = options;

if (_options.DbContextType != null)
@@ -30,55 +38,130 @@ namespace DotNetCore.CAP.SqlServer
}
}

public void Publish(string name, string content)
{
CheckIsUsingEF(name);

PublishCore(name, content);
}

public Task PublishAsync(string name, string content)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
CheckIsUsingEF(name);

return PublishCoreAsync(name, content);
}

return Publish(name, content);
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Helper.ToJson(contentObj);

PublishCore(name, content);
}

public Task PublishAsync<T>(string name, T contentObj)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
CheckIsUsingEF(name);

var content = Helper.ToJson(contentObj);
return Publish(name, content);

return PublishCoreAsync(name, content);
}

public Task PublishAsync(string name, string content, IDbConnection dbConnection)
public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;

var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
return PublishWithTrans(name, content, dbConnection, dbTransaction);
PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

var content = Helper.ToJson(contentObj);

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

var content = Helper.ToJson(contentObj);

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

#region private methods

private void CheckIsUsingEF(string name)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

return PublishWithTrans(name, content, dbConnection, dbTransaction);
private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task Publish(string name, string content)
private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTrans(name, content, connection, dbTransaction);
await PublishWithTransAsync(name, content, connection, dbTransaction);
}

private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
}

private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
@@ -86,11 +169,46 @@ namespace DotNetCore.CAP.SqlServer
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);

var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction);
_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}

PublishQueuer.PulseEvent.Set();
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}

private string PrepareSql()
{
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}

#endregion private methods
}
}

+ 20
- 0
src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs Ver arquivo

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
@@ -55,6 +56,16 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
}
}

public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";

using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}

// CapReceviedMessage

public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
@@ -89,6 +100,15 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}

public void Dispose()
{
}


+ 8
- 3
src/DotNetCore.CAP/CAP.Options.cs Ver arquivo

@@ -22,14 +22,19 @@ namespace DotNetCore.CAP
}

/// <summary>
/// Productor job polling delay time. Default is 8 sec.
/// Productor job polling delay time. Default is 5 sec.
/// </summary>
public int PollingDelay { get; set; } = 8;
public int PollingDelay { get; set; } = 5;

/// <summary>
/// Failed messages polling delay time. Default is 2 min.
/// </summary>
public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2);

/// <summary>
/// We’ll send a POST request to the URL below with details of any subscribed events.
/// </summary>
public WebHook WebHook { get; set; }
public WebHook WebHook => throw new NotSupportedException();

/// <summary>
/// Registers an extension that will be executed when building services.


+ 1
- 0
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs Ver arquivo

@@ -47,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection
//Processors
services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>();
services.AddTransient<FailedJobProcessor>();
services.AddTransient<IDispatcher, DefaultDispatcher>();

//Executors


+ 51
- 9
src/DotNetCore.CAP/ICapPublisher.cs Ver arquivo

@@ -9,7 +9,7 @@ namespace DotNetCore.CAP
public interface ICapPublisher
{
/// <summary>
/// Publish a string message to specified topic.
/// (EntityFramework) Asynchronous publish a message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
@@ -20,7 +20,18 @@ namespace DotNetCore.CAP
Task PublishAsync(string name, string content);

/// <summary>
/// Publis a object message to specified topic.
/// (EntityFramework) Publish a message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
void Publish(string name, string content);

/// <summary>
/// (EntityFramework) Asynchronous publish a object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
@@ -28,24 +39,55 @@ namespace DotNetCore.CAP
/// </summary>
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">object instance that will be serialized of json.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
Task PublishAsync<T>(string name, T contentObj);

/// <summary>
/// Publish a string message to specified topic with transacton.
/// (EntityFramework) Publish a object message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the dbConnection of <see cref="IDbConnection"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection);
/// <param name="contentObj">message body content, that will be serialized of json.</param>
void Publish<T>(string name, T contentObj);

/// <summary>
/// Publish a string message to specified topic with transacton.
/// (ado.net) Asynchronous publish a message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

/// <summary>
/// (ado.net) Publish a message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction);
void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

/// <summary>
/// (ado.net) Asynchronous publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

/// <summary>
/// (ado.net) Publish a object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
}
}

+ 10
- 0
src/DotNetCore.CAP/IStorageConnection.cs Ver arquivo

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;

@@ -27,6 +28,11 @@ namespace DotNetCore.CAP
/// </summary>
Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync();

/// <summary>
/// Returns executed failed messages.
/// </summary>
Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages();

// Received messages

/// <summary>
@@ -46,6 +52,10 @@ namespace DotNetCore.CAP
/// </summary>
Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync();

/// <summary>
/// Returns executed failed message.
/// </summary>
Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages();
//-----------------------------------------

/// <summary>


+ 1
- 0
src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs Ver arquivo

@@ -117,6 +117,7 @@ namespace DotNetCore.CAP.Processor

returnedProcessors.Add(_provider.GetRequiredService<PublishQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<SubscribeQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<FailedJobProcessor>());

returnedProcessors.Add(_provider.GetRequiredService<IAdditionalProcessor>());



+ 86
- 0
src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs Ver arquivo

@@ -0,0 +1,86 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
{
public class FailedJobProcessor : IProcessor
{
private readonly CapOptions _options;
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
private readonly IStateChanger _stateChanger;

private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval;

public FailedJobProcessor(
IOptions<CapOptions> options,
ILogger<FailedJobProcessor> logger,
IServiceProvider provider,
IStateChanger stateChanger)
{
_options = options.Value;
_logger = logger;
_provider = provider;
_stateChanger = stateChanger;
_waitingInterval = _options.FailedMessageWaitingInterval;
}

public async Task ProcessAsync(ProcessingContext context)
{
if (context == null)
throw new ArgumentNullException(nameof(context));

using (var scope = _provider.CreateScope())
{
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();

await Task.WhenAll(
ProcessPublishedAsync(connection, context),
ProcessReceivededAsync(connection, context));

DefaultDispatcher.PulseEvent.Set();

await context.WaitAsync(_waitingInterval);
}
}

private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedPublishedMessages();
foreach (var message in messages)
{
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}

private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedReceviedMessages();
foreach (var message in messages)
{
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}
}
}

+ 5
- 5
src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs Ver arquivo

@@ -12,11 +12,11 @@ namespace DotNetCore.CAP.Processor
{
public class PublishQueuer : IProcessor
{
private ILogger _logger;
private CapOptions _options;
private IStateChanger _stateChanger;
private IServiceProvider _provider;
private TimeSpan _pollingDelay;
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
private readonly IServiceProvider _provider;
private readonly TimeSpan _pollingDelay;

public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);



+ 1
- 1
src/DotNetCore.CAP/Processor/RetryBehavior.cs Ver arquivo

@@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Processor

static RetryBehavior()
{
DefaultRetryCount = 25;
DefaultRetryCount = 15;
DefaultRetryInThunk = retries =>
(int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));



+ 25
- 0
test/DotNetCore.CAP.Test/CAP.BuilderTest.cs Ver arquivo

@@ -61,6 +61,26 @@ namespace DotNetCore.CAP.Test

private class MyProducerService : ICapPublisher
{
public void Publish(string name, string content)
{
throw new NotImplementedException();
}

public void Publish<T>(string name, T contentObj)
{
throw new NotImplementedException();
}

public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}

public Task PublishAsync(string topic, string content)
{
throw new NotImplementedException();
@@ -80,6 +100,11 @@ namespace DotNetCore.CAP.Test
{
throw new NotImplementedException();
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
throw new NotImplementedException();
}
}
}
}

Carregando…
Cancelar
Salvar