@@ -1,4 +1,5 @@ | |||||
using Microsoft.AspNetCore.Builder; | |||||
using DotNetCore.CAP.EntityFrameworkCore; | |||||
using Microsoft.AspNetCore.Builder; | |||||
using Microsoft.AspNetCore.Hosting; | using Microsoft.AspNetCore.Hosting; | ||||
using Microsoft.Extensions.Configuration; | using Microsoft.Extensions.Configuration; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
@@ -23,10 +24,10 @@ namespace Sample.Kafka | |||||
// This method gets called by the runtime. Use this method to add services to the container. | // This method gets called by the runtime. Use this method to add services to the container. | ||||
public void ConfigureServices(IServiceCollection services) | public void ConfigureServices(IServiceCollection services) | ||||
{ | { | ||||
services.AddDbContext<AppDbContext>(); | |||||
services.AddDbContext<CapDbContext>(); | |||||
services.AddCap() | services.AddCap() | ||||
.AddEntityFrameworkStores<AppDbContext>() | |||||
.AddEntityFrameworkStores<CapDbContext>() | |||||
.AddRabbitMQ(x => | .AddRabbitMQ(x => | ||||
{ | { | ||||
x.HostName = "localhost"; | x.HostName = "localhost"; | ||||
@@ -9,6 +9,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore | |||||
public const string DefaultSchema = "cap"; | public const string DefaultSchema = "cap"; | ||||
public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory"; | public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory"; | ||||
public EFOptions() | |||||
{ | |||||
ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// Gets or sets the database's connection string that will be used to store database entities. | /// Gets or sets the database's connection string that will be used to store database entities. | ||||
/// </summary> | /// </summary> | ||||
@@ -15,7 +15,9 @@ namespace DotNetCore.CAP.EntityFrameworkCore | |||||
/// <summary> | /// <summary> | ||||
/// Initializes a new instance of the <see cref="CapDbContext"/>. | /// Initializes a new instance of the <see cref="CapDbContext"/>. | ||||
/// </summary> | /// </summary> | ||||
public CapDbContext() { } | |||||
public CapDbContext() { | |||||
_efOptions = new EFOptions(); | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// Initializes a new instance of the <see cref="CapDbContext"/>. | /// Initializes a new instance of the <see cref="CapDbContext"/>. | ||||
@@ -65,5 +67,10 @@ namespace DotNetCore.CAP.EntityFrameworkCore | |||||
b.Property(p => p.StatusName).IsRequired().HasMaxLength(50); | b.Property(p => p.StatusName).IsRequired().HasMaxLength(50); | ||||
}); | }); | ||||
} | } | ||||
//protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) | |||||
//{ | |||||
// optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); | |||||
//} | |||||
} | } | ||||
} | } |
@@ -0,0 +1,51 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Data; | |||||
using System.Threading.Tasks; | |||||
using DotNetCore.CAP.Models; | |||||
using Dapper; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
namespace DotNetCore.CAP | |||||
{ | |||||
static class CapPublisherExtensions | |||||
{ | |||||
public static async Task Publish(this ICapPublisher publisher, string topic, string content, DatabaseFacade database) | |||||
{ | |||||
var connection = database.GetDbConnection(); | |||||
var transaction = database.CurrentTransaction; | |||||
transaction = transaction ?? await database.BeginTransactionAsync(IsolationLevel.ReadCommitted); | |||||
var message = new CapSentMessage | |||||
{ | |||||
KeyName = topic, | |||||
Content = content, | |||||
StatusName = StatusName.Enqueued | |||||
}; | |||||
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; | |||||
await connection.ExecuteAsync(sql, transaction); | |||||
JobQueuer.PulseEvent.Set(); | |||||
} | |||||
public static async Task<int> Publish(this ICapPublisher publisher, string topic, string content, IDbConnection connection,IDbTransaction transaction) | |||||
{ | |||||
var message = new CapSentMessage | |||||
{ | |||||
KeyName = topic, | |||||
Content = content, | |||||
StatusName = StatusName.Enqueued | |||||
}; | |||||
var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; | |||||
return await connection.ExecuteAsync(sql, transaction); | |||||
JobQueuer.PulseEvent.Set(); | |||||
} | |||||
} | |||||
} |
@@ -14,6 +14,7 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.2" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" /> | <PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" /> | ||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" /> | <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" /> | ||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.2" /> | <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="1.1.2" /> | ||||
@@ -96,7 +96,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore | |||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
public Task<CapSentMessage> GetNextReceviedMessageToBeEnqueuedAsync() | |||||
public Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync() | |||||
{ | { | ||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
@@ -113,6 +113,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
} | |||||
} | |||||
} | } | ||||
} | } |
@@ -1,30 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Data; | |||||
namespace DotNetCore.CAP.EntityFrameworkCore | |||||
{ | |||||
static class HelperExtensions | |||||
{ | |||||
public static void Execute(this IDbConnection connection, string sql, IDbTransaction transcation = null) | |||||
{ | |||||
try | |||||
{ | |||||
connection.Open(); | |||||
using (var command = connection.CreateCommand()) | |||||
{ | |||||
command.CommandText = "SELELCT 1"; | |||||
if (transcation != null) | |||||
command.Transaction = transcation; | |||||
command.ExecuteNonQuery(); | |||||
} | |||||
} | |||||
finally | |||||
{ | |||||
connection.Close(); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,90 @@ | |||||
using System; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||||
using Microsoft.EntityFrameworkCore.Metadata; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
using DotNetCore.CAP.EntityFrameworkCore; | |||||
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations | |||||
{ | |||||
[DbContext(typeof(CapDbContext))] | |||||
[Migration("20170711154104_InitializeDB")] | |||||
partial class InitializeDB | |||||
{ | |||||
protected override void BuildTargetModel(ModelBuilder modelBuilder) | |||||
{ | |||||
modelBuilder | |||||
.HasDefaultSchema("cap") | |||||
.HasAnnotation("ProductVersion", "1.1.2") | |||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b => | |||||
{ | |||||
b.Property<int>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<string>("MessageId"); | |||||
b.Property<int>("Type"); | |||||
b.HasKey("Id"); | |||||
b.ToTable("CapQueue"); | |||||
}); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b => | |||||
{ | |||||
b.Property<string>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<DateTime>("Added"); | |||||
b.Property<string>("Content"); | |||||
b.Property<string>("Group"); | |||||
b.Property<string>("KeyName"); | |||||
b.Property<DateTime?>("LastRun"); | |||||
b.Property<int>("Retries"); | |||||
b.Property<string>("StatusName") | |||||
.IsRequired() | |||||
.HasMaxLength(50); | |||||
b.HasKey("Id"); | |||||
b.HasIndex("StatusName"); | |||||
b.ToTable("CapReceivedMessages"); | |||||
}); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b => | |||||
{ | |||||
b.Property<string>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<DateTime>("Added"); | |||||
b.Property<string>("Content"); | |||||
b.Property<string>("KeyName"); | |||||
b.Property<DateTime?>("LastRun"); | |||||
b.Property<int>("Retries"); | |||||
b.Property<string>("StatusName") | |||||
.IsRequired() | |||||
.HasMaxLength(50); | |||||
b.HasKey("Id"); | |||||
b.HasIndex("StatusName"); | |||||
b.ToTable("CapSentMessages"); | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,95 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
using Microsoft.EntityFrameworkCore.Metadata; | |||||
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations | |||||
{ | |||||
public partial class InitializeDB : Migration | |||||
{ | |||||
protected override void Up(MigrationBuilder migrationBuilder) | |||||
{ | |||||
migrationBuilder.EnsureSchema( | |||||
name: "cap"); | |||||
migrationBuilder.CreateTable( | |||||
name: "CapQueue", | |||||
schema: "cap", | |||||
columns: table => new | |||||
{ | |||||
Id = table.Column<int>(nullable: false) | |||||
.Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn), | |||||
MessageId = table.Column<string>(nullable: true), | |||||
Type = table.Column<int>(nullable: false) | |||||
}, | |||||
constraints: table => | |||||
{ | |||||
table.PrimaryKey("PK_CapQueue", x => x.Id); | |||||
}); | |||||
migrationBuilder.CreateTable( | |||||
name: "CapReceivedMessages", | |||||
schema: "cap", | |||||
columns: table => new | |||||
{ | |||||
Id = table.Column<string>(nullable: false), | |||||
Added = table.Column<DateTime>(nullable: false), | |||||
Content = table.Column<string>(nullable: true), | |||||
Group = table.Column<string>(nullable: true), | |||||
KeyName = table.Column<string>(nullable: true), | |||||
LastRun = table.Column<DateTime>(nullable: true), | |||||
Retries = table.Column<int>(nullable: false), | |||||
StatusName = table.Column<string>(maxLength: 50, nullable: false) | |||||
}, | |||||
constraints: table => | |||||
{ | |||||
table.PrimaryKey("PK_CapReceivedMessages", x => x.Id); | |||||
}); | |||||
migrationBuilder.CreateTable( | |||||
name: "CapSentMessages", | |||||
schema: "cap", | |||||
columns: table => new | |||||
{ | |||||
Id = table.Column<string>(nullable: false), | |||||
Added = table.Column<DateTime>(nullable: false), | |||||
Content = table.Column<string>(nullable: true), | |||||
KeyName = table.Column<string>(nullable: true), | |||||
LastRun = table.Column<DateTime>(nullable: true), | |||||
Retries = table.Column<int>(nullable: false), | |||||
StatusName = table.Column<string>(maxLength: 50, nullable: false) | |||||
}, | |||||
constraints: table => | |||||
{ | |||||
table.PrimaryKey("PK_CapSentMessages", x => x.Id); | |||||
}); | |||||
migrationBuilder.CreateIndex( | |||||
name: "IX_CapReceivedMessages_StatusName", | |||||
schema: "cap", | |||||
table: "CapReceivedMessages", | |||||
column: "StatusName"); | |||||
migrationBuilder.CreateIndex( | |||||
name: "IX_CapSentMessages_StatusName", | |||||
schema: "cap", | |||||
table: "CapSentMessages", | |||||
column: "StatusName"); | |||||
} | |||||
protected override void Down(MigrationBuilder migrationBuilder) | |||||
{ | |||||
migrationBuilder.DropTable( | |||||
name: "CapQueue", | |||||
schema: "cap"); | |||||
migrationBuilder.DropTable( | |||||
name: "CapReceivedMessages", | |||||
schema: "cap"); | |||||
migrationBuilder.DropTable( | |||||
name: "CapSentMessages", | |||||
schema: "cap"); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,89 @@ | |||||
using System; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||||
using Microsoft.EntityFrameworkCore.Metadata; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
using DotNetCore.CAP.EntityFrameworkCore; | |||||
namespace DotNetCore.CAP.EntityFrameworkCore.Migrations | |||||
{ | |||||
[DbContext(typeof(CapDbContext))] | |||||
partial class CapDbContextModelSnapshot : ModelSnapshot | |||||
{ | |||||
protected override void BuildModel(ModelBuilder modelBuilder) | |||||
{ | |||||
modelBuilder | |||||
.HasDefaultSchema("cap") | |||||
.HasAnnotation("ProductVersion", "1.1.2") | |||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b => | |||||
{ | |||||
b.Property<int>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<string>("MessageId"); | |||||
b.Property<int>("Type"); | |||||
b.HasKey("Id"); | |||||
b.ToTable("CapQueue"); | |||||
}); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b => | |||||
{ | |||||
b.Property<string>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<DateTime>("Added"); | |||||
b.Property<string>("Content"); | |||||
b.Property<string>("Group"); | |||||
b.Property<string>("KeyName"); | |||||
b.Property<DateTime?>("LastRun"); | |||||
b.Property<int>("Retries"); | |||||
b.Property<string>("StatusName") | |||||
.IsRequired() | |||||
.HasMaxLength(50); | |||||
b.HasKey("Id"); | |||||
b.HasIndex("StatusName"); | |||||
b.ToTable("CapReceivedMessages"); | |||||
}); | |||||
modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b => | |||||
{ | |||||
b.Property<string>("Id") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<DateTime>("Added"); | |||||
b.Property<string>("Content"); | |||||
b.Property<string>("KeyName"); | |||||
b.Property<DateTime?>("LastRun"); | |||||
b.Property<int>("Retries"); | |||||
b.Property<string>("StatusName") | |||||
.IsRequired() | |||||
.HasMaxLength(50); | |||||
b.HasKey("Id"); | |||||
b.HasIndex("StatusName"); | |||||
b.ToTable("CapSentMessages"); | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -78,33 +78,34 @@ namespace DotNetCore.CAP.Kafka | |||||
private async Task<bool> Step(ProcessingContext context) | private async Task<bool> Step(ProcessingContext context) | ||||
{ | { | ||||
using (var scopedContext = context.CreateScope()) | |||||
{ | |||||
var provider = scopedContext.Provider; | |||||
var messageStore = provider.GetRequiredService<ICapMessageStore>(); | |||||
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); | |||||
if (message == null) return true; | |||||
try | |||||
{ | |||||
var sp = Stopwatch.StartNew(); | |||||
message.StatusName = StatusName.Processing; | |||||
await messageStore.UpdateSentMessageAsync(message); | |||||
await ExecuteJobAsync(message.KeyName, message.Content); | |||||
sp.Stop(); | |||||
message.StatusName = StatusName.Succeeded; | |||||
await messageStore.UpdateSentMessageAsync(message); | |||||
_logger.JobExecuted(sp.Elapsed.TotalSeconds); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); | |||||
return false; | |||||
} | |||||
} | |||||
return true; | |||||
throw new NotImplementedException(); | |||||
// using (var scopedContext = context.CreateScope()) | |||||
// { | |||||
// var provider = scopedContext.Provider; | |||||
// var messageStore = provider.GetRequiredService<ICapMessageStore>(); | |||||
// var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); | |||||
// if (message == null) return true; | |||||
// try | |||||
// { | |||||
// var sp = Stopwatch.StartNew(); | |||||
// message.StatusName = StatusName.Processing; | |||||
// await messageStore.UpdateSentMessageAsync(message); | |||||
// await ExecuteJobAsync(message.KeyName, message.Content); | |||||
// sp.Stop(); | |||||
// message.StatusName = StatusName.Succeeded; | |||||
// await messageStore.UpdateSentMessageAsync(message); | |||||
// _logger.JobExecuted(sp.Elapsed.TotalSeconds); | |||||
// } | |||||
// catch (Exception ex) | |||||
// { | |||||
// _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); | |||||
// return false; | |||||
// } | |||||
// } | |||||
// return true; | |||||
} | } | ||||
private Task ExecuteJobAsync(string topic, string content) | private Task ExecuteJobAsync(string topic, string content) | ||||
@@ -23,7 +23,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private readonly ILogger _logger; | private readonly ILogger _logger; | ||||
private readonly TimeSpan _pollingDelay; | private readonly TimeSpan _pollingDelay; | ||||
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | |||||
public RabbitJobProcessor( | public RabbitJobProcessor( | ||||
IOptions<CapOptions> capOptions, | IOptions<CapOptions> capOptions, | ||||
@@ -67,7 +66,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
var token = GetTokenToWaitOn(context); | var token = GetTokenToWaitOn(context); | ||||
} | } | ||||
await WaitHandleEx.WaitAnyAsync(PulseEvent, | |||||
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.SentPulseEvent, | |||||
context.CancellationToken.WaitHandle, _pollingDelay); | context.CancellationToken.WaitHandle, _pollingDelay); | ||||
} | } | ||||
finally | finally | ||||
@@ -87,7 +86,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
using (var scopedContext = context.CreateScope()) | using (var scopedContext = context.CreateScope()) | ||||
{ | { | ||||
var provider = scopedContext.Provider; | var provider = scopedContext.Provider; | ||||
var messageStore = provider.GetRequiredService<ICapMessageStore>(); | |||||
//var messageStore = provider.GetRequiredService<ICapMessageStore>(); | |||||
var connection = provider.GetRequiredService<IStorageConnection>(); | var connection = provider.GetRequiredService<IStorageConnection>(); | ||||
if ((fetched = await connection.FetchNextSentMessageAsync()) != null) | if ((fetched = await connection.FetchNextSentMessageAsync()) != null) | ||||
@@ -106,7 +106,7 @@ namespace DotNetCore.CAP | |||||
{ | { | ||||
var receviedMessage = StoreMessage(scope, message); | var receviedMessage = StoreMessage(scope, message); | ||||
client.Commit(); | client.Commit(); | ||||
ProcessMessage(scope, receviedMessage); | |||||
// ProcessMessage(scope, receviedMessage); | |||||
} | } | ||||
}; | }; | ||||
} | } | ||||
@@ -123,36 +123,36 @@ namespace DotNetCore.CAP | |||||
return receivedMessage; | return receivedMessage; | ||||
} | } | ||||
private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) | |||||
{ | |||||
var provider = serviceScope.ServiceProvider; | |||||
var messageStore = provider.GetRequiredService<IStorageConnection>(); | |||||
try | |||||
{ | |||||
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); | |||||
//private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) | |||||
//{ | |||||
// var provider = serviceScope.ServiceProvider; | |||||
// var messageStore = provider.GetRequiredService<IStorageConnection>(); | |||||
// try | |||||
// { | |||||
// var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); | |||||
if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) | |||||
{ | |||||
messageStore.FetchNextReceivedMessageAsync | |||||
// if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) | |||||
// { | |||||
// messageStore.FetchNextReceivedMessageAsync | |||||
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); | |||||
// messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); | |||||
// If there are multiple consumers in the same group, we will take the first | |||||
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; | |||||
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); | |||||
// // If there are multiple consumers in the same group, we will take the first | |||||
// var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; | |||||
// var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); | |||||
_consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); | |||||
// _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); | |||||
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); | |||||
} | |||||
} | |||||
// messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); | |||||
// } | |||||
// } | |||||
// catch (Exception ex) | |||||
// { | |||||
// _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); | |||||
// } | |||||
//} | |||||
} | } |
@@ -7,6 +7,8 @@ namespace DotNetCore.CAP | |||||
/// </summary> | /// </summary> | ||||
public interface IProcessingServer : IDisposable | public interface IProcessingServer : IDisposable | ||||
{ | { | ||||
void Pulse(); | |||||
void Start(); | void Start(); | ||||
} | } | ||||
} | } |
@@ -7,6 +7,9 @@ namespace DotNetCore.CAP.Infrastructure | |||||
public static class WaitHandleEx | public static class WaitHandleEx | ||||
{ | { | ||||
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | ||||
public static readonly AutoResetEvent QueuePulseEvent = new AutoResetEvent(true); | |||||
public static readonly AutoResetEvent SentPulseEvent = new AutoResetEvent(true); | |||||
public static readonly AutoResetEvent ReceviedPulseEvent = new AutoResetEvent(true); | |||||
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) | public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) | ||||
{ | { | ||||
@@ -1,57 +0,0 @@ | |||||
using System; | |||||
using NCrontab; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public class ComputedCronJob | |||||
{ | |||||
private readonly CronJobRegistry.Entry _entry; | |||||
public ComputedCronJob() | |||||
{ | |||||
} | |||||
public ComputedCronJob(CronJob job) | |||||
{ | |||||
Job = job; | |||||
Schedule = CrontabSchedule.Parse(job.Cron); | |||||
if (job.TypeName != null) | |||||
{ | |||||
JobType = Type.GetType(job.TypeName); | |||||
} | |||||
} | |||||
public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry) | |||||
: this(job) | |||||
{ | |||||
_entry = entry; | |||||
} | |||||
public CronJob Job { get; set; } | |||||
public CrontabSchedule Schedule { get; set; } | |||||
public Type JobType { get; set; } | |||||
public DateTime Next { get; set; } | |||||
public int Retries { get; set; } | |||||
public DateTime FirstTry { get; set; } | |||||
public RetryBehavior RetryBehavior => _entry.RetryBehavior; | |||||
public void Update(DateTime baseTime) | |||||
{ | |||||
Job.LastRun = baseTime; | |||||
} | |||||
public void UpdateNext(DateTime now) | |||||
{ | |||||
var next = Schedule.GetNextOccurrence(now); | |||||
var previousNext = Schedule.GetNextOccurrence(Job.LastRun); | |||||
Next = next > previousNext ? now : next; | |||||
} | |||||
} | |||||
} |
@@ -1,198 +0,0 @@ | |||||
using System; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public class Cron | |||||
{ | |||||
/// <summary> | |||||
/// Returns cron expression that fires every minute. | |||||
/// </summary> | |||||
public static string Minutely() | |||||
{ | |||||
return "* * * * *"; | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every hour at the first minute. | |||||
/// </summary> | |||||
public static string Hourly() | |||||
{ | |||||
return Hourly(minute: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every hour at the specified minute. | |||||
/// </summary> | |||||
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> | |||||
public static string Hourly(int minute) | |||||
{ | |||||
return string.Format("{0} * * * *", minute); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every day at 00:00 UTC. | |||||
/// </summary> | |||||
public static string Daily() | |||||
{ | |||||
return Daily(hour: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every day at the first minute of | |||||
/// the specified hour in UTC. | |||||
/// </summary> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
public static string Daily(int hour) | |||||
{ | |||||
return Daily(hour, minute: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every day at the specified hour and minute | |||||
/// in UTC. | |||||
/// </summary> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> | |||||
public static string Daily(int hour, int minute) | |||||
{ | |||||
return string.Format("{0} {1} * * *", minute, hour); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every week at Monday, 00:00 UTC. | |||||
/// </summary> | |||||
public static string Weekly() | |||||
{ | |||||
return Weekly(DayOfWeek.Monday); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every week at 00:00 UTC of the specified | |||||
/// day of the week. | |||||
/// </summary> | |||||
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param> | |||||
public static string Weekly(DayOfWeek dayOfWeek) | |||||
{ | |||||
return Weekly(dayOfWeek, hour: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every week at the first minute | |||||
/// of the specified day of week and hour in UTC. | |||||
/// </summary> | |||||
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
public static string Weekly(DayOfWeek dayOfWeek, int hour) | |||||
{ | |||||
return Weekly(dayOfWeek, hour, minute: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every week at the specified day | |||||
/// of week, hour and minute in UTC. | |||||
/// </summary> | |||||
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> | |||||
public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute) | |||||
{ | |||||
return string.Format("{0} {1} * * {2}", minute, hour, (int) dayOfWeek); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every month at 00:00 UTC of the first | |||||
/// day of month. | |||||
/// </summary> | |||||
public static string Monthly() | |||||
{ | |||||
return Monthly(day: 1); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every month at 00:00 UTC of the specified | |||||
/// day of month. | |||||
/// </summary> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
public static string Monthly(int day) | |||||
{ | |||||
return Monthly(day, hour: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every month at the first minute of the | |||||
/// specified day of month and hour in UTC. | |||||
/// </summary> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
public static string Monthly(int day, int hour) | |||||
{ | |||||
return Monthly(day, hour, minute: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every month at the specified day of month, | |||||
/// hour and minute in UTC. | |||||
/// </summary> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> | |||||
public static string Monthly(int day, int hour, int minute) | |||||
{ | |||||
return string.Format("{0} {1} {2} * *", minute, hour, day); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every year on Jan, 1st at 00:00 UTC. | |||||
/// </summary> | |||||
public static string Yearly() | |||||
{ | |||||
return Yearly(month: 1); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every year in the first day at 00:00 UTC | |||||
/// of the specified month. | |||||
/// </summary> | |||||
/// <param name="month">The month in which the schedule will be activated (1-12).</param> | |||||
public static string Yearly(int month) | |||||
{ | |||||
return Yearly(month, day: 1); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every year at 00:00 UTC of the specified | |||||
/// month and day of month. | |||||
/// </summary> | |||||
/// <param name="month">The month in which the schedule will be activated (1-12).</param> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
public static string Yearly(int month, int day) | |||||
{ | |||||
return Yearly(month, day, hour: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every year at the first minute of the | |||||
/// specified month, day and hour in UTC. | |||||
/// </summary> | |||||
/// <param name="month">The month in which the schedule will be activated (1-12).</param> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
public static string Yearly(int month, int day, int hour) | |||||
{ | |||||
return Yearly(month, day, hour, minute: 0); | |||||
} | |||||
/// <summary> | |||||
/// Returns cron expression that fires every year at the specified month, day, | |||||
/// hour and minute in UTC. | |||||
/// </summary> | |||||
/// <param name="month">The month in which the schedule will be activated (1-12).</param> | |||||
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param> | |||||
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param> | |||||
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param> | |||||
public static string Yearly(int month, int day, int hour, int minute) | |||||
{ | |||||
return $"{minute} {hour} {day} {month} *"; | |||||
} | |||||
} | |||||
} |
@@ -1,37 +0,0 @@ | |||||
using System; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
/// <summary> | |||||
/// Represents a cron job to be executed at specified intervals of time. | |||||
/// </summary> | |||||
public class CronJob | |||||
{ | |||||
public CronJob() | |||||
{ | |||||
Id = Guid.NewGuid().ToString(); | |||||
} | |||||
public CronJob(string cron) | |||||
: this() | |||||
{ | |||||
Cron = cron; | |||||
} | |||||
public CronJob(string cron, DateTime lastRun) | |||||
: this(cron) | |||||
{ | |||||
LastRun = lastRun; | |||||
} | |||||
public string Id { get; set; } | |||||
public string Name { get; set; } | |||||
public string TypeName { get; set; } | |||||
public string Cron { get; set; } | |||||
public DateTime LastRun { get; set; } | |||||
} | |||||
} |
@@ -1,15 +0,0 @@ | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using Microsoft.Extensions.Options; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public class DefaultCronJobRegistry : CronJobRegistry | |||||
{ | |||||
public DefaultCronJobRegistry(IOptions<CapOptions> options) | |||||
{ | |||||
var options1 = options.Value; | |||||
RegisterJob<CapJob>(nameof(DefaultCronJobRegistry), options1.CronExp, RetryBehavior.DefaultRetry); | |||||
} | |||||
} | |||||
} |
@@ -1,68 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Reflection; | |||||
using NCrontab; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public abstract class CronJobRegistry | |||||
{ | |||||
private readonly List<Entry> _entries; | |||||
protected CronJobRegistry() | |||||
{ | |||||
_entries = new List<Entry>(); | |||||
} | |||||
protected void RegisterJob<T>(string name, string cron, RetryBehavior retryBehavior = null) | |||||
where T : IJob | |||||
{ | |||||
RegisterJob(name, typeof(T), cron, retryBehavior); | |||||
} | |||||
/// <summary> | |||||
/// Registers a cron job. | |||||
/// </summary> | |||||
/// <param name="name">The name of the job.</param> | |||||
/// <param name="jobType">The job's type.</param> | |||||
/// <param name="cron">The cron expression to use.</param> | |||||
/// <param name="retryBehavior">The <see cref="RetryBehavior"/> to use.</param> | |||||
protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) | |||||
{ | |||||
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron)); | |||||
if (jobType == null) throw new ArgumentNullException(nameof(jobType)); | |||||
if (cron == null) throw new ArgumentNullException(nameof(cron)); | |||||
retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry; | |||||
CrontabSchedule.TryParse(cron); | |||||
if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) | |||||
{ | |||||
throw new ArgumentException( | |||||
"Cron jobs should extend IJob.", nameof(jobType)); | |||||
} | |||||
_entries.Add(new Entry(name, jobType, cron)); | |||||
} | |||||
public Entry[] Build() => _entries.ToArray(); | |||||
public class Entry | |||||
{ | |||||
public Entry(string name, Type jobType, string cron) | |||||
{ | |||||
Name = name; | |||||
JobType = jobType; | |||||
Cron = cron; | |||||
} | |||||
public string Name { get; set; } | |||||
public Type JobType { get; set; } | |||||
public string Cron { get; set; } | |||||
public RetryBehavior RetryBehavior { get; set; } | |||||
} | |||||
} | |||||
} |
@@ -32,32 +32,32 @@ namespace DotNetCore.CAP.Job | |||||
public async Task ExecuteAsync() | public async Task ExecuteAsync() | ||||
{ | { | ||||
var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); | |||||
using (var scope = _serviceProvider.CreateScope()) | |||||
{ | |||||
var provider = scope.ServiceProvider; | |||||
//var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); | |||||
//using (var scope = _serviceProvider.CreateScope()) | |||||
//{ | |||||
// var provider = scope.ServiceProvider; | |||||
var messageStore = provider.GetService<ICapMessageStore>(); | |||||
var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); | |||||
if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group)) | |||||
{ | |||||
try | |||||
{ | |||||
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); | |||||
// If there are multiple consumers in the same group, we will take the first | |||||
var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0]; | |||||
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext()); | |||||
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); | |||||
await invoker.InvokeAsync(); | |||||
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); | |||||
// var messageStore = provider.GetService<ICapMessageStore>(); | |||||
// var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); | |||||
// if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group)) | |||||
// { | |||||
// try | |||||
// { | |||||
// await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); | |||||
// // If there are multiple consumers in the same group, we will take the first | |||||
// var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0]; | |||||
// var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext()); | |||||
// var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); | |||||
// await invoker.InvokeAsync(); | |||||
// await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
_logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); | |||||
} | |||||
} | |||||
} | |||||
// } | |||||
// catch (Exception ex) | |||||
// { | |||||
// _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); | |||||
// } | |||||
// } | |||||
//} | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,170 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public class CronJobProcessor : IJobProcessor | |||||
{ | |||||
private readonly ILogger _logger; | |||||
private IServiceProvider _provider; | |||||
private readonly DefaultCronJobRegistry _jobRegistry; | |||||
public CronJobProcessor( | |||||
DefaultCronJobRegistry jobRegistry, | |||||
ILogger<CronJobProcessor> logger, | |||||
IServiceProvider provider) | |||||
{ | |||||
_jobRegistry = jobRegistry; | |||||
_logger = logger; | |||||
_provider = provider; | |||||
} | |||||
public override string ToString() => nameof(CronJobProcessor); | |||||
public Task ProcessAsync(ProcessingContext context) | |||||
{ | |||||
if (context == null) throw new ArgumentNullException(nameof(context)); | |||||
return ProcessCoreAsync(context); | |||||
} | |||||
private async Task ProcessCoreAsync(ProcessingContext context) | |||||
{ | |||||
//var storage = context.Storage; | |||||
//var jobs = await GetJobsAsync(storage); | |||||
var jobs = GetJobs(); | |||||
if (!jobs.Any()) | |||||
{ | |||||
_logger.CronJobsNotFound(); | |||||
// This will cancel this processor. | |||||
throw new OperationCanceledException(); | |||||
} | |||||
_logger.CronJobsScheduling(jobs); | |||||
context.ThrowIfStopping(); | |||||
var computedJobs = Compute(jobs, context.CronJobRegistry.Build()); | |||||
if (context.IsStopping) | |||||
{ | |||||
return; | |||||
} | |||||
await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context))); | |||||
} | |||||
private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) | |||||
{ | |||||
//var storage = context.Storage; | |||||
var retryBehavior = computedJob.RetryBehavior; | |||||
while (!context.IsStopping) | |||||
{ | |||||
var now = DateTime.UtcNow; | |||||
var due = ComputeDue(computedJob, now); | |||||
var timeSpan = due - now; | |||||
if (timeSpan.TotalSeconds > 0) | |||||
{ | |||||
await context.WaitAsync(timeSpan); | |||||
} | |||||
context.ThrowIfStopping(); | |||||
using (var scopedContext = context.CreateScope()) | |||||
{ | |||||
var provider = scopedContext.Provider; | |||||
var job = provider.GetService<IJob>(); | |||||
var success = true; | |||||
try | |||||
{ | |||||
var sw = Stopwatch.StartNew(); | |||||
await job.ExecuteAsync(); | |||||
sw.Stop(); | |||||
computedJob.Retries = 0; | |||||
_logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
success = false; | |||||
if (computedJob.Retries == 0) | |||||
{ | |||||
computedJob.FirstTry = DateTime.UtcNow; | |||||
} | |||||
computedJob.Retries++; | |||||
_logger.CronJobFailed(computedJob.Job.Name, ex); | |||||
} | |||||
if (success) | |||||
{ | |||||
computedJob.Update(DateTime.UtcNow); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) | |||||
{ | |||||
computedJob.UpdateNext(now); | |||||
var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry; | |||||
var retries = computedJob.Retries; | |||||
if (retries == 0) | |||||
{ | |||||
return computedJob.Next; | |||||
} | |||||
var realNext = computedJob.Schedule.GetNextOccurrence(now); | |||||
if (!retryBehavior.Retry) | |||||
{ | |||||
// No retry. If job failed before, we don't care, just schedule it next as usual. | |||||
return realNext; | |||||
} | |||||
if (retries >= retryBehavior.RetryCount) | |||||
{ | |||||
// Max retries. Just schedule it for the next occurance. | |||||
return realNext; | |||||
} | |||||
// Delay a bit. | |||||
return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries)); | |||||
} | |||||
private CronJob[] GetJobs() | |||||
{ | |||||
var cronJobs = new List<CronJob>(); | |||||
var entries = _jobRegistry.Build() ?? new CronJobRegistry.Entry[0]; | |||||
foreach (var entry in entries) | |||||
{ | |||||
cronJobs.Add(new CronJob | |||||
{ | |||||
Name = entry.Name, | |||||
TypeName = entry.JobType.AssemblyQualifiedName, | |||||
Cron = entry.Cron, | |||||
LastRun = DateTime.MinValue | |||||
}); | |||||
} | |||||
return cronJobs.ToArray(); | |||||
} | |||||
private ComputedCronJob[] Compute(IEnumerable<CronJob> jobs, CronJobRegistry.Entry[] entries) | |||||
=> jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray(); | |||||
private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) | |||||
{ | |||||
var entry = entries.First(e => e.Name == job.Name); | |||||
return new ComputedCronJob(job, entry); | |||||
} | |||||
} | |||||
} |
@@ -1,31 +1,31 @@ | |||||
using System; | using System; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Job.States; | using DotNetCore.CAP.Job.States; | ||||
using DotNetCore.CAP.Models; | using DotNetCore.CAP.Models; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using Microsoft.Extensions.Options; | |||||
namespace DotNetCore.CAP.Job | namespace DotNetCore.CAP.Job | ||||
{ | { | ||||
public class JobQueuer : IJobProcessor | public class JobQueuer : IJobProcessor | ||||
{ | { | ||||
private ILogger _logger; | private ILogger _logger; | ||||
private JobsOptions _options; | |||||
private CapOptions _options; | |||||
private IStateChanger _stateChanger; | private IStateChanger _stateChanger; | ||||
private IServiceProvider _provider; | private IServiceProvider _provider; | ||||
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | |||||
private TimeSpan _pollingDelay; | private TimeSpan _pollingDelay; | ||||
public JobQueuer( | public JobQueuer( | ||||
ILogger<JobQueuer> logger, | ILogger<JobQueuer> logger, | ||||
JobsOptions options, | |||||
IOptions<CapOptions> options, | |||||
IStateChanger stateChanger, | IStateChanger stateChanger, | ||||
IServiceProvider provider) | IServiceProvider provider) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_options = options; | |||||
_options = options.Value; | |||||
_stateChanger = stateChanger; | _stateChanger = stateChanger; | ||||
_provider = provider; | _provider = provider; | ||||
@@ -37,7 +37,7 @@ namespace DotNetCore.CAP.Job | |||||
using (var scope = _provider.CreateScope()) | using (var scope = _provider.CreateScope()) | ||||
{ | { | ||||
CapSentMessage sentMessage; | CapSentMessage sentMessage; | ||||
CapReceivedMessage receivedMessage; | |||||
// CapReceivedMessage receivedMessage; | |||||
var provider = scope.ServiceProvider; | var provider = scope.ServiceProvider; | ||||
var connection = provider.GetRequiredService<IStorageConnection>(); | var connection = provider.GetRequiredService<IStorageConnection>(); | ||||
@@ -57,9 +57,10 @@ namespace DotNetCore.CAP.Job | |||||
} | } | ||||
context.ThrowIfStopping(); | context.ThrowIfStopping(); | ||||
DelayedJobProcessor.PulseEvent.Set(); | |||||
await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); | |||||
WaitHandleEx.SentPulseEvent.Set(); | |||||
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent, | |||||
context.CancellationToken.WaitHandle, _pollingDelay); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,11 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace DotNetCore.CAP.Job | |||||
{ | |||||
public interface IMessageJobProcessor : IJobProcessor | |||||
{ | |||||
bool Waiting { get; } | |||||
} | |||||
} |
@@ -17,7 +17,6 @@ namespace DotNetCore.CAP.Job | |||||
private readonly IServiceProvider _provider; | private readonly IServiceProvider _provider; | ||||
private readonly CancellationTokenSource _cts; | private readonly CancellationTokenSource _cts; | ||||
private readonly CapOptions _options; | private readonly CapOptions _options; | ||||
private readonly DefaultCronJobRegistry _defaultJobRegistry; | |||||
private IJobProcessor[] _processors; | private IJobProcessor[] _processors; | ||||
private ProcessingContext _context; | private ProcessingContext _context; | ||||
@@ -28,13 +27,11 @@ namespace DotNetCore.CAP.Job | |||||
ILogger<JobProcessingServer> logger, | ILogger<JobProcessingServer> logger, | ||||
ILoggerFactory loggerFactory, | ILoggerFactory loggerFactory, | ||||
IServiceProvider provider, | IServiceProvider provider, | ||||
DefaultCronJobRegistry defaultJobRegistry, | |||||
IOptions<CapOptions> options) | IOptions<CapOptions> options) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_loggerFactory = loggerFactory; | _loggerFactory = loggerFactory; | ||||
_provider = provider; | _provider = provider; | ||||
_defaultJobRegistry = defaultJobRegistry; | |||||
_options = options.Value; | _options = options.Value; | ||||
_cts = new CancellationTokenSource(); | _cts = new CancellationTokenSource(); | ||||
} | } | ||||
@@ -46,10 +43,7 @@ namespace DotNetCore.CAP.Job | |||||
_processors = GetProcessors(processorCount); | _processors = GetProcessors(processorCount); | ||||
_logger.ServerStarting(processorCount, processorCount); | _logger.ServerStarting(processorCount, processorCount); | ||||
_context = new ProcessingContext( | |||||
_provider, | |||||
_defaultJobRegistry, | |||||
_cts.Token); | |||||
_context = new ProcessingContext(_provider, _cts.Token); | |||||
var processorTasks = _processors | var processorTasks = _processors | ||||
.Select(InfiniteRetry) | .Select(InfiniteRetry) | ||||
@@ -57,6 +51,31 @@ namespace DotNetCore.CAP.Job | |||||
_compositeTask = Task.WhenAll(processorTasks); | _compositeTask = Task.WhenAll(processorTasks); | ||||
} | } | ||||
public void Pulse() | |||||
{ | |||||
if (!AllProcessorsWaiting()) | |||||
{ | |||||
// Some processor is still executing jobs so no need to pulse. | |||||
return; | |||||
} | |||||
_logger.LogTrace("Pulsing the JobQueuer."); | |||||
WaitHandleEx.QueuePulseEvent.Set(); | |||||
} | |||||
private bool AllProcessorsWaiting() | |||||
{ | |||||
foreach (var processor in _processors) | |||||
{ | |||||
if (!processor.Waiting) | |||||
{ | |||||
return false; | |||||
} | |||||
} | |||||
return true; | |||||
} | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
if (_disposed) | if (_disposed) | ||||
@@ -69,7 +88,7 @@ namespace DotNetCore.CAP.Job | |||||
_cts.Cancel(); | _cts.Cancel(); | ||||
try | try | ||||
{ | { | ||||
_compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds); | |||||
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); | |||||
} | } | ||||
catch (AggregateException ex) | catch (AggregateException ex) | ||||
{ | { | ||||
@@ -16,24 +16,19 @@ namespace DotNetCore.CAP.Job | |||||
private ProcessingContext(ProcessingContext other) | private ProcessingContext(ProcessingContext other) | ||||
{ | { | ||||
Provider = other.Provider; | Provider = other.Provider; | ||||
CronJobRegistry = other.CronJobRegistry; | |||||
CancellationToken = other.CancellationToken; | CancellationToken = other.CancellationToken; | ||||
} | } | ||||
public ProcessingContext( | public ProcessingContext( | ||||
IServiceProvider provider, | IServiceProvider provider, | ||||
CronJobRegistry cronJobRegistry, | |||||
CancellationToken cancellationToken) | CancellationToken cancellationToken) | ||||
{ | { | ||||
Provider = provider; | Provider = provider; | ||||
CronJobRegistry = cronJobRegistry; | |||||
CancellationToken = cancellationToken; | CancellationToken = cancellationToken; | ||||
} | } | ||||
public IServiceProvider Provider { get; private set; } | public IServiceProvider Provider { get; private set; } | ||||
public CronJobRegistry CronJobRegistry { get; private set; } | |||||
public CancellationToken CancellationToken { get; } | public CancellationToken CancellationToken { get; } | ||||
public bool IsStopping => CancellationToken.IsCancellationRequested; | public bool IsStopping => CancellationToken.IsCancellationRequested; | ||||
@@ -1,7 +1,6 @@ | |||||
using System; | using System; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | using DotNetCore.CAP.Models; | ||||
namespace DotNetCore.CAP.Test | namespace DotNetCore.CAP.Test | ||||
@@ -14,45 +13,9 @@ namespace DotNetCore.CAP.Test | |||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
public Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName, | |||||
bool autoSaveChanges = true) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted() | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync() | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message) | public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message) | ||||
{ | { | ||||
throw new NotImplementedException(); | throw new NotImplementedException(); | ||||
} | } | ||||
public Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
public Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message) | |||||
{ | |||||
throw new NotImplementedException(); | |||||
} | |||||
} | } | ||||
} | } |
@@ -66,30 +66,30 @@ namespace DotNetCore.CAP.Test | |||||
Assert.NotNull(operateResult); | Assert.NotNull(operateResult); | ||||
Assert.True(operateResult.Succeeded); | Assert.True(operateResult.Succeeded); | ||||
operateResult = await manager.RemoveSentMessageAsync(message); | |||||
Assert.NotNull(operateResult); | |||||
Assert.True(operateResult.Succeeded); | |||||
// operateResult = await manager.RemoveSentMessageAsync(message); | |||||
// Assert.NotNull(operateResult); | |||||
// Assert.True(operateResult.Succeeded); | |||||
} | } | ||||
[Fact] | |||||
public async Task CanUpdateReceivedMessage() | |||||
{ | |||||
if (ShouldSkipDbTests()) | |||||
{ | |||||
return; | |||||
} | |||||
var manager = CreateManager(); | |||||
var message = CreateTestReceivedMessage(); | |||||
var operateResult = await manager.StoreReceivedMessageAsync(message); | |||||
Assert.NotNull(operateResult); | |||||
Assert.True(operateResult.Succeeded); | |||||
message.StatusName = StatusName.Processing; | |||||
operateResult = await manager.UpdateReceivedMessageAsync(message); | |||||
Assert.NotNull(operateResult); | |||||
Assert.True(operateResult.Succeeded); | |||||
} | |||||
//[Fact] | |||||
//public async Task CanUpdateReceivedMessage() | |||||
//{ | |||||
// if (ShouldSkipDbTests()) | |||||
// { | |||||
// return; | |||||
// } | |||||
// var manager = CreateManager(); | |||||
// var message = CreateTestReceivedMessage(); | |||||
// // var operateResult = await manager.StoreReceivedMessageAsync(message); | |||||
// // Assert.NotNull(operateResult); | |||||
// // Assert.True(operateResult.Succeeded); | |||||
// // message.StatusName = StatusName.Processing; | |||||
// // operateResult = await manager.UpdateReceivedMessageAsync(message); | |||||
// // Assert.NotNull(operateResult); | |||||
// // Assert.True(operateResult.Succeeded); | |||||
//} | |||||
[Fact] | [Fact] | ||||
public async Task CanGetNextSendMessage() | public async Task CanGetNextSendMessage() | ||||
@@ -105,9 +105,9 @@ namespace DotNetCore.CAP.Test | |||||
Assert.NotNull(operateResult); | Assert.NotNull(operateResult); | ||||
Assert.True(operateResult.Succeeded); | Assert.True(operateResult.Succeeded); | ||||
var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync(); | |||||
// var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync(); | |||||
Assert.Equal(message, storeMessage); | |||||
// Assert.Equal(message, storeMessage); | |||||
} | } | ||||
} | } | ||||
} | } |