@@ -1,39 +1,85 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.EntityFrameworkCore
{
public class CapPublisher : ICapPublisher
{
private readonly SqlServerOptions _options;
private readonly IServiceProvider _provider;
private readonly DbContext _dbContext;
public CapPublisher(SqlServerOptions options, IServiceProvider provider)
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider, SqlServerOptions options)
{
ServiceProvider = provider;
_options = options;
_provider = provider;
_dbContext = (DbContext)_provider.GetService(_options.DbContextType);
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}
public Task PublishAsync(string topic, string content)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
return Publish(topic, content);
}
public Task PublishAsync<T>(string topic, T contentObj)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
var content = Helper.ToJson(contentObj);
return Publish(topic, content);
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
return PublishWithTrans(topic, content, dbConnection, dbTransaction);
}
public Task PublishAsync(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction));
return PublishWithTrans(topic, content, dbConnection, dbTransaction);
}
public async Task PublishAsync(string topic, string content)
private async Task Publish (string topic, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTrans(topic, content, connection, dbTransaction);
}
private async Task PublishWithTrans(string topic, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapSentMessage
{
KeyName = topic,
@@ -41,15 +87,10 @@ namespace DotNetCore.CAP.EntityFrameworkCore
StatusName = StatusName.Scheduled
};
var sql = "INSERT INTO [cap].[CapSentMessages ] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await c onnection.ExecuteAsync(sql, message, transaction: dbTransaction);
var sql = $"INSERT INTO {_options.Schema}.[{nameof(CapDbContext.CapSentMessages)} ] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)";
await dbC onnection.ExecuteAsync(sql, message, transaction: dbTransaction);
PublishQueuer.PulseEvent.Set();
}
public Task PublishAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
}
}