@@ -0,0 +1,13 @@ | |||||
using System; | |||||
// ReSharper disable once CheckNamespace | |||||
namespace DotNetCore.CAP | |||||
{ | |||||
public class EFOptions | |||||
{ | |||||
/// <summary> | |||||
/// EF dbcontext type. | |||||
/// </summary> | |||||
public Type DbContextType { get; internal set; } | |||||
} | |||||
} |
@@ -0,0 +1,45 @@ | |||||
using System; | |||||
using DotNetCore.CAP.Processor; | |||||
using DotNetCore.CAP.MySql; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
// ReSharper disable once CheckNamespace | |||||
namespace DotNetCore.CAP | |||||
{ | |||||
public class MySqlCapOptionsExtension : ICapOptionsExtension | |||||
{ | |||||
private readonly Action<MySqlOptions> _configure; | |||||
public MySqlCapOptionsExtension(Action<MySqlOptions> configure) | |||||
{ | |||||
_configure = configure; | |||||
} | |||||
public void AddServices(IServiceCollection services) | |||||
{ | |||||
services.AddSingleton<IStorage, MySqlStorage>(); | |||||
services.AddScoped<IStorageConnection, MySqlStorageConnection>(); | |||||
services.AddScoped<ICapPublisher, CapPublisher>(); | |||||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | |||||
var sqlServerOptions = new MySqlOptions(); | |||||
_configure(sqlServerOptions); | |||||
var provider = TempBuildService(services); | |||||
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType); | |||||
if (dbContextObj != null) | |||||
{ | |||||
var dbContext = (DbContext)dbContextObj; | |||||
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | |||||
} | |||||
services.Configure(_configure); | |||||
services.AddSingleton(sqlServerOptions); | |||||
} | |||||
private IServiceProvider TempBuildService(IServiceCollection services) | |||||
{ | |||||
return services.BuildServiceProvider(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
// ReSharper disable once CheckNamespace | |||||
namespace DotNetCore.CAP | |||||
{ | |||||
public class MySqlOptions : EFOptions | |||||
{ | |||||
/// <summary> | |||||
/// Gets or sets the database's connection string that will be used to store database entities. | |||||
/// </summary> | |||||
public string ConnectionString { get; set; } | |||||
public string TableNamePrefix { get; set; } = "cap"; | |||||
} | |||||
} |
@@ -0,0 +1,49 @@ | |||||
using System; | |||||
using DotNetCore.CAP; | |||||
using Microsoft.EntityFrameworkCore; | |||||
// ReSharper disable once CheckNamespace | |||||
namespace Microsoft.Extensions.DependencyInjection | |||||
{ | |||||
public static class CapOptionsExtensions | |||||
{ | |||||
public static CapOptions UseMySql(this CapOptions options, string connectionString) | |||||
{ | |||||
return options.UseMySql(opt => | |||||
{ | |||||
opt.ConnectionString = connectionString; | |||||
}); | |||||
} | |||||
public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure) | |||||
{ | |||||
if (configure == null) throw new ArgumentNullException(nameof(configure)); | |||||
options.RegisterExtension(new MySqlCapOptionsExtension(configure)); | |||||
return options; | |||||
} | |||||
public static CapOptions UseEntityFramework<TContext>(this CapOptions options) | |||||
where TContext : DbContext | |||||
{ | |||||
return options.UseEntityFramework<TContext>(opt => | |||||
{ | |||||
opt.DbContextType = typeof(TContext); | |||||
}); | |||||
} | |||||
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure) | |||||
where TContext : DbContext | |||||
{ | |||||
if (configure == null) throw new ArgumentNullException(nameof(configure)); | |||||
var efOptions = new EFOptions { DbContextType = typeof(TContext) }; | |||||
configure(efOptions); | |||||
options.RegisterExtension(new MySqlCapOptionsExtension(configure)); | |||||
return options; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,214 @@ | |||||
using System; | |||||
using System.Data; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using DotNetCore.CAP.Processor; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class CapPublisher : ICapPublisher | |||||
{ | |||||
private readonly ILogger _logger; | |||||
private readonly MySqlOptions _options; | |||||
private readonly DbContext _dbContext; | |||||
protected bool IsCapOpenedTrans { get; set; } | |||||
protected bool IsUsingEF { get; } | |||||
protected IServiceProvider ServiceProvider { get; } | |||||
public CapPublisher(IServiceProvider provider, | |||||
ILogger<CapPublisher> logger, | |||||
MySqlOptions options) | |||||
{ | |||||
ServiceProvider = provider; | |||||
_logger = logger; | |||||
_options = options; | |||||
if (_options.DbContextType != null) | |||||
{ | |||||
IsUsingEF = true; | |||||
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); | |||||
} | |||||
} | |||||
public void Publish(string name, string content) | |||||
{ | |||||
CheckIsUsingEF(name); | |||||
PublishCore(name, content); | |||||
} | |||||
public Task PublishAsync(string name, string content) | |||||
{ | |||||
CheckIsUsingEF(name); | |||||
return PublishCoreAsync(name, content); | |||||
} | |||||
public void Publish<T>(string name, T contentObj) | |||||
{ | |||||
CheckIsUsingEF(name); | |||||
var content = Helper.ToJson(contentObj); | |||||
PublishCore(name, content); | |||||
} | |||||
public Task PublishAsync<T>(string name, T contentObj) | |||||
{ | |||||
CheckIsUsingEF(name); | |||||
var content = Helper.ToJson(contentObj); | |||||
return PublishCoreAsync(name, content); | |||||
} | |||||
public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) | |||||
{ | |||||
CheckIsAdoNet(name); | |||||
if (dbConnection == null) | |||||
throw new ArgumentNullException(nameof(dbConnection)); | |||||
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
IsCapOpenedTrans = true; | |||||
PublishWithTrans(name, content, dbConnection, dbTransaction); | |||||
} | |||||
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) | |||||
{ | |||||
CheckIsAdoNet(name); | |||||
if (dbConnection == null) | |||||
throw new ArgumentNullException(nameof(dbConnection)); | |||||
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
IsCapOpenedTrans = true; | |||||
return PublishWithTransAsync(name, content, dbConnection, dbTransaction); | |||||
} | |||||
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) | |||||
{ | |||||
CheckIsAdoNet(name); | |||||
if (dbConnection == null) | |||||
throw new ArgumentNullException(nameof(dbConnection)); | |||||
var content = Helper.ToJson(contentObj); | |||||
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
PublishWithTrans(name, content, dbConnection, dbTransaction); | |||||
} | |||||
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) | |||||
{ | |||||
CheckIsAdoNet(name); | |||||
if (dbConnection == null) | |||||
throw new ArgumentNullException(nameof(dbConnection)); | |||||
var content = Helper.ToJson(contentObj); | |||||
dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
return PublishWithTransAsync(name, content, dbConnection, dbTransaction); | |||||
} | |||||
#region private methods | |||||
private void CheckIsUsingEF(string name) | |||||
{ | |||||
if (name == null) throw new ArgumentNullException(nameof(name)); | |||||
if (!IsUsingEF) | |||||
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + | |||||
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); | |||||
} | |||||
private void CheckIsAdoNet(string name) | |||||
{ | |||||
if (name == null) throw new ArgumentNullException(nameof(name)); | |||||
if (IsUsingEF) | |||||
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); | |||||
} | |||||
private async Task PublishCoreAsync(string name, string content) | |||||
{ | |||||
var connection = _dbContext.Database.GetDbConnection(); | |||||
var transaction = _dbContext.Database.CurrentTransaction; | |||||
IsCapOpenedTrans = transaction == null; | |||||
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); | |||||
var dbTransaction = transaction.GetDbTransaction(); | |||||
await PublishWithTransAsync(name, content, connection, dbTransaction); | |||||
} | |||||
private void PublishCore(string name, string content) | |||||
{ | |||||
var connection = _dbContext.Database.GetDbConnection(); | |||||
var transaction = _dbContext.Database.CurrentTransaction; | |||||
IsCapOpenedTrans = transaction == null; | |||||
transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
var dbTransaction = transaction.GetDbTransaction(); | |||||
PublishWithTrans(name, content, connection, dbTransaction); | |||||
} | |||||
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) | |||||
{ | |||||
var message = new CapPublishedMessage | |||||
{ | |||||
Name = name, | |||||
Content = content, | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); | |||||
_logger.LogInformation("Message has been persisted in the database. name:" + name); | |||||
if (IsCapOpenedTrans) | |||||
{ | |||||
dbTransaction.Commit(); | |||||
dbTransaction.Dispose(); | |||||
dbConnection.Dispose(); | |||||
} | |||||
PublishQueuer.PulseEvent.Set(); | |||||
} | |||||
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) | |||||
{ | |||||
var message = new CapPublishedMessage | |||||
{ | |||||
Name = name, | |||||
Content = content, | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); | |||||
_logger.LogInformation("Message has been persisted in the database. name:" + name); | |||||
if (IsCapOpenedTrans) | |||||
{ | |||||
dbTransaction.Commit(); | |||||
dbTransaction.Dispose(); | |||||
dbConnection.Dispose(); | |||||
} | |||||
PublishQueuer.PulseEvent.Set(); | |||||
} | |||||
private string PrepareSql() | |||||
{ | |||||
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; | |||||
} | |||||
#endregion private methods | |||||
} | |||||
} |
@@ -17,6 +17,7 @@ | |||||
<PackageReference Include="Dapper" Version="1.50.2" /> | <PackageReference Include="Dapper" Version="1.50.2" /> | ||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" /> | <PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" /> | ||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" /> | <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" /> | ||||
<PackageReference Include="MySqlConnector" Version="0.23.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -0,0 +1,11 @@ | |||||
using DotNetCore.CAP.Models; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class FetchedMessage | |||||
{ | |||||
public int MessageId { get; set; } | |||||
public MessageType MessageType { get; set; } | |||||
} | |||||
} |
@@ -0,0 +1,61 @@ | |||||
using System; | |||||
using System.Data.SqlClient; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Processor; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
{ | |||||
private readonly IServiceProvider _provider; | |||||
private readonly ILogger _logger; | |||||
private readonly MySqlOptions _options; | |||||
private const int MaxBatch = 1000; | |||||
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); | |||||
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2); | |||||
public DefaultAdditionalProcessor( | |||||
IServiceProvider provider, | |||||
ILogger<DefaultAdditionalProcessor> logger, | |||||
MySqlOptions sqlServerOptions) | |||||
{ | |||||
_logger = logger; | |||||
_provider = provider; | |||||
_options = sqlServerOptions; | |||||
} | |||||
public async Task ProcessAsync(ProcessingContext context) | |||||
{ | |||||
_logger.LogDebug("Collecting expired entities."); | |||||
var tables = new string[]{ | |||||
$"{_options.TableNamePrefix}.published", | |||||
$"{_options.TableNamePrefix}.received" | |||||
}; | |||||
foreach (var table in tables) | |||||
{ | |||||
var removedCount = 0; | |||||
do | |||||
{ | |||||
using (var connection = new SqlConnection(_options.ConnectionString)) | |||||
{ | |||||
removedCount = await connection.ExecuteAsync($@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;", | |||||
new { now = DateTime.Now, count = MaxBatch }); | |||||
} | |||||
if (removedCount != 0) | |||||
{ | |||||
await context.WaitAsync(_delay); | |||||
context.ThrowIfStopping(); | |||||
} | |||||
} while (removedCount != 0); | |||||
} | |||||
await context.WaitAsync(_waitingInterval); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,74 @@ | |||||
using System; | |||||
using System.Data; | |||||
using System.Threading; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Models; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class MySqlFetchedMessage : IFetchedMessage | |||||
{ | |||||
private readonly IDbConnection _connection; | |||||
private readonly IDbTransaction _transaction; | |||||
private readonly Timer _timer; | |||||
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); | |||||
private readonly object _lockObject = new object(); | |||||
public MySqlFetchedMessage(int messageId, | |||||
MessageType type, | |||||
IDbConnection connection, | |||||
IDbTransaction transaction) | |||||
{ | |||||
MessageId = messageId; | |||||
MessageType = type; | |||||
_connection = connection; | |||||
_transaction = transaction; | |||||
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); | |||||
} | |||||
public int MessageId { get; } | |||||
public MessageType MessageType { get; } | |||||
public void RemoveFromQueue() | |||||
{ | |||||
lock (_lockObject) | |||||
{ | |||||
_transaction.Commit(); | |||||
} | |||||
} | |||||
public void Requeue() | |||||
{ | |||||
lock (_lockObject) | |||||
{ | |||||
_transaction.Rollback(); | |||||
} | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
lock (_lockObject) | |||||
{ | |||||
_timer?.Dispose(); | |||||
_transaction.Dispose(); | |||||
_connection.Dispose(); | |||||
} | |||||
} | |||||
private void ExecuteKeepAliveQuery(object obj) | |||||
{ | |||||
lock (_lockObject) | |||||
{ | |||||
try | |||||
{ | |||||
_connection?.Execute("SELECT 1", _transaction); | |||||
} | |||||
catch | |||||
{ | |||||
// ignored | |||||
} | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,66 @@ | |||||
using System.Data.SqlClient; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using Microsoft.Extensions.Logging; | |||||
using MySql.Data.MySqlClient; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class MySqlStorage : IStorage | |||||
{ | |||||
private readonly MySqlOptions _options; | |||||
private readonly ILogger _logger; | |||||
public MySqlStorage(ILogger<MySqlStorage> logger, MySqlOptions options) | |||||
{ | |||||
_options = options; | |||||
_logger = logger; | |||||
} | |||||
public async Task InitializeAsync(CancellationToken cancellationToken) | |||||
{ | |||||
if (cancellationToken.IsCancellationRequested) return; | |||||
var sql = CreateDbTablesScript(_options.TableNamePrefix); | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
await connection.ExecuteAsync(sql); | |||||
} | |||||
_logger.LogDebug("Ensuring all create database tables script are applied."); | |||||
} | |||||
protected virtual string CreateDbTablesScript(string prefix) | |||||
{ | |||||
var batchSql = | |||||
$@" | |||||
CREATE TABLE IF NOT EXISTS `{prefix}.queue` ( | |||||
`MessageId` int(11) NOT NULL, | |||||
`MessageType` tinyint(4) NOT NULL | |||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |||||
CREATE TABLE IF NOT EXISTS `{prefix}.received` ( | |||||
`Id` int(127) NOT NULL AUTO_INCREMENT, | |||||
`Name` varchar(400) NOT NULL, | |||||
`Group` varchar(200) DEFAULT NULL, | |||||
`Content` longtext, | |||||
`Retries` int(11) DEFAULT NULL, | |||||
`Added` datetime(6) NOT NULL, | |||||
`ExpiresAt` datetime(6) DEFAULT NULL, | |||||
`StatusName` varchar(50) NOT NULL, | |||||
PRIMARY KEY (`Id`) | |||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |||||
CREATE TABLE IF NOT EXISTS `{prefix}.published` ( | |||||
`Id` int(127) NOT NULL AUTO_INCREMENT, | |||||
`Name` varchar(200) NOT NULL, | |||||
`Content` longtext, | |||||
`Retries` int(11) DEFAULT NULL, | |||||
`Added` datetime(6) NOT NULL, | |||||
`ExpiresAt` datetime(6) DEFAULT NULL, | |||||
`StatusName` varchar(40) NOT NULL, | |||||
PRIMARY KEY (`Id`) | |||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;"; | |||||
return batchSql; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,152 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Data; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using MySql.Data.MySqlClient; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class MySqlStorageConnection : IStorageConnection | |||||
{ | |||||
private readonly MySqlOptions _options; | |||||
private readonly string _prefix; | |||||
public MySqlStorageConnection(MySqlOptions options) | |||||
{ | |||||
_options = options; | |||||
_prefix = _options.TableNamePrefix; | |||||
} | |||||
public MySqlOptions Options => _options; | |||||
public IStorageTransaction CreateTransaction() | |||||
{ | |||||
return new MySqlStorageTransaction(this); | |||||
} | |||||
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id) | |||||
{ | |||||
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql); | |||||
} | |||||
} | |||||
public Task<IFetchedMessage> FetchNextMessageAsync() | |||||
{ | |||||
//Last execute statement : | |||||
//SET TRANSACTION ISOLATION LEVEL READ COMMITTED; | |||||
//START TRANSACTION; | |||||
//SELECT MessageId,MessageType FROM [{_prefix}].[Queue] LIMIT 1; | |||||
//DELETE FROM [{_prefix}].[Queue] LIMIT 1; | |||||
//COMMIT; | |||||
var sql = $@" | |||||
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1; | |||||
DELETE FROM `{_prefix}.queue` LIMIT 1;"; | |||||
return FetchNextMessageCoreAsync(sql); | |||||
} | |||||
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync() | |||||
{ | |||||
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql); | |||||
} | |||||
} | |||||
public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() | |||||
{ | |||||
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryAsync<CapPublishedMessage>(sql); | |||||
} | |||||
} | |||||
// CapReceviedMessage | |||||
public async Task StoreReceivedMessageAsync(CapReceivedMessage message) | |||||
{ | |||||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||||
var sql = $@" | |||||
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) | |||||
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
await connection.ExecuteAsync(sql, message); | |||||
} | |||||
} | |||||
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id) | |||||
{ | |||||
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); | |||||
} | |||||
} | |||||
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync() | |||||
{ | |||||
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); | |||||
} | |||||
} | |||||
public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages() | |||||
{ | |||||
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';"; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
{ | |||||
return await connection.QueryAsync<CapReceivedMessage>(sql); | |||||
} | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
} | |||||
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) | |||||
{ | |||||
//here don't use `using` to dispose | |||||
var connection = new MySqlConnection(_options.ConnectionString); | |||||
await connection.OpenAsync(); | |||||
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
FetchedMessage fetchedMessage = null; | |||||
try | |||||
{ | |||||
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction); | |||||
} | |||||
catch (MySqlException) | |||||
{ | |||||
transaction.Dispose(); | |||||
throw; | |||||
} | |||||
if (fetchedMessage == null) | |||||
{ | |||||
transaction.Rollback(); | |||||
transaction.Dispose(); | |||||
connection.Dispose(); | |||||
return null; | |||||
} | |||||
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,71 @@ | |||||
using System; | |||||
using System.Data; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Models; | |||||
using MySql.Data.MySqlClient; | |||||
namespace DotNetCore.CAP.MySql | |||||
{ | |||||
public class MySqlStorageTransaction : IStorageTransaction, IDisposable | |||||
{ | |||||
private readonly string _prefix; | |||||
private readonly IDbTransaction _dbTransaction; | |||||
private readonly IDbConnection _dbConnection; | |||||
public MySqlStorageTransaction(MySqlStorageConnection connection) | |||||
{ | |||||
var options = connection.Options; | |||||
_prefix = options.TableNamePrefix; | |||||
_dbConnection = new MySqlConnection(options.ConnectionString); | |||||
_dbConnection.Open(); | |||||
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); | |||||
} | |||||
public void UpdateMessage(CapPublishedMessage message) | |||||
{ | |||||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||||
var sql = $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; | |||||
_dbConnection.Execute(sql, message, _dbTransaction); | |||||
} | |||||
public void UpdateMessage(CapReceivedMessage message) | |||||
{ | |||||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||||
var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; | |||||
_dbConnection.Execute(sql, message, _dbTransaction); | |||||
} | |||||
public void EnqueueMessage(CapPublishedMessage message) | |||||
{ | |||||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||||
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; | |||||
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction); | |||||
} | |||||
public void EnqueueMessage(CapReceivedMessage message) | |||||
{ | |||||
if (message == null) throw new ArgumentNullException(nameof(message)); | |||||
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; | |||||
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction); | |||||
} | |||||
public Task CommitAsync() | |||||
{ | |||||
_dbTransaction.Commit(); | |||||
return Task.CompletedTask; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
_dbTransaction.Dispose(); | |||||
_dbConnection.Dispose(); | |||||
} | |||||
} | |||||
} |