diff --git a/samples/Sample.Kafka/AppDbContext.cs b/samples/Sample.Kafka/AppDbContext.cs new file mode 100644 index 0000000..e929396 --- /dev/null +++ b/samples/Sample.Kafka/AppDbContext.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Cap.Consistency.Infrastructure; +using JetBrains.Annotations; +using Microsoft.EntityFrameworkCore; + +namespace Sample.Kafka +{ + public class AppDbContext : DbContext + { + public DbSet Messages { get; set; } + } +} diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs index f15b00f..8fc02af 100644 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka/Controllers/ValuesController.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using Cap.Consistency.Consumer; using Cap.Consistency.Kafka; +using Cap.Consistency.Producer; using Microsoft.AspNetCore.Mvc; namespace Sample.Kafka.Controllers @@ -11,10 +12,27 @@ namespace Sample.Kafka.Controllers [Route("api/[controller]")] public class ValuesController : Controller, IConsumerService { + private readonly IProducerClient _producer; - [KafkaTopic("zzwl.topic.finace.callBack", IsOneWay = true)] + public ValuesController(IProducerClient producer) { + _producer = producer; + } + + [Route("/")] + public IActionResult Index() { + return Ok(); + } + + [KafkaTopic("zzwl.topic.finace.callBack", IsOneWay = true, GroupOrExchange = "test")] + [NonAction] public void KafkaTest() { Console.WriteLine("kafka test invoked"); } + + [Route("~/send")] + public async Task SendTopic() { + await _producer.SendAsync("zzwl.topic.finace.callBack", "{\"msgBody\":\"{\\\"dealno\\\":null,\\\"businesstype\\\":\\\"1\\\",\\\"serialno\\\":\\\"435ldfhj345\\\",\\\"bankno\\\":\\\"650001\\\",\\\"amt\\\":20.0,\\\"virtualstatus\\\":1,\\\"paystatus\\\":1}\",\"callbackTopicName\":\"zzwl.topic.finace.callBack\",\"createId\":null,\"retryLimit\":0}"); + return Ok(); + } } } diff --git a/samples/Sample.Kafka/Entity/AppKafkaLog.cs b/samples/Sample.Kafka/Entity/AppKafkaLog.cs deleted file mode 100644 index bf5b899..0000000 --- a/samples/Sample.Kafka/Entity/AppKafkaLog.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; - -namespace Sample.Kafka.Entity -{ - public class AppKafkaLog - { - } -} diff --git a/samples/Sample.Kafka/Sample.Kafka.csproj b/samples/Sample.Kafka/Sample.Kafka.csproj index d922f17..f663107 100644 --- a/samples/Sample.Kafka/Sample.Kafka.csproj +++ b/samples/Sample.Kafka/Sample.Kafka.csproj @@ -11,13 +11,17 @@ + + + + diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs index 9a142b5..d06ddc0 100644 --- a/samples/Sample.Kafka/Startup.cs +++ b/samples/Sample.Kafka/Startup.cs @@ -5,10 +5,10 @@ using System.Threading.Tasks; using Cap.Consistency.Infrastructure; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Sample.Kafka.Entity; namespace Sample.Kafka { @@ -27,12 +27,18 @@ namespace Sample.Kafka // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { + + services.AddDbContext(); + + services.AddConsistency() + .AddEntityFrameworkStores() + .AddKafka(); + // Add framework services. services.AddMvc(); - - services.AddConsistency(); } + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { loggerFactory.AddConsole(Configuration.GetSection("Logging"));