diff --git a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs new file mode 100644 index 0000000..e02b574 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs @@ -0,0 +1,13 @@ +using System; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class EFOptions + { + /// + /// EF dbcontext type. + /// + public Type DbContextType { get; internal set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs new file mode 100644 index 0000000..f03584e --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs @@ -0,0 +1,45 @@ +using System; +using DotNetCore.CAP.Processor; +using DotNetCore.CAP.MySql; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class MySqlCapOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public MySqlCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddTransient(); + + var sqlServerOptions = new MySqlOptions(); + _configure(sqlServerOptions); + + var provider = TempBuildService(services); + var dbContextObj = provider.GetService(sqlServerOptions.DbContextType); + if (dbContextObj != null) + { + var dbContext = (DbContext)dbContextObj; + sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; + } + services.Configure(_configure); + services.AddSingleton(sqlServerOptions); + } + + private IServiceProvider TempBuildService(IServiceCollection services) + { + return services.BuildServiceProvider(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs new file mode 100644 index 0000000..7ad3801 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs @@ -0,0 +1,13 @@ +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class MySqlOptions : EFOptions + { + /// + /// Gets or sets the database's connection string that will be used to store database entities. + /// + public string ConnectionString { get; set; } + + public string TableNamePrefix { get; set; } = "cap"; + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs new file mode 100644 index 0000000..f838262 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs @@ -0,0 +1,49 @@ +using System; +using DotNetCore.CAP; +using Microsoft.EntityFrameworkCore; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UseMySql(this CapOptions options, string connectionString) + { + return options.UseMySql(opt => + { + opt.ConnectionString = connectionString; + }); + } + + public static CapOptions UseMySql(this CapOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new MySqlCapOptionsExtension(configure)); + + return options; + } + + public static CapOptions UseEntityFramework(this CapOptions options) + where TContext : DbContext + { + return options.UseEntityFramework(opt => + { + opt.DbContextType = typeof(TContext); + }); + } + + public static CapOptions UseEntityFramework(this CapOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var efOptions = new EFOptions { DbContextType = typeof(TContext) }; + configure(efOptions); + + options.RegisterExtension(new MySqlCapOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CapPublisher.cs b/src/DotNetCore.CAP.MySql/CapPublisher.cs new file mode 100644 index 0000000..92959d8 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CapPublisher.cs @@ -0,0 +1,214 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.MySql +{ + public class CapPublisher : ICapPublisher + { + private readonly ILogger _logger; + private readonly MySqlOptions _options; + private readonly DbContext _dbContext; + + protected bool IsCapOpenedTrans { get; set; } + + protected bool IsUsingEF { get; } + + protected IServiceProvider ServiceProvider { get; } + + public CapPublisher(IServiceProvider provider, + ILogger logger, + MySqlOptions options) + { + ServiceProvider = provider; + _logger = logger; + _options = options; + + if (_options.DbContextType != null) + { + IsUsingEF = true; + _dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); + } + } + + public void Publish(string name, string content) + { + CheckIsUsingEF(name); + + PublishCore(name, content); + } + + public Task PublishAsync(string name, string content) + { + CheckIsUsingEF(name); + + return PublishCoreAsync(name, content); + } + + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Helper.ToJson(contentObj); + + PublishCore(name, content); + } + + public Task PublishAsync(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Helper.ToJson(contentObj); + + return PublishCoreAsync(name, content); + } + + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + #region private methods + + private void CheckIsUsingEF(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } + + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); + } + + private async Task PublishCoreAsync(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; + transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + await PublishWithTransAsync(name, content, connection, dbTransaction); + } + + private void PublishCore(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; + transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + PublishWithTrans(name, content, connection, dbTransaction); + } + + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + + PublishQueuer.PulseEvent.Set(); + } + + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + PublishQueuer.PulseEvent.Set(); + } + + private string PrepareSql() + { + return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + + #endregion private methods + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj index a1c8bc3..f9f2c51 100644 --- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj +++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj @@ -17,6 +17,7 @@ + diff --git a/src/DotNetCore.CAP.MySql/FetchedMessage.cs b/src/DotNetCore.CAP.MySql/FetchedMessage.cs new file mode 100644 index 0000000..c14c549 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/FetchedMessage.cs @@ -0,0 +1,11 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.MySql +{ + public class FetchedMessage + { + public int MessageId { get; set; } + + public MessageType MessageType { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs new file mode 100644 index 0000000..335e4c7 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs @@ -0,0 +1,61 @@ +using System; +using System.Data.SqlClient; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.MySql +{ + public class DefaultAdditionalProcessor : IAdditionalProcessor + { + private readonly IServiceProvider _provider; + private readonly ILogger _logger; + private readonly MySqlOptions _options; + + private const int MaxBatch = 1000; + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2); + + public DefaultAdditionalProcessor( + IServiceProvider provider, + ILogger logger, + MySqlOptions sqlServerOptions) + { + _logger = logger; + _provider = provider; + _options = sqlServerOptions; + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug("Collecting expired entities."); + + var tables = new string[]{ + $"{_options.TableNamePrefix}.published", + $"{_options.TableNamePrefix}.received" + }; + + foreach (var table in tables) + { + var removedCount = 0; + do + { + using (var connection = new SqlConnection(_options.ConnectionString)) + { + removedCount = await connection.ExecuteAsync($@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;", + new { now = DateTime.Now, count = MaxBatch }); + } + + if (removedCount != 0) + { + await context.WaitAsync(_delay); + context.ThrowIfStopping(); + } + } while (removedCount != 0); + } + + await context.WaitAsync(_waitingInterval); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs new file mode 100644 index 0000000..f9a4ce1 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs @@ -0,0 +1,74 @@ +using System; +using System.Data; +using System.Threading; +using Dapper; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlFetchedMessage : IFetchedMessage + { + private readonly IDbConnection _connection; + private readonly IDbTransaction _transaction; + private readonly Timer _timer; + private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); + private readonly object _lockObject = new object(); + + public MySqlFetchedMessage(int messageId, + MessageType type, + IDbConnection connection, + IDbTransaction transaction) + { + MessageId = messageId; + MessageType = type; + _connection = connection; + _transaction = transaction; + _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); + } + + public int MessageId { get; } + + public MessageType MessageType { get; } + + public void RemoveFromQueue() + { + lock (_lockObject) + { + _transaction.Commit(); + } + } + + public void Requeue() + { + lock (_lockObject) + { + _transaction.Rollback(); + } + } + + public void Dispose() + { + lock (_lockObject) + { + _timer?.Dispose(); + _transaction.Dispose(); + _connection.Dispose(); + } + } + + private void ExecuteKeepAliveQuery(object obj) + { + lock (_lockObject) + { + try + { + _connection?.Execute("SELECT 1", _transaction); + } + catch + { + // ignored + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorage.cs b/src/DotNetCore.CAP.MySql/MySqlStorage.cs new file mode 100644 index 0000000..0db4fda --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorage.cs @@ -0,0 +1,66 @@ +using System.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using Microsoft.Extensions.Logging; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorage : IStorage + { + private readonly MySqlOptions _options; + private readonly ILogger _logger; + + public MySqlStorage(ILogger logger, MySqlOptions options) + { + _options = options; + _logger = logger; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + var sql = CreateDbTablesScript(_options.TableNamePrefix); + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + _logger.LogDebug("Ensuring all create database tables script are applied."); + } + + protected virtual string CreateDbTablesScript(string prefix) + { + var batchSql = + $@" +CREATE TABLE IF NOT EXISTS `{prefix}.queue` ( + `MessageId` int(11) NOT NULL, + `MessageType` tinyint(4) NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `{prefix}.received` ( + `Id` int(127) NOT NULL AUTO_INCREMENT, + `Name` varchar(400) NOT NULL, + `Group` varchar(200) DEFAULT NULL, + `Content` longtext, + `Retries` int(11) DEFAULT NULL, + `Added` datetime(6) NOT NULL, + `ExpiresAt` datetime(6) DEFAULT NULL, + `StatusName` varchar(50) NOT NULL, + PRIMARY KEY (`Id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `{prefix}.published` ( + `Id` int(127) NOT NULL AUTO_INCREMENT, + `Name` varchar(200) NOT NULL, + `Content` longtext, + `Retries` int(11) DEFAULT NULL, + `Added` datetime(6) NOT NULL, + `ExpiresAt` datetime(6) DEFAULT NULL, + `StatusName` varchar(40) NOT NULL, + PRIMARY KEY (`Id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8;"; + return batchSql; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs new file mode 100644 index 0000000..7e78976 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorageConnection : IStorageConnection + { + private readonly MySqlOptions _options; + private readonly string _prefix; + + public MySqlStorageConnection(MySqlOptions options) + { + _options = options; + _prefix = _options.TableNamePrefix; + } + + public MySqlOptions Options => _options; + + public IStorageTransaction CreateTransaction() + { + return new MySqlStorageTransaction(this); + } + + public async Task GetPublishedMessageAsync(int id) + { + var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public Task FetchNextMessageAsync() + { + //Last execute statement : + + //SET TRANSACTION ISOLATION LEVEL READ COMMITTED; + //START TRANSACTION; + //SELECT MessageId,MessageType FROM [{_prefix}].[Queue] LIMIT 1; + //DELETE FROM [{_prefix}].[Queue] LIMIT 1; + //COMMIT; + + var sql = $@" +SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1; +DELETE FROM `{_prefix}.queue` LIMIT 1;"; + + return FetchNextMessageCoreAsync(sql); + } + + public async Task GetNextPublishedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + // CapReceviedMessage + + public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $@" +INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) +VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql, message); + } + } + + public async Task GetReceivedMessageAsync(int id) + { + var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task GetNextReceviedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + public void Dispose() + { + } + + private async Task FetchNextMessageCoreAsync(string sql, object args = null) + { + //here don't use `using` to dispose + var connection = new MySqlConnection(_options.ConnectionString); + await connection.OpenAsync(); + var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); + FetchedMessage fetchedMessage = null; + try + { + fetchedMessage = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); + } + catch (MySqlException) + { + transaction.Dispose(); + throw; + } + + if (fetchedMessage == null) + { + transaction.Rollback(); + transaction.Dispose(); + connection.Dispose(); + return null; + } + + return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs new file mode 100644 index 0000000..d93be89 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs @@ -0,0 +1,71 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Models; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorageTransaction : IStorageTransaction, IDisposable + { + private readonly string _prefix; + + private readonly IDbTransaction _dbTransaction; + private readonly IDbConnection _dbConnection; + + public MySqlStorageTransaction(MySqlStorageConnection connection) + { + var options = connection.Options; + _prefix = options.TableNamePrefix; + + _dbConnection = new MySqlConnection(options.ConnectionString); + _dbConnection.Open(); + _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } + + public void UpdateMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void UpdateMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void EnqueueMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction); + } + + public void EnqueueMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction); + } + + public Task CommitAsync() + { + _dbTransaction.Commit(); + return Task.CompletedTask; + } + + public void Dispose() + { + _dbTransaction.Dispose(); + _dbConnection.Dispose(); + } + } +} \ No newline at end of file