Ver código fonte

update samples.

undefined
Savorboard 7 anos atrás
pai
commit
9a2327a6a9
6 arquivos alterados com 114 adições e 45 exclusões
  1. +1
    -1
      samples/Sample.Kafka.SqlServer/AppDbContext.cs
  2. +50
    -0
      samples/Sample.Kafka.SqlServer/CmsContentSerializer.cs
  3. +50
    -12
      samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs
  4. +1
    -15
      samples/Sample.Kafka.SqlServer/Program.cs
  5. +1
    -1
      samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
  6. +11
    -16
      samples/Sample.Kafka.SqlServer/Startup.cs

+ 1
- 1
samples/Sample.Kafka.SqlServer/AppDbContext.cs Ver arquivo

@@ -8,7 +8,7 @@ namespace Sample.Kafka.SqlServer
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Sample.Kafka.SqlServer;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=TestCap;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
}
}

+ 50
- 0
samples/Sample.Kafka.SqlServer/CmsContentSerializer.cs Ver arquivo

@@ -0,0 +1,50 @@
using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Newtonsoft.Json;

namespace Sample.RabbitMQ.SqlServer
{
public class MessageContent : CapMessage
{
[JsonProperty("id")]
public override string Id { get; set; }

[JsonProperty("createdTime")]
public override DateTime Timestamp { get; set; }

[JsonProperty("msgBody")]
public override string Content { get; set; }

[JsonProperty("callbackTopicName")]
public override string CallbackName { get; set; }
}

public class MyMessagePacker : IMessagePacker
{
private readonly IContentSerializer _serializer;

public MyMessagePacker(IContentSerializer serializer)
{
_serializer = serializer;
}

public string Pack(CapMessage obj)
{
var content = new MessageContent
{
Id = obj.Id,
Content = obj.Content,
CallbackName = obj.CallbackName,
Timestamp = obj.Timestamp
};
return _serializer.Serialize(content);
}

public CapMessage UnPack(string packingMessage)
{
return _serializer.DeSerialize<MessageContent>(packingMessage);
}
}
}


+ 50
- 12
samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs Ver arquivo

@@ -4,18 +4,36 @@ using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

namespace Sample.Kafka.SqlServer.Controllers
{
public class Person
{
public int Id { get; set; }
[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("uname")]
public string Name { get; set; }
public int Age { get; set; }

public HAHA Haha { get; set; }

public override string ToString()
{
return "Name:" + Name + ";Id:" + Id + "Haha:" + Haha?.ToString();
}
}

public class HAHA
{
[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("uname")]
public string Name { get; set; }
public override string ToString()
{
return "Name:" + Name + ";Age:" + Age;
return "Name:" + Name + ";Id:" + Id;
}
}

@@ -32,23 +50,43 @@ namespace Sample.Kafka.SqlServer.Controllers
_dbContext = dbContext;
}


[Route("~/publish")]
public IActionResult PublishMessage()
{
var p = new Person
{
Id = Guid.NewGuid().ToString(),
Name = "杨晓东",
Haha = new HAHA
{
Id = Guid.NewGuid().ToString(),
Name = "1-1杨晓东",
}
};

_capBus.Publish("sample.rabbitmq.sqlserver.order.check", DateTime.Now);
_capBus.Publish("wl.yxd.test", p, "wl.yxd.test.callback");

//var person = new Person
//{
// Name = "杨晓东",
// Age = 11,
// Id = 23
//};
//_capBus.Publish("sample.rabbitmq.mysql33333", person);

//_capBus.Publish("wl.cj.test", p);
return Ok();
}

[CapSubscribe("wl.yxd.test.callback")]
public void KafkaTestCallback(Person p)
{
Console.WriteLine("回调内容:" + p);
}


[CapSubscribe("wl.cj.test")]
public string KafkaTestReceived(Person person)
{
Console.WriteLine(person);
Debug.WriteLine(person);
return "this is callback message";
}

[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
@@ -61,7 +99,7 @@ namespace Sample.Kafka.SqlServer.Controllers
return Ok();
}

[CapSubscribe("sample.rabbitmq.mysql33333",Group ="Test.Group")]
[CapSubscribe("sample.rabbitmq.mysql33333", Group = "Test.Group")]
public void KafkaTest22(Person person)
{
var aa = _dbContext.Database;


+ 1
- 15
samples/Sample.Kafka.SqlServer/Program.cs Ver arquivo

@@ -8,21 +8,7 @@ namespace Sample.Kafka.SqlServer
{
public class Program
{

//var config = new ConfigurationBuilder()
// .AddCommandLine(args)
// .AddEnvironmentVariables("ASPNETCORE_")
// .Build();

//var host = new WebHostBuilder()
// .UseConfiguration(config)
// .UseKestrel()
// .UseContentRoot(Directory.GetCurrentDirectory())
// .UseIISIntegration()
// .UseStartup<Startup>()
// .Build();

//host.Run();
public static void Main(string[] args)
{
BuildWebHost(args).Run();


+ 1
- 1
samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj Ver arquivo

@@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Sample.RabbitMQ.SqlServer</AssemblyName>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>


+ 11
- 16
samples/Sample.Kafka.SqlServer/Startup.cs Ver arquivo

@@ -3,6 +3,7 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer;

namespace Sample.Kafka.SqlServer
{
@@ -15,28 +16,22 @@ namespace Sample.Kafka.SqlServer
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("192.168.1.26:9092");
x.UseKafka("192.168.2.215:9092");
x.UseDashboard();
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5820;
d.NodeName = "CAP 2号节点";
});
});
services.AddSession();
//x.UseDiscovery(d =>
//{
// d.DiscoveryServerHostName = "localhost";
// d.DiscoveryServerPort = 8500;
// d.CurrentNodeHostName = "localhost";
// d.CurrentNodePort = 5820;
// d.NodeName = "CAP 2号节点";
//});
}).AddMessagePacker<MyMessagePacker>();
services.AddMvc();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();

app.UseSession();

app.UseMvc();

app.UseCap();


Carregando…
Cancelar
Salvar