diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
index 96f0191..3f8bd18 100644
--- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
+++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
@@ -9,7 +9,7 @@
- NU1605
+ NU1605;NU1701
NU1701
diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
index 2eeddc1..2fd661a 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
@@ -64,18 +64,19 @@ namespace DotNetCore.CAP.Abstractions
protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);
- #region private methods
-
- private string Serialize(T obj, string callbackName = null)
+ protected virtual string Serialize(T obj, string callbackName = null)
{
- var message = new Message(obj)
+ var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
+
+ var message = new CapMessageDto(obj)
{
CallbackName = callbackName
};
-
- return Helper.ToJson(message);
+ return serializer.Serialize(message);
}
+ #region private methods
+
private void PrepareConnectionForAdo(IDbConnection dbConnection, IDbTransaction dbTransaction)
{
DbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
diff --git a/src/DotNetCore.CAP/Abstractions/IContentSerializer.cs b/src/DotNetCore.CAP/Abstractions/IContentSerializer.cs
new file mode 100644
index 0000000..3e8e2b7
--- /dev/null
+++ b/src/DotNetCore.CAP/Abstractions/IContentSerializer.cs
@@ -0,0 +1,11 @@
+using DotNetCore.CAP.Models;
+
+namespace DotNetCore.CAP.Abstractions
+{
+ public interface IContentSerializer
+ {
+ string Serialize(T obj) where T : CapMessageDto, new();
+
+ T DeSerialize(string content) where T : CapMessageDto, new();
+ }
+}
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
index 51f645b..b3b16aa 100644
--- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
+++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
@@ -33,6 +33,7 @@ namespace Microsoft.Extensions.DependencyInjection
AddSubscribeServices(services);
+ services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
@@ -42,7 +43,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
-
+
//Processors
services.AddTransient();
services.AddTransient();
diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
index 0f408be..330f6ac 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
@@ -35,6 +35,7 @@ namespace DotNetCore.CAP.Internal
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
+ var serializer = _serviceProvider.GetService();
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
@@ -42,7 +43,7 @@ namespace DotNetCore.CAP.Internal
var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType);
var jsonConent = _consumerContext.DeliverMessage.Content;
- var message = Helper.FromJson(jsonConent);
+ var message = serializer.DeSerialize(jsonConent);
object result = null;
if (_executor.MethodParameters.Length > 0)
@@ -105,7 +106,7 @@ namespace DotNetCore.CAP.Internal
private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj)
{
- var callbackMessage = new Message
+ var callbackMessage = new CapMessageDto
{
Id = messageId,
Content = bodyObj
diff --git a/src/DotNetCore.CAP/Internal/IContentSerializer.Json.cs b/src/DotNetCore.CAP/Internal/IContentSerializer.Json.cs
new file mode 100644
index 0000000..1f85cee
--- /dev/null
+++ b/src/DotNetCore.CAP/Internal/IContentSerializer.Json.cs
@@ -0,0 +1,19 @@
+using DotNetCore.CAP.Abstractions;
+using DotNetCore.CAP.Infrastructure;
+using DotNetCore.CAP.Models;
+
+namespace DotNetCore.CAP.Internal
+{
+ public class JsonContentSerializer : IContentSerializer
+ {
+ public T DeSerialize(string messageObjStr) where T : CapMessageDto, new() {
+
+ return Helper.FromJson(messageObjStr);
+ }
+
+ public string Serialize(T messageObj) where T : CapMessageDto, new() {
+
+ return Helper.ToJson(messageObj);
+ }
+ }
+}