diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs index 1858212..aa2ff25 100644 --- a/samples/Sample.Kafka/Startup.cs +++ b/samples/Sample.Kafka/Startup.cs @@ -1,4 +1,5 @@ -using Microsoft.AspNetCore.Builder; +using DotNetCore.CAP.EntityFrameworkCore; +using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -23,10 +24,10 @@ namespace Sample.Kafka // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { - services.AddDbContext(); + services.AddDbContext(); services.AddCap() - .AddEntityFrameworkStores() + .AddEntityFrameworkStores() .AddRabbitMQ(x => { x.HostName = "localhost"; diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs index b019803..6e2b118 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs @@ -9,6 +9,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore public const string DefaultSchema = "cap"; public const string DefaultMigrationsHistoryTableName = "__EFMigrationsHistory"; + + public EFOptions() + { + ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; + } + /// /// Gets or sets the database's connection string that will be used to store database entities. /// diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs index b255f1d..85f0d2e 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs @@ -15,7 +15,9 @@ namespace DotNetCore.CAP.EntityFrameworkCore /// /// Initializes a new instance of the . /// - public CapDbContext() { } + public CapDbContext() { + _efOptions = new EFOptions(); + } /// /// Initializes a new instance of the . @@ -65,5 +67,10 @@ namespace DotNetCore.CAP.EntityFrameworkCore b.Property(p => p.StatusName).IsRequired().HasMaxLength(50); }); } + + //protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + //{ + // optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); + //} } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs new file mode 100644 index 0000000..ede5cab --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisherExtensions.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading.Tasks; +using DotNetCore.CAP.Models; +using Dapper; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using DotNetCore.CAP.Infrastructure; + +namespace DotNetCore.CAP +{ + static class CapPublisherExtensions + { + public static async Task Publish(this ICapPublisher publisher, string topic, string content, DatabaseFacade database) + { + var connection = database.GetDbConnection(); + var transaction = database.CurrentTransaction; + transaction = transaction ?? await database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + + var message = new CapSentMessage + { + KeyName = topic, + Content = content, + StatusName = StatusName.Enqueued + }; + + var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; + await connection.ExecuteAsync(sql, transaction); + + JobQueuer.PulseEvent.Set(); + + } + + public static async Task Publish(this ICapPublisher publisher, string topic, string content, IDbConnection connection,IDbTransaction transaction) + { + var message = new CapSentMessage + { + KeyName = topic, + Content = content, + StatusName = StatusName.Enqueued + }; + + var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[LastRun],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@LastRun,@Retries,@StatusName)"; + return await connection.ExecuteAsync(sql, transaction); + + JobQueuer.PulseEvent.Set(); + } + + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj b/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj index 6a6422c..d532199 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj +++ b/src/DotNetCore.CAP.EntityFrameworkCore/DotNetCore.CAP.EntityFrameworkCore.csproj @@ -14,6 +14,7 @@ + diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs index d0ed080..cc302f7 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs @@ -96,7 +96,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore throw new NotImplementedException(); } - public Task GetNextReceviedMessageToBeEnqueuedAsync() + public Task GetNextReceviedMessageToBeEnqueuedAsync() { throw new NotImplementedException(); } @@ -113,6 +113,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore public void Dispose() { - } + } } } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs deleted file mode 100644 index 02342d6..0000000 --- a/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Data; - -namespace DotNetCore.CAP.EntityFrameworkCore -{ - static class HelperExtensions - { - public static void Execute(this IDbConnection connection, string sql, IDbTransaction transcation = null) - { - try - { - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = "SELELCT 1"; - if (transcation != null) - command.Transaction = transcation; - command.ExecuteNonQuery(); - } - } - finally - { - connection.Close(); - } - } - - - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.Designer.cs b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.Designer.cs new file mode 100644 index 0000000..0f9da26 --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.Designer.cs @@ -0,0 +1,90 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using DotNetCore.CAP.EntityFrameworkCore; + +namespace DotNetCore.CAP.EntityFrameworkCore.Migrations +{ + [DbContext(typeof(CapDbContext))] + [Migration("20170711154104_InitializeDB")] + partial class InitializeDB + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { + modelBuilder + .HasDefaultSchema("cap") + .HasAnnotation("ProductVersion", "1.1.2") + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("MessageId"); + + b.Property("Type"); + + b.HasKey("Id"); + + b.ToTable("CapQueue"); + }); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Content"); + + b.Property("Group"); + + b.Property("KeyName"); + + b.Property("LastRun"); + + b.Property("Retries"); + + b.Property("StatusName") + .IsRequired() + .HasMaxLength(50); + + b.HasKey("Id"); + + b.HasIndex("StatusName"); + + b.ToTable("CapReceivedMessages"); + }); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Content"); + + b.Property("KeyName"); + + b.Property("LastRun"); + + b.Property("Retries"); + + b.Property("StatusName") + .IsRequired() + .HasMaxLength(50); + + b.HasKey("Id"); + + b.HasIndex("StatusName"); + + b.ToTable("CapSentMessages"); + }); + } + } +} diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.cs b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.cs new file mode 100644 index 0000000..dd96f24 --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/20170711154104_InitializeDB.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Metadata; + +namespace DotNetCore.CAP.EntityFrameworkCore.Migrations +{ + public partial class InitializeDB : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "cap"); + + migrationBuilder.CreateTable( + name: "CapQueue", + schema: "cap", + columns: table => new + { + Id = table.Column(nullable: false) + .Annotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn), + MessageId = table.Column(nullable: true), + Type = table.Column(nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_CapQueue", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "CapReceivedMessages", + schema: "cap", + columns: table => new + { + Id = table.Column(nullable: false), + Added = table.Column(nullable: false), + Content = table.Column(nullable: true), + Group = table.Column(nullable: true), + KeyName = table.Column(nullable: true), + LastRun = table.Column(nullable: true), + Retries = table.Column(nullable: false), + StatusName = table.Column(maxLength: 50, nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_CapReceivedMessages", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "CapSentMessages", + schema: "cap", + columns: table => new + { + Id = table.Column(nullable: false), + Added = table.Column(nullable: false), + Content = table.Column(nullable: true), + KeyName = table.Column(nullable: true), + LastRun = table.Column(nullable: true), + Retries = table.Column(nullable: false), + StatusName = table.Column(maxLength: 50, nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_CapSentMessages", x => x.Id); + }); + + migrationBuilder.CreateIndex( + name: "IX_CapReceivedMessages_StatusName", + schema: "cap", + table: "CapReceivedMessages", + column: "StatusName"); + + migrationBuilder.CreateIndex( + name: "IX_CapSentMessages_StatusName", + schema: "cap", + table: "CapSentMessages", + column: "StatusName"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "CapQueue", + schema: "cap"); + + migrationBuilder.DropTable( + name: "CapReceivedMessages", + schema: "cap"); + + migrationBuilder.DropTable( + name: "CapSentMessages", + schema: "cap"); + } + } +} diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/CapDbContextModelSnapshot.cs b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/CapDbContextModelSnapshot.cs new file mode 100644 index 0000000..765fa0f --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/Migrations/CapDbContextModelSnapshot.cs @@ -0,0 +1,89 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using DotNetCore.CAP.EntityFrameworkCore; + +namespace DotNetCore.CAP.EntityFrameworkCore.Migrations +{ + [DbContext(typeof(CapDbContext))] + partial class CapDbContextModelSnapshot : ModelSnapshot + { + protected override void BuildModel(ModelBuilder modelBuilder) + { + modelBuilder + .HasDefaultSchema("cap") + .HasAnnotation("ProductVersion", "1.1.2") + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("MessageId"); + + b.Property("Type"); + + b.HasKey("Id"); + + b.ToTable("CapQueue"); + }); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapReceivedMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Content"); + + b.Property("Group"); + + b.Property("KeyName"); + + b.Property("LastRun"); + + b.Property("Retries"); + + b.Property("StatusName") + .IsRequired() + .HasMaxLength(50); + + b.HasKey("Id"); + + b.HasIndex("StatusName"); + + b.ToTable("CapReceivedMessages"); + }); + + modelBuilder.Entity("DotNetCore.CAP.Models.CapSentMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Content"); + + b.Property("KeyName"); + + b.Property("LastRun"); + + b.Property("Retries"); + + b.Property("StatusName") + .IsRequired() + .HasMaxLength(50); + + b.HasKey("Id"); + + b.HasIndex("StatusName"); + + b.ToTable("CapSentMessages"); + }); + } + } +} diff --git a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs index 3edda96..678feda 100644 --- a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs +++ b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs @@ -78,33 +78,34 @@ namespace DotNetCore.CAP.Kafka private async Task Step(ProcessingContext context) { - using (var scopedContext = context.CreateScope()) - { - var provider = scopedContext.Provider; - var messageStore = provider.GetRequiredService(); - var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); - if (message == null) return true; - try - { - var sp = Stopwatch.StartNew(); - message.StatusName = StatusName.Processing; - await messageStore.UpdateSentMessageAsync(message); - - await ExecuteJobAsync(message.KeyName, message.Content); - - sp.Stop(); - - message.StatusName = StatusName.Succeeded; - await messageStore.UpdateSentMessageAsync(message); - _logger.JobExecuted(sp.Elapsed.TotalSeconds); - } - catch (Exception ex) - { - _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); - return false; - } - } - return true; + throw new NotImplementedException(); + // using (var scopedContext = context.CreateScope()) + // { + // var provider = scopedContext.Provider; + // var messageStore = provider.GetRequiredService(); + // var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); + // if (message == null) return true; + // try + // { + // var sp = Stopwatch.StartNew(); + // message.StatusName = StatusName.Processing; + // await messageStore.UpdateSentMessageAsync(message); + + // await ExecuteJobAsync(message.KeyName, message.Content); + + // sp.Stop(); + + // message.StatusName = StatusName.Succeeded; + // await messageStore.UpdateSentMessageAsync(message); + // _logger.JobExecuted(sp.Elapsed.TotalSeconds); + // } + // catch (Exception ex) + // { + // _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); + // return false; + // } + // } + // return true; } private Task ExecuteJobAsync(string topic, string content) diff --git a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs index f50a0a5..dd884fd 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs @@ -23,7 +23,6 @@ namespace DotNetCore.CAP.RabbitMQ private readonly ILogger _logger; private readonly TimeSpan _pollingDelay; - internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); public RabbitJobProcessor( IOptions capOptions, @@ -67,7 +66,7 @@ namespace DotNetCore.CAP.RabbitMQ var token = GetTokenToWaitOn(context); } - await WaitHandleEx.WaitAnyAsync(PulseEvent, + await WaitHandleEx.WaitAnyAsync(WaitHandleEx.SentPulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); } finally @@ -87,7 +86,7 @@ namespace DotNetCore.CAP.RabbitMQ using (var scopedContext = context.CreateScope()) { var provider = scopedContext.Provider; - var messageStore = provider.GetRequiredService(); + //var messageStore = provider.GetRequiredService(); var connection = provider.GetRequiredService(); if ((fetched = await connection.FetchNextSentMessageAsync()) != null) diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 6eb85ea..ff11500 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -106,7 +106,7 @@ namespace DotNetCore.CAP { var receviedMessage = StoreMessage(scope, message); client.Commit(); - ProcessMessage(scope, receviedMessage); + // ProcessMessage(scope, receviedMessage); } }; } @@ -123,36 +123,36 @@ namespace DotNetCore.CAP return receivedMessage; } - private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) - { - var provider = serviceScope.ServiceProvider; - var messageStore = provider.GetRequiredService(); - try - { - var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); + //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) + //{ + // var provider = serviceScope.ServiceProvider; + // var messageStore = provider.GetRequiredService(); + // try + // { + // var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); - if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) - { - messageStore.FetchNextReceivedMessageAsync + // if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) + // { + // messageStore.FetchNextReceivedMessageAsync - messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); + // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); - // If there are multiple consumers in the same group, we will take the first - var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; - var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); + // // If there are multiple consumers in the same group, we will take the first + // var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; + // var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); - _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); + // _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); - messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); - } - } - catch (Exception ex) - { - _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); - } - } + // messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); + // } + // } + // catch (Exception ex) + // { + // _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); + // } + //} } diff --git a/src/DotNetCore.CAP/IProcessingServer.cs b/src/DotNetCore.CAP/IProcessingServer.cs index ccb01f3..f5f3533 100644 --- a/src/DotNetCore.CAP/IProcessingServer.cs +++ b/src/DotNetCore.CAP/IProcessingServer.cs @@ -7,6 +7,8 @@ namespace DotNetCore.CAP /// public interface IProcessingServer : IDisposable { + void Pulse(); + void Start(); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs index 385959f..24cfbdf 100644 --- a/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs +++ b/src/DotNetCore.CAP/Infrastructure/WaitHandleEx.cs @@ -7,6 +7,9 @@ namespace DotNetCore.CAP.Infrastructure public static class WaitHandleEx { public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); + public static readonly AutoResetEvent QueuePulseEvent = new AutoResetEvent(true); + public static readonly AutoResetEvent SentPulseEvent = new AutoResetEvent(true); + public static readonly AutoResetEvent ReceviedPulseEvent = new AutoResetEvent(true); public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) { diff --git a/src/DotNetCore.CAP/Job/ComputedCronJob.cs b/src/DotNetCore.CAP/Job/ComputedCronJob.cs deleted file mode 100644 index dcfbe28..0000000 --- a/src/DotNetCore.CAP/Job/ComputedCronJob.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using NCrontab; - -namespace DotNetCore.CAP.Job -{ - public class ComputedCronJob - { - private readonly CronJobRegistry.Entry _entry; - - public ComputedCronJob() - { - } - - public ComputedCronJob(CronJob job) - { - Job = job; - - Schedule = CrontabSchedule.Parse(job.Cron); - if (job.TypeName != null) - { - JobType = Type.GetType(job.TypeName); - } - } - - public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry) - : this(job) - { - _entry = entry; - } - - public CronJob Job { get; set; } - - public CrontabSchedule Schedule { get; set; } - - public Type JobType { get; set; } - - public DateTime Next { get; set; } - - public int Retries { get; set; } - - public DateTime FirstTry { get; set; } - - public RetryBehavior RetryBehavior => _entry.RetryBehavior; - - public void Update(DateTime baseTime) - { - Job.LastRun = baseTime; - } - - public void UpdateNext(DateTime now) - { - var next = Schedule.GetNextOccurrence(now); - var previousNext = Schedule.GetNextOccurrence(Job.LastRun); - Next = next > previousNext ? now : next; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/Cron.cs b/src/DotNetCore.CAP/Job/Cron.cs deleted file mode 100644 index 9899f41..0000000 --- a/src/DotNetCore.CAP/Job/Cron.cs +++ /dev/null @@ -1,198 +0,0 @@ -using System; - -namespace DotNetCore.CAP.Job -{ - public class Cron - { - /// - /// Returns cron expression that fires every minute. - /// - public static string Minutely() - { - return "* * * * *"; - } - - /// - /// Returns cron expression that fires every hour at the first minute. - /// - public static string Hourly() - { - return Hourly(minute: 0); - } - - /// - /// Returns cron expression that fires every hour at the specified minute. - /// - /// The minute in which the schedule will be activated (0-59). - public static string Hourly(int minute) - { - return string.Format("{0} * * * *", minute); - } - - /// - /// Returns cron expression that fires every day at 00:00 UTC. - /// - public static string Daily() - { - return Daily(hour: 0); - } - - /// - /// Returns cron expression that fires every day at the first minute of - /// the specified hour in UTC. - /// - /// The hour in which the schedule will be activated (0-23). - public static string Daily(int hour) - { - return Daily(hour, minute: 0); - } - - /// - /// Returns cron expression that fires every day at the specified hour and minute - /// in UTC. - /// - /// The hour in which the schedule will be activated (0-23). - /// The minute in which the schedule will be activated (0-59). - public static string Daily(int hour, int minute) - { - return string.Format("{0} {1} * * *", minute, hour); - } - - /// - /// Returns cron expression that fires every week at Monday, 00:00 UTC. - /// - public static string Weekly() - { - return Weekly(DayOfWeek.Monday); - } - - /// - /// Returns cron expression that fires every week at 00:00 UTC of the specified - /// day of the week. - /// - /// The day of week in which the schedule will be activated. - public static string Weekly(DayOfWeek dayOfWeek) - { - return Weekly(dayOfWeek, hour: 0); - } - - /// - /// Returns cron expression that fires every week at the first minute - /// of the specified day of week and hour in UTC. - /// - /// The day of week in which the schedule will be activated. - /// The hour in which the schedule will be activated (0-23). - public static string Weekly(DayOfWeek dayOfWeek, int hour) - { - return Weekly(dayOfWeek, hour, minute: 0); - } - - /// - /// Returns cron expression that fires every week at the specified day - /// of week, hour and minute in UTC. - /// - /// The day of week in which the schedule will be activated. - /// The hour in which the schedule will be activated (0-23). - /// The minute in which the schedule will be activated (0-59). - public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute) - { - return string.Format("{0} {1} * * {2}", minute, hour, (int) dayOfWeek); - } - - /// - /// Returns cron expression that fires every month at 00:00 UTC of the first - /// day of month. - /// - public static string Monthly() - { - return Monthly(day: 1); - } - - /// - /// Returns cron expression that fires every month at 00:00 UTC of the specified - /// day of month. - /// - /// The day of month in which the schedule will be activated (1-31). - public static string Monthly(int day) - { - return Monthly(day, hour: 0); - } - - /// - /// Returns cron expression that fires every month at the first minute of the - /// specified day of month and hour in UTC. - /// - /// The day of month in which the schedule will be activated (1-31). - /// The hour in which the schedule will be activated (0-23). - public static string Monthly(int day, int hour) - { - return Monthly(day, hour, minute: 0); - } - - /// - /// Returns cron expression that fires every month at the specified day of month, - /// hour and minute in UTC. - /// - /// The day of month in which the schedule will be activated (1-31). - /// The hour in which the schedule will be activated (0-23). - /// The minute in which the schedule will be activated (0-59). - public static string Monthly(int day, int hour, int minute) - { - return string.Format("{0} {1} {2} * *", minute, hour, day); - } - - /// - /// Returns cron expression that fires every year on Jan, 1st at 00:00 UTC. - /// - public static string Yearly() - { - return Yearly(month: 1); - } - - /// - /// Returns cron expression that fires every year in the first day at 00:00 UTC - /// of the specified month. - /// - /// The month in which the schedule will be activated (1-12). - public static string Yearly(int month) - { - return Yearly(month, day: 1); - } - - /// - /// Returns cron expression that fires every year at 00:00 UTC of the specified - /// month and day of month. - /// - /// The month in which the schedule will be activated (1-12). - /// The day of month in which the schedule will be activated (1-31). - public static string Yearly(int month, int day) - { - return Yearly(month, day, hour: 0); - } - - /// - /// Returns cron expression that fires every year at the first minute of the - /// specified month, day and hour in UTC. - /// - /// The month in which the schedule will be activated (1-12). - /// The day of month in which the schedule will be activated (1-31). - /// The hour in which the schedule will be activated (0-23). - public static string Yearly(int month, int day, int hour) - { - return Yearly(month, day, hour, minute: 0); - } - - /// - /// Returns cron expression that fires every year at the specified month, day, - /// hour and minute in UTC. - /// - /// The month in which the schedule will be activated (1-12). - /// The day of month in which the schedule will be activated (1-31). - /// The hour in which the schedule will be activated (0-23). - /// The minute in which the schedule will be activated (0-59). - public static string Yearly(int month, int day, int hour, int minute) - { - return $"{minute} {hour} {day} {month} *"; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/CronJob.cs b/src/DotNetCore.CAP/Job/CronJob.cs deleted file mode 100644 index f05fd50..0000000 --- a/src/DotNetCore.CAP/Job/CronJob.cs +++ /dev/null @@ -1,37 +0,0 @@ -using System; - -namespace DotNetCore.CAP.Job -{ - /// - /// Represents a cron job to be executed at specified intervals of time. - /// - public class CronJob - { - public CronJob() - { - Id = Guid.NewGuid().ToString(); - } - - public CronJob(string cron) - : this() - { - Cron = cron; - } - - public CronJob(string cron, DateTime lastRun) - : this(cron) - { - LastRun = lastRun; - } - - public string Id { get; set; } - - public string Name { get; set; } - - public string TypeName { get; set; } - - public string Cron { get; set; } - - public DateTime LastRun { get; set; } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/CronJobRegistry.Default.cs b/src/DotNetCore.CAP/Job/CronJobRegistry.Default.cs deleted file mode 100644 index ee6b294..0000000 --- a/src/DotNetCore.CAP/Job/CronJobRegistry.Default.cs +++ /dev/null @@ -1,15 +0,0 @@ -using DotNetCore.CAP.Infrastructure; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.Job -{ - public class DefaultCronJobRegistry : CronJobRegistry - { - public DefaultCronJobRegistry(IOptions options) - { - var options1 = options.Value; - - RegisterJob(nameof(DefaultCronJobRegistry), options1.CronExp, RetryBehavior.DefaultRetry); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/CronJobRegistry.cs b/src/DotNetCore.CAP/Job/CronJobRegistry.cs deleted file mode 100644 index df0d016..0000000 --- a/src/DotNetCore.CAP/Job/CronJobRegistry.cs +++ /dev/null @@ -1,68 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using NCrontab; - -namespace DotNetCore.CAP.Job -{ - public abstract class CronJobRegistry - { - private readonly List _entries; - - protected CronJobRegistry() - { - _entries = new List(); - } - - protected void RegisterJob(string name, string cron, RetryBehavior retryBehavior = null) - where T : IJob - { - RegisterJob(name, typeof(T), cron, retryBehavior); - } - - /// - /// Registers a cron job. - /// - /// The name of the job. - /// The job's type. - /// The cron expression to use. - /// The to use. - protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) - { - if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron)); - if (jobType == null) throw new ArgumentNullException(nameof(jobType)); - if (cron == null) throw new ArgumentNullException(nameof(cron)); - retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry; - - CrontabSchedule.TryParse(cron); - - if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) - { - throw new ArgumentException( - "Cron jobs should extend IJob.", nameof(jobType)); - } - - _entries.Add(new Entry(name, jobType, cron)); - } - - public Entry[] Build() => _entries.ToArray(); - - public class Entry - { - public Entry(string name, Type jobType, string cron) - { - Name = name; - JobType = jobType; - Cron = cron; - } - - public string Name { get; set; } - - public Type JobType { get; set; } - - public string Cron { get; set; } - - public RetryBehavior RetryBehavior { get; set; } - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/IJob.CapJob.cs b/src/DotNetCore.CAP/Job/IJob.CapJob.cs index 138281d..e93ada5 100644 --- a/src/DotNetCore.CAP/Job/IJob.CapJob.cs +++ b/src/DotNetCore.CAP/Job/IJob.CapJob.cs @@ -32,32 +32,32 @@ namespace DotNetCore.CAP.Job public async Task ExecuteAsync() { - var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); - using (var scope = _serviceProvider.CreateScope()) - { - var provider = scope.ServiceProvider; + //var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); + //using (var scope = _serviceProvider.CreateScope()) + //{ + // var provider = scope.ServiceProvider; - var messageStore = provider.GetService(); - var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); - if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group)) - { - try - { - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); - // If there are multiple consumers in the same group, we will take the first - var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0]; - var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext()); - var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - await invoker.InvokeAsync(); - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); + // var messageStore = provider.GetService(); + // var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); + // if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group)) + // { + // try + // { + // await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); + // // If there are multiple consumers in the same group, we will take the first + // var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0]; + // var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext()); + // var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + // await invoker.InvokeAsync(); + // await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); - } - catch (Exception ex) - { - _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); - } - } - } + // } + // catch (Exception ex) + // { + // _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); + // } + // } + //} } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.CronJob.cs b/src/DotNetCore.CAP/Job/IJobProcessor.CronJob.cs deleted file mode 100644 index 2c21bf7..0000000 --- a/src/DotNetCore.CAP/Job/IJobProcessor.CronJob.cs +++ /dev/null @@ -1,170 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace DotNetCore.CAP.Job -{ - public class CronJobProcessor : IJobProcessor - { - private readonly ILogger _logger; - private IServiceProvider _provider; - private readonly DefaultCronJobRegistry _jobRegistry; - - public CronJobProcessor( - DefaultCronJobRegistry jobRegistry, - ILogger logger, - IServiceProvider provider) - { - _jobRegistry = jobRegistry; - _logger = logger; - _provider = provider; - } - - public override string ToString() => nameof(CronJobProcessor); - - public Task ProcessAsync(ProcessingContext context) - { - if (context == null) throw new ArgumentNullException(nameof(context)); - return ProcessCoreAsync(context); - } - - private async Task ProcessCoreAsync(ProcessingContext context) - { - //var storage = context.Storage; - //var jobs = await GetJobsAsync(storage); - - var jobs = GetJobs(); - if (!jobs.Any()) - { - _logger.CronJobsNotFound(); - - // This will cancel this processor. - throw new OperationCanceledException(); - } - _logger.CronJobsScheduling(jobs); - - context.ThrowIfStopping(); - - var computedJobs = Compute(jobs, context.CronJobRegistry.Build()); - if (context.IsStopping) - { - return; - } - - await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context))); - } - - private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) - { - //var storage = context.Storage; - var retryBehavior = computedJob.RetryBehavior; - - while (!context.IsStopping) - { - var now = DateTime.UtcNow; - - var due = ComputeDue(computedJob, now); - var timeSpan = due - now; - - if (timeSpan.TotalSeconds > 0) - { - await context.WaitAsync(timeSpan); - } - - context.ThrowIfStopping(); - - using (var scopedContext = context.CreateScope()) - { - var provider = scopedContext.Provider; - - var job = provider.GetService(); - var success = true; - - try - { - var sw = Stopwatch.StartNew(); - await job.ExecuteAsync(); - sw.Stop(); - computedJob.Retries = 0; - _logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds); - } - catch (Exception ex) - { - success = false; - if (computedJob.Retries == 0) - { - computedJob.FirstTry = DateTime.UtcNow; - } - computedJob.Retries++; - _logger.CronJobFailed(computedJob.Job.Name, ex); - } - - if (success) - { - computedJob.Update(DateTime.UtcNow); - } - } - } - } - - private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) - { - computedJob.UpdateNext(now); - - var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry; - var retries = computedJob.Retries; - - if (retries == 0) - { - return computedJob.Next; - } - - var realNext = computedJob.Schedule.GetNextOccurrence(now); - - if (!retryBehavior.Retry) - { - // No retry. If job failed before, we don't care, just schedule it next as usual. - return realNext; - } - - if (retries >= retryBehavior.RetryCount) - { - // Max retries. Just schedule it for the next occurance. - return realNext; - } - - // Delay a bit. - return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries)); - } - - private CronJob[] GetJobs() - { - var cronJobs = new List(); - var entries = _jobRegistry.Build() ?? new CronJobRegistry.Entry[0]; - foreach (var entry in entries) - { - cronJobs.Add(new CronJob - { - Name = entry.Name, - TypeName = entry.JobType.AssemblyQualifiedName, - Cron = entry.Cron, - LastRun = DateTime.MinValue - }); - } - return cronJobs.ToArray(); - } - - private ComputedCronJob[] Compute(IEnumerable jobs, CronJobRegistry.Entry[] entries) - => jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray(); - - private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) - { - var entry = entries.First(e => e.Name == job.Name); - return new ComputedCronJob(job, entry); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs index d297d31..bc89d18 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs +++ b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs @@ -1,31 +1,31 @@ using System; using System.Threading; using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Job.States; using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Job { public class JobQueuer : IJobProcessor { private ILogger _logger; - private JobsOptions _options; + private CapOptions _options; private IStateChanger _stateChanger; private IServiceProvider _provider; - - internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); private TimeSpan _pollingDelay; public JobQueuer( ILogger logger, - JobsOptions options, + IOptions options, IStateChanger stateChanger, IServiceProvider provider) { _logger = logger; - _options = options; + _options = options.Value; _stateChanger = stateChanger; _provider = provider; @@ -37,7 +37,7 @@ namespace DotNetCore.CAP.Job using (var scope = _provider.CreateScope()) { CapSentMessage sentMessage; - CapReceivedMessage receivedMessage; + // CapReceivedMessage receivedMessage; var provider = scope.ServiceProvider; var connection = provider.GetRequiredService(); @@ -57,9 +57,10 @@ namespace DotNetCore.CAP.Job } context.ThrowIfStopping(); - - DelayedJobProcessor.PulseEvent.Set(); - await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); + + WaitHandleEx.SentPulseEvent.Set(); + await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent, + context.CancellationToken.WaitHandle, _pollingDelay); } } } diff --git a/src/DotNetCore.CAP/Job/IMessageJobProcessor.cs b/src/DotNetCore.CAP/Job/IMessageJobProcessor.cs new file mode 100644 index 0000000..a280c0a --- /dev/null +++ b/src/DotNetCore.CAP/Job/IMessageJobProcessor.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace DotNetCore.CAP.Job +{ + public interface IMessageJobProcessor : IJobProcessor + { + bool Waiting { get; } + } +} diff --git a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs b/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs index 0bb912f..0044d31 100644 --- a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs +++ b/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs @@ -17,7 +17,6 @@ namespace DotNetCore.CAP.Job private readonly IServiceProvider _provider; private readonly CancellationTokenSource _cts; private readonly CapOptions _options; - private readonly DefaultCronJobRegistry _defaultJobRegistry; private IJobProcessor[] _processors; private ProcessingContext _context; @@ -28,13 +27,11 @@ namespace DotNetCore.CAP.Job ILogger logger, ILoggerFactory loggerFactory, IServiceProvider provider, - DefaultCronJobRegistry defaultJobRegistry, IOptions options) { _logger = logger; _loggerFactory = loggerFactory; _provider = provider; - _defaultJobRegistry = defaultJobRegistry; _options = options.Value; _cts = new CancellationTokenSource(); } @@ -46,10 +43,7 @@ namespace DotNetCore.CAP.Job _processors = GetProcessors(processorCount); _logger.ServerStarting(processorCount, processorCount); - _context = new ProcessingContext( - _provider, - _defaultJobRegistry, - _cts.Token); + _context = new ProcessingContext(_provider, _cts.Token); var processorTasks = _processors .Select(InfiniteRetry) @@ -57,6 +51,31 @@ namespace DotNetCore.CAP.Job _compositeTask = Task.WhenAll(processorTasks); } + public void Pulse() + { + if (!AllProcessorsWaiting()) + { + // Some processor is still executing jobs so no need to pulse. + return; + } + + _logger.LogTrace("Pulsing the JobQueuer."); + + WaitHandleEx.QueuePulseEvent.Set(); + } + + private bool AllProcessorsWaiting() + { + foreach (var processor in _processors) + { + if (!processor.Waiting) + { + return false; + } + } + return true; + } + public void Dispose() { if (_disposed) @@ -69,7 +88,7 @@ namespace DotNetCore.CAP.Job _cts.Cancel(); try { - _compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds); + _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); } catch (AggregateException ex) { diff --git a/src/DotNetCore.CAP/Job/ProcessingContext.cs b/src/DotNetCore.CAP/Job/ProcessingContext.cs index 7498298..764ae80 100644 --- a/src/DotNetCore.CAP/Job/ProcessingContext.cs +++ b/src/DotNetCore.CAP/Job/ProcessingContext.cs @@ -16,24 +16,19 @@ namespace DotNetCore.CAP.Job private ProcessingContext(ProcessingContext other) { Provider = other.Provider; - CronJobRegistry = other.CronJobRegistry; CancellationToken = other.CancellationToken; } public ProcessingContext( IServiceProvider provider, - CronJobRegistry cronJobRegistry, CancellationToken cancellationToken) { Provider = provider; - CronJobRegistry = cronJobRegistry; CancellationToken = cancellationToken; } public IServiceProvider Provider { get; private set; } - public CronJobRegistry CronJobRegistry { get; private set; } - public CancellationToken CancellationToken { get; } public bool IsStopping => CancellationToken.IsCancellationRequested; diff --git a/test/DotNetCore.CAP.Test/NoopMessageStore.cs b/test/DotNetCore.CAP.Test/NoopMessageStore.cs index dfa7be4..6e70ffd 100644 --- a/test/DotNetCore.CAP.Test/NoopMessageStore.cs +++ b/test/DotNetCore.CAP.Test/NoopMessageStore.cs @@ -1,7 +1,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; namespace DotNetCore.CAP.Test @@ -14,45 +13,9 @@ namespace DotNetCore.CAP.Test throw new NotImplementedException(); } - public Task ChangeSentMessageStateAsync(CapSentMessage message, string statusName, - bool autoSaveChanges = true) - { - throw new NotImplementedException(); - } - - public Task GetNextReceivedMessageToBeExcuted() - { - throw new NotImplementedException(); - } - - public Task GetNextSentMessageToBeEnqueuedAsync() - { - throw new NotImplementedException(); - } - - public Task RemoveSentMessageAsync(CapSentMessage message) - { - throw new NotImplementedException(); - } - - public Task StoreReceivedMessageAsync(CapReceivedMessage message) - { - throw new NotImplementedException(); - } - public Task StoreSentMessageAsync(CapSentMessage message) { throw new NotImplementedException(); } - - public Task UpdateReceivedMessageAsync(CapReceivedMessage message) - { - throw new NotImplementedException(); - } - - public Task UpdateSentMessageAsync(CapSentMessage message) - { - throw new NotImplementedException(); - } } } \ No newline at end of file diff --git a/test/Shared/MessageManagerTestBase.cs b/test/Shared/MessageManagerTestBase.cs index f375bf2..e0ca62b 100644 --- a/test/Shared/MessageManagerTestBase.cs +++ b/test/Shared/MessageManagerTestBase.cs @@ -66,30 +66,30 @@ namespace DotNetCore.CAP.Test Assert.NotNull(operateResult); Assert.True(operateResult.Succeeded); - operateResult = await manager.RemoveSentMessageAsync(message); - Assert.NotNull(operateResult); - Assert.True(operateResult.Succeeded); + // operateResult = await manager.RemoveSentMessageAsync(message); + // Assert.NotNull(operateResult); + // Assert.True(operateResult.Succeeded); } - [Fact] - public async Task CanUpdateReceivedMessage() - { - if (ShouldSkipDbTests()) - { - return; - } - - var manager = CreateManager(); - var message = CreateTestReceivedMessage(); - var operateResult = await manager.StoreReceivedMessageAsync(message); - Assert.NotNull(operateResult); - Assert.True(operateResult.Succeeded); - - message.StatusName = StatusName.Processing; - operateResult = await manager.UpdateReceivedMessageAsync(message); - Assert.NotNull(operateResult); - Assert.True(operateResult.Succeeded); - } + //[Fact] + //public async Task CanUpdateReceivedMessage() + //{ + // if (ShouldSkipDbTests()) + // { + // return; + // } + + // var manager = CreateManager(); + // var message = CreateTestReceivedMessage(); + // // var operateResult = await manager.StoreReceivedMessageAsync(message); + // // Assert.NotNull(operateResult); + // // Assert.True(operateResult.Succeeded); + + // // message.StatusName = StatusName.Processing; + // // operateResult = await manager.UpdateReceivedMessageAsync(message); + // // Assert.NotNull(operateResult); + // // Assert.True(operateResult.Succeeded); + //} [Fact] public async Task CanGetNextSendMessage() @@ -105,9 +105,9 @@ namespace DotNetCore.CAP.Test Assert.NotNull(operateResult); Assert.True(operateResult.Succeeded); - var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync(); + // var storeMessage = await manager.GetNextSentMessageToBeEnqueuedAsync(); - Assert.Equal(message, storeMessage); + // Assert.Equal(message, storeMessage); } } } \ No newline at end of file