Browse Source

add kafka sqlserver sample.

master
yangxiaodong 7 years ago
parent
commit
499bc1020d
5 changed files with 222 additions and 0 deletions
  1. +14
    -0
      samples/Sample.Kafka.SqlServer/AppDbContext.cs
  2. +99
    -0
      samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs
  3. +37
    -0
      samples/Sample.Kafka.SqlServer/Program.cs
  4. +27
    -0
      samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
  5. +45
    -0
      samples/Sample.Kafka.SqlServer/Startup.cs

+ 14
- 0
samples/Sample.Kafka.SqlServer/AppDbContext.cs View File

@@ -0,0 +1,14 @@
using Microsoft.EntityFrameworkCore;

namespace Sample.Kafka.SqlServer
{
public class AppDbContext : DbContext
{

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Sample.Kafka.SqlServer;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
// optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}

+ 99
- 0
samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs View File

@@ -0,0 +1,99 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Kafka.SqlServer.Controllers
{
public class Person
{
public int Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }

public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
}
}


[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;

public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}

[Route("~/publish")]
public IActionResult PublishMessage()
{

_capBus.Publish("sample.rabbitmq.sqlserver.order.check", DateTime.Now);

//var person = new Person
//{
// Name = "杨晓东",
// Age = 11,
// Id = 23
//};
//_capBus.Publish("sample.rabbitmq.mysql33333", person);

return Ok();
}

[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");

trans.Commit();
}
return Ok();
}

[CapSubscribe("sample.rabbitmq.mysql33333",Group ="Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;

_dbContext.Dispose();

Console.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + person.ToString());
}

//[CapSubscribe("sample.rabbitmq.mysql22222")]
//public void KafkaTest22(DateTime time)
//{
// Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
// Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
//}

[CapSubscribe("sample.rabbitmq.mysql22222")]
public async Task<DateTime> KafkaTest33(DateTime time)
{
Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
return await Task.FromResult(time);
}

[NonAction]
[CapSubscribe("sample.kafka.sqlserver3")]
[CapSubscribe("sample.kafka.sqlserver4")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}

+ 37
- 0
samples/Sample.Kafka.SqlServer/Program.cs View File

@@ -0,0 +1,37 @@
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;

namespace Sample.Kafka.SqlServer
{
public class Program
{

//var config = new ConfigurationBuilder()
// .AddCommandLine(args)
// .AddEnvironmentVariables("ASPNETCORE_")
// .Build();

//var host = new WebHostBuilder()
// .UseConfiguration(config)
// .UseKestrel()
// .UseContentRoot(Directory.GetCurrentDirectory())
// .UseIISIntegration()
// .UseStartup<Startup>()
// .Build();

//host.Run();
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}

public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();

}
}

+ 27
- 0
samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj View File

@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 45
- 0
samples/Sample.Kafka.SqlServer/Startup.cs View File

@@ -0,0 +1,45 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.Kafka.SqlServer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();

services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("192.168.2.227:9091,192.168.2.227:9092,192.168.2.222:9092");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerProt = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5820;
d.NodeName = "CAP 2号节点";
});
});

services.AddMvc();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();

app.UseMvc();

app.UseCap();

app.UseCapDashboard();
}
}
}

Loading…
Cancel
Save