@@ -42,7 +42,7 @@ namespace DotNetCore.CAP | |||||
{ | { | ||||
throw new ArgumentNullException(nameof(Servers)); | throw new ArgumentNullException(nameof(Servers)); | ||||
} | } | ||||
MainConfig.Add("bootstrap.servers", Servers); | MainConfig.Add("bootstrap.servers", Servers); | ||||
MainConfig["queue.buffering.max.ms"] = "10"; | MainConfig["queue.buffering.max.ms"] = "10"; | ||||
@@ -1,7 +1,4 @@ | |||||
using System; | |||||
using Microsoft.Extensions.Options; | |||||
namespace DotNetCore.CAP.Kafka | |||||
namespace DotNetCore.CAP.Kafka | |||||
{ | { | ||||
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory | internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory | ||||
{ | { | ||||
@@ -34,7 +34,7 @@ namespace DotNetCore.CAP | |||||
var dbContext = (DbContext)x.GetService(mysqlOptions.DbContextType); | var dbContext = (DbContext)x.GetService(mysqlOptions.DbContextType); | ||||
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | ||||
return mysqlOptions; | return mysqlOptions; | ||||
}); | |||||
}); | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
@@ -37,7 +37,7 @@ namespace DotNetCore.CAP.MySql | |||||
DbConnection = _dbContext.Database.GetDbConnection(); | DbConnection = _dbContext.Database.GetDbConnection(); | ||||
var dbContextTransaction = _dbContext.Database.CurrentTransaction; | var dbContextTransaction = _dbContext.Database.CurrentTransaction; | ||||
var dbTrans = dbContextTransaction?.GetDbTransaction(); | var dbTrans = dbContextTransaction?.GetDbTransaction(); | ||||
//DbTransaction is dispose in original | |||||
//DbTransaction is dispose in original | |||||
if (dbTrans?.Connection == null) | if (dbTrans?.Connection == null) | ||||
{ | { | ||||
IsCapOpenedTrans = true; | IsCapOpenedTrans = true; | ||||
@@ -58,7 +58,7 @@ namespace DotNetCore.CAP.MySql | |||||
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | ||||
{ | { | ||||
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); | await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); | ||||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | ||||
} | } | ||||
@@ -69,7 +69,8 @@ namespace DotNetCore.CAP.MySql | |||||
await conn.ExecuteAsync(PrepareSql(), message); | await conn.ExecuteAsync(PrepareSql(), message); | ||||
} | } | ||||
} | } | ||||
#region private methods | |||||
#region private methods | |||||
private string PrepareSql() | private string PrepareSql() | ||||
{ | { | ||||
@@ -1,6 +1,6 @@ | |||||
using System; | using System; | ||||
using DotNetCore.CAP.Processor; | |||||
using DotNetCore.CAP.PostgreSql; | using DotNetCore.CAP.PostgreSql; | ||||
using DotNetCore.CAP.Processor; | |||||
using Microsoft.EntityFrameworkCore; | using Microsoft.EntityFrameworkCore; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
@@ -2,8 +2,8 @@ | |||||
using System.Data; | using System.Data; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using Dapper; | using Dapper; | ||||
using DotNetCore.CAP.Models; | |||||
using DotNetCore.CAP.Abstractions; | using DotNetCore.CAP.Abstractions; | ||||
using DotNetCore.CAP.Models; | |||||
using Microsoft.EntityFrameworkCore; | using Microsoft.EntityFrameworkCore; | ||||
using Microsoft.EntityFrameworkCore.Storage; | using Microsoft.EntityFrameworkCore.Storage; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
if (_options.DbContextType != null) | if (_options.DbContextType != null) | ||||
{ | { | ||||
IsUsingEF = true; | IsUsingEF = true; | ||||
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); | |||||
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); | |||||
} | } | ||||
} | } | ||||
@@ -37,7 +37,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
DbConnection = _dbContext.Database.GetDbConnection(); | DbConnection = _dbContext.Database.GetDbConnection(); | ||||
var dbContextTransaction = _dbContext.Database.CurrentTransaction; | var dbContextTransaction = _dbContext.Database.CurrentTransaction; | ||||
var dbTrans = dbContextTransaction?.GetDbTransaction(); | var dbTrans = dbContextTransaction?.GetDbTransaction(); | ||||
//DbTransaction is dispose in original | |||||
//DbTransaction is dispose in original | |||||
if (dbTrans?.Connection == null) | if (dbTrans?.Connection == null) | ||||
{ | { | ||||
IsCapOpenedTrans = true; | IsCapOpenedTrans = true; | ||||
@@ -75,8 +75,8 @@ namespace DotNetCore.CAP.PostgreSql | |||||
private string PrepareSql() | private string PrepareSql() | ||||
{ | { | ||||
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; | return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; | ||||
} | |||||
} | |||||
#endregion | |||||
#endregion private methods | |||||
} | } | ||||
} | } |
@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
using (var connection = new NpgsqlConnection(_options.ConnectionString)) | using (var connection = new NpgsqlConnection(_options.ConnectionString)) | ||||
{ | { | ||||
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);", | removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);", | ||||
new { now = DateTime.Now, count = MaxBatch }); | |||||
new { now = DateTime.Now, count = MaxBatch }); | |||||
} | } | ||||
if (removedCount != 0) | if (removedCount != 0) | ||||
@@ -37,7 +37,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
DbConnection = _dbContext.Database.GetDbConnection(); | DbConnection = _dbContext.Database.GetDbConnection(); | ||||
var dbContextTransaction = _dbContext.Database.CurrentTransaction; | var dbContextTransaction = _dbContext.Database.CurrentTransaction; | ||||
var dbTrans = dbContextTransaction?.GetDbTransaction(); | var dbTrans = dbContextTransaction?.GetDbTransaction(); | ||||
//DbTransaction is dispose in original | |||||
//DbTransaction is dispose in original | |||||
if (dbTrans?.Connection == null) | if (dbTrans?.Connection == null) | ||||
{ | { | ||||
IsCapOpenedTrans = true; | IsCapOpenedTrans = true; | ||||
@@ -66,8 +66,8 @@ namespace DotNetCore.CAP.SqlServer | |||||
{ | { | ||||
using (var conn = new SqlConnection(_options.ConnectionString)) | using (var conn = new SqlConnection(_options.ConnectionString)) | ||||
{ | { | ||||
await conn.ExecuteAsync(PrepareSql(), message); | |||||
} | |||||
await conn.ExecuteAsync(PrepareSql(), message); | |||||
} | |||||
} | } | ||||
#region private methods | #region private methods | ||||
@@ -160,4 +160,4 @@ namespace DotNetCore.CAP.Abstractions | |||||
#endregion private methods | #endregion private methods | ||||
} | } | ||||
} | |||||
} |
@@ -16,12 +16,14 @@ namespace DotNetCore.CAP | |||||
{ | { | ||||
// private static fields | // private static fields | ||||
private static readonly DateTime __unixEpoch; | private static readonly DateTime __unixEpoch; | ||||
private static readonly long __dateTimeMaxValueMillisecondsSinceEpoch; | private static readonly long __dateTimeMaxValueMillisecondsSinceEpoch; | ||||
private static readonly long __dateTimeMinValueMillisecondsSinceEpoch; | private static readonly long __dateTimeMinValueMillisecondsSinceEpoch; | ||||
private static ObjectId __emptyInstance = default(ObjectId); | private static ObjectId __emptyInstance = default(ObjectId); | ||||
private static int __staticMachine; | private static int __staticMachine; | ||||
private static short __staticPid; | private static short __staticPid; | ||||
private static int __staticIncrement; // high byte will be masked out when generating new ObjectId | private static int __staticIncrement; // high byte will be masked out when generating new ObjectId | ||||
private static uint[] _lookup32 = Enumerable.Range(0, 256).Select(i => | private static uint[] _lookup32 = Enumerable.Range(0, 256).Select(i => | ||||
{ | { | ||||
string s = i.ToString("x2"); | string s = i.ToString("x2"); | ||||
@@ -32,6 +34,7 @@ namespace DotNetCore.CAP | |||||
// the extra two bytes are not visible to anyone outside of this class and they buy us considerable simplification | // the extra two bytes are not visible to anyone outside of this class and they buy us considerable simplification | ||||
// an additional advantage of this representation is that it will serialize to JSON without any 64 bit overflow problems | // an additional advantage of this representation is that it will serialize to JSON without any 64 bit overflow problems | ||||
private int _timestamp; | private int _timestamp; | ||||
private int _machine; | private int _machine; | ||||
private short _pid; | private short _pid; | ||||
private int _increment; | private int _increment; | ||||
@@ -475,6 +478,7 @@ namespace DotNetCore.CAP | |||||
return arr; | return arr; | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// Converts a byte array to a hex string. | /// Converts a byte array to a hex string. | ||||
/// </summary> | /// </summary> | ||||
@@ -495,6 +499,7 @@ namespace DotNetCore.CAP | |||||
} | } | ||||
return new string(result); | return new string(result); | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// Converts a DateTime to number of milliseconds since Unix epoch. | /// Converts a DateTime to number of milliseconds since Unix epoch. | ||||
/// </summary> | /// </summary> | ||||
@@ -505,6 +510,7 @@ namespace DotNetCore.CAP | |||||
var utcDateTime = ToUniversalTime(dateTime); | var utcDateTime = ToUniversalTime(dateTime); | ||||
return (utcDateTime - __unixEpoch).Ticks / 10000; | return (utcDateTime - __unixEpoch).Ticks / 10000; | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// Converts a DateTime to UTC (with special handling for MinValue and MaxValue). | /// Converts a DateTime to UTC (with special handling for MinValue and MaxValue). | ||||
/// </summary> | /// </summary> | ||||
@@ -537,4 +543,4 @@ namespace DotNetCore.CAP | |||||
return val - (val < 58 ? 48 : (val < 97 ? 55 : 87)); | return val - (val < 58 ? 48 : (val < 97 ? 55 : 87)); | ||||
} | } | ||||
} | } | ||||
} | |||||
} |
@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Internal | |||||
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) | public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) | ||||
{ | { | ||||
using(var scope = _serviceProvider.CreateScope()) | |||||
using (var scope = _serviceProvider.CreateScope()) | |||||
{ | { | ||||
var context = new ConsumerInvokerContext(consumerContext) | var context = new ConsumerInvokerContext(consumerContext) | ||||
{ | { | ||||
@@ -3,7 +3,6 @@ using System.Reflection; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Abstractions.ModelBinding; | using DotNetCore.CAP.Abstractions.ModelBinding; | ||||
using DotNetCore.CAP.Infrastructure; | using DotNetCore.CAP.Infrastructure; | ||||
using DotNetCore.CAP.Models; | |||||
namespace DotNetCore.CAP.Internal | namespace DotNetCore.CAP.Internal | ||||
{ | { | ||||
@@ -5,8 +5,16 @@ namespace DotNetCore.CAP.Internal | |||||
[Serializable] | [Serializable] | ||||
public class MethodBindException : Exception | public class MethodBindException : Exception | ||||
{ | { | ||||
public MethodBindException() { } | |||||
public MethodBindException(string message) : base(message) { } | |||||
public MethodBindException(string message, Exception inner) : base(message, inner) { } | |||||
public MethodBindException() | |||||
{ | |||||
} | |||||
public MethodBindException(string message) : base(message) | |||||
{ | |||||
} | |||||
public MethodBindException(string message, Exception inner) : base(message, inner) | |||||
{ | |||||
} | |||||
} | } | ||||
} | |||||
} |
@@ -23,4 +23,4 @@ namespace DotNetCore.CAP.Models | |||||
Content = content; | Content = content; | ||||
} | } | ||||
} | } | ||||
} | |||||
} |