@@ -1,5 +1,5 @@ | |||||
using System; | using System; | ||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP; | using DotNetCore.CAP; | ||||
using Microsoft.AspNetCore.Mvc; | using Microsoft.AspNetCore.Mvc; | ||||
using MySql.Data.MySqlClient; | using MySql.Data.MySqlClient; | ||||
@@ -7,37 +7,51 @@ using MySql.Data.MySqlClient; | |||||
namespace Sample.Kafka.MySql.Controllers | namespace Sample.Kafka.MySql.Controllers | ||||
{ | { | ||||
[Route("api/[controller]")] | [Route("api/[controller]")] | ||||
public class ValuesController : Controller, ICapSubscribe | |||||
public class ValuesController : Controller | |||||
{ | { | ||||
private readonly ICapPublisher _capBus; | private readonly ICapPublisher _capBus; | ||||
public ValuesController(ICapPublisher producer) | |||||
public ValuesController(ICapPublisher capPublisher) | |||||
{ | { | ||||
_capBus = producer; | |||||
_capBus = capPublisher; | |||||
} | } | ||||
[Route("~/publish")] | |||||
public async Task<IActionResult> PublishMessage() | |||||
[Route("~/without/transaction")] | |||||
public IActionResult WithoutTransaction() | |||||
{ | { | ||||
using (var connection = new MySqlConnection("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;")) | |||||
{ | |||||
connection.Open(); | |||||
var transaction = connection.BeginTransaction(); | |||||
//your business code here | |||||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||||
await _capBus.PublishAsync("xxx.xxx.test2", 123456); | |||||
return Ok(); | |||||
} | |||||
transaction.Commit(); | |||||
[Route("~/adonet/transaction")] | |||||
public IActionResult AdonetWithTransaction() | |||||
{ | |||||
//NOTE: Add `IgnoreCommandTransaction=true;` to your connection string, see https://github.com/mysql-net/MySqlConnector/issues/474 | |||||
using (var connection = new MySqlConnection(Startup.ConnectionString)) | |||||
{ | |||||
using (var transaction = connection.BeginAndJoinToTransaction(_capBus, autoCommit: false)) | |||||
{ | |||||
//your business code | |||||
connection.Execute("insert into test(name) values('test')", transaction); | |||||
for (int i = 0; i < 5; i++) | |||||
{ | |||||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||||
} | |||||
transaction.Commit(); | |||||
} | |||||
} | } | ||||
return Ok("publish successful!"); | |||||
return Ok(); | |||||
} | } | ||||
[CapSubscribe("#.test2")] | |||||
public void Test2(int value) | |||||
[NonAction] | |||||
[CapSubscribe("sample.rabbitmq.mysql")] | |||||
public void Subscriber(DateTime time) | |||||
{ | { | ||||
Console.WriteLine("Subscriber output message: " + value); | |||||
Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}"); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,16 +1,17 @@ | |||||
using System; | |||||
using Microsoft.AspNetCore.Builder; | |||||
using Microsoft.AspNetCore.Builder; | |||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
namespace Sample.Kafka.MySql | namespace Sample.Kafka.MySql | ||||
{ | { | ||||
public class Startup | public class Startup | ||||
{ | { | ||||
public const string ConnectionString = "Server=localhost;Database=testcap;UserId=root;Password=123123;"; | |||||
public void ConfigureServices(IServiceCollection services) | public void ConfigureServices(IServiceCollection services) | ||||
{ | { | ||||
services.AddCap(x => | services.AddCap(x => | ||||
{ | { | ||||
x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;"); | |||||
x.UseMySql(ConnectionString); | |||||
x.UseKafka("localhost:9092"); | x.UseKafka("localhost:9092"); | ||||
x.UseDashboard(); | x.UseDashboard(); | ||||
}); | }); | ||||
@@ -11,60 +11,66 @@ namespace Sample.RabbitMQ.MongoDB.Controllers | |||||
public class ValuesController : ControllerBase | public class ValuesController : ControllerBase | ||||
{ | { | ||||
private readonly IMongoClient _client; | private readonly IMongoClient _client; | ||||
private readonly ICapPublisher _capPublisher; | |||||
private readonly ICapPublisher _capBus; | |||||
public ValuesController(IMongoClient client, ICapPublisher capPublisher) | |||||
public ValuesController(IMongoClient client, ICapPublisher capBus) | |||||
{ | { | ||||
_client = client; | _client = client; | ||||
_capPublisher = capPublisher; | |||||
_capBus = capBus; | |||||
} | } | ||||
[Route("~/publish")] | |||||
public IActionResult PublishWithTrans() | |||||
[Route("~/without/transaction")] | |||||
public IActionResult WithoutTransaction() | |||||
{ | { | ||||
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
return Ok(); | |||||
} | |||||
[Route("~/transaction/not/autocommit")] | |||||
public IActionResult PublishNotAutoCommit() | |||||
{ | |||||
//NOTE: before your test, your need to create database and collection at first | |||||
//注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建 | |||||
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | //var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | ||||
//mycollection.InsertOne(new BsonDocument { { "test", "test" } }); | //mycollection.InsertOne(new BsonDocument { { "test", "test" } }); | ||||
using (var session = _client.StartSession()) | |||||
using (var trans = _capPublisher.Transaction.Begin(session)) | |||||
using (var session = _client.StartAndJoinToTransaction(_capBus, autoCommit: false)) | |||||
{ | { | ||||
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | ||||
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); | collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); | ||||
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
trans.Commit(); | |||||
session.CommitTransaction(); | |||||
} | } | ||||
return Ok(); | return Ok(); | ||||
} | } | ||||
[Route("~/publish/autocommit")] | |||||
public IActionResult PublishNotAutoCommit() | |||||
[Route("~/transaction/autocommit")] | |||||
public IActionResult PublishWithoutTrans() | |||||
{ | { | ||||
using (var session = _client.StartSession()) | |||||
using (_capPublisher.Transaction.Begin(session, true)) | |||||
//NOTE: before your test, your need to create database and collection at first | |||||
//注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建 | |||||
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | |||||
//mycollection.InsertOne(new BsonDocument { { "test", "test" } }); | |||||
using (var session = _client.StartAndJoinToTransaction(_capBus, autoCommit: true)) | |||||
{ | { | ||||
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | ||||
collection.InsertOne(session, new BsonDocument { { "hello2", "world2" } }); | |||||
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); | |||||
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
} | } | ||||
return Ok(); | return Ok(); | ||||
} | } | ||||
[Route("~/publish/without/trans")] | |||||
public IActionResult PublishWithoutTrans() | |||||
{ | |||||
_capPublisher.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||||
return Ok(); | |||||
} | |||||
[NonAction] | [NonAction] | ||||
[CapSubscribe("sample.rabbitmq.mongodb")] | [CapSubscribe("sample.rabbitmq.mongodb")] | ||||
public void ReceiveMessage(DateTime time) | public void ReceiveMessage(DateTime time) | ||||
{ | { | ||||
Console.WriteLine("[sample.rabbitmq.mongodb] message received: " + DateTime.Now + ",sent time: " + time); | |||||
Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}"); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,5 +1,4 @@ | |||||
using System; | using System; | ||||
using System.Threading.Tasks; | |||||
using Dapper; | using Dapper; | ||||
using DotNetCore.CAP; | using DotNetCore.CAP; | ||||
using Microsoft.AspNetCore.Mvc; | using Microsoft.AspNetCore.Mvc; | ||||
@@ -49,7 +48,7 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||||
} | } | ||||
[Route("~/ef/transaction")] | [Route("~/ef/transaction")] | ||||
public async Task<IActionResult> EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) | |||||
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) | |||||
{ | { | ||||
using (var trans = dbContext.Database.BeginAndJoinToTransaction(_capBus, autoCommit: false)) | using (var trans = dbContext.Database.BeginAndJoinToTransaction(_capBus, autoCommit: false)) | ||||
{ | { | ||||
@@ -57,7 +56,7 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||||
for (int i = 0; i < 5; i++) | for (int i = 0; i < 5; i++) | ||||
{ | { | ||||
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); | |||||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||||
} | } | ||||
dbContext.SaveChanges(); | dbContext.SaveChanges(); | ||||