Browse Source

Solve the issue of being restricted to using Newtonsoft (#664)

* Solve the issue of being restricted to using Newtonsoft for serialization/deserialization.

* Removed whitespace

* Failed build fixed by injecting ISerializer.

* Removed unintended reference.

Co-authored-by: Patrick Heemskerk <pheemskerk@inforit.nl>
master
patheems 4 years ago
committed by GitHub
parent
commit
ba31886db7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 188 additions and 43 deletions
  1. +2
    -1
      src/DotNetCore.CAP.Dashboard/CAP.DashboardOptionsExtensions.cs
  2. +7
    -7
      src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs
  3. +7
    -5
      src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
  4. +8
    -6
      src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
  5. +7
    -4
      src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
  6. +7
    -4
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  7. +7
    -4
      src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
  8. +1
    -1
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  9. +6
    -4
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  10. +8
    -2
      src/DotNetCore.CAP/Messages/Message.cs
  11. +27
    -2
      src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
  12. +31
    -1
      src/DotNetCore.CAP/Serialization/ISerializer.cs
  13. +3
    -1
      test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs
  14. +3
    -1
      test/DotNetCore.CAP.MySql.Test/TestHost.cs
  15. +62
    -0
      test/DotNetCore.CAP.Test/MessageTest.cs
  16. +2
    -0
      test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs

+ 2
- 1
src/DotNetCore.CAP.Dashboard/CAP.DashboardOptionsExtensions.cs View File

@@ -6,6 +6,7 @@ using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.GatewayProxy; using DotNetCore.CAP.Dashboard.GatewayProxy;
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester; using DotNetCore.CAP.Dashboard.GatewayProxy.Requester;
using DotNetCore.CAP.Serialization;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;


@@ -26,7 +27,7 @@ namespace DotNetCore.CAP
_options?.Invoke(dashboardOptions); _options?.Invoke(dashboardOptions);
services.AddTransient<IStartupFilter, CapStartupFilter>(); services.AddTransient<IStartupFilter, CapStartupFilter>();
services.AddSingleton(dashboardOptions); services.AddSingleton(dashboardOptions);
services.AddSingleton(DashboardRoutes.Routes);
services.AddSingleton(x => DashboardRoutes.GetDashboardRoutes(x.GetRequiredService<ISerializer>()));
services.AddSingleton<IHttpRequester, HttpClientHttpRequester>(); services.AddSingleton<IHttpRequester, HttpClientHttpRequester>();
services.AddSingleton<IHttpClientCache, MemoryHttpClientCache>(); services.AddSingleton<IHttpClientCache, MemoryHttpClientCache>();
services.AddSingleton<IRequestMapper, RequestMapper>(); services.AddSingleton<IRequestMapper, RequestMapper>();


+ 7
- 7
src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs View File

@@ -10,7 +10,7 @@ using Microsoft.Extensions.DependencyInjection;


namespace DotNetCore.CAP.Dashboard namespace DotNetCore.CAP.Dashboard
{ {
public static class DashboardRoutes
public class DashboardRoutes
{ {
private static readonly string[] Javascripts = private static readonly string[] Javascripts =
{ {
@@ -33,9 +33,9 @@ namespace DotNetCore.CAP.Dashboard
"cap.css" "cap.css"
}; };


static DashboardRoutes()
public static RouteCollection GetDashboardRoutes(ISerializer serializer)
{ {
Routes = new RouteCollection();
RouteCollection Routes = new RouteCollection();
Routes.AddRazorPage("/", x => new HomePage()); Routes.AddRazorPage("/", x => new HomePage());
Routes.Add("/stats", new JsonStats()); Routes.Add("/stats", new JsonStats());
Routes.Add("/health", new OkStats()); Routes.Add("/health", new OkStats());
@@ -104,7 +104,7 @@ namespace DotNetCore.CAP.Dashboard
{ {
var msg = client.Storage.GetMonitoringApi().GetPublishedMessageAsync(messageId) var msg = client.Storage.GetMonitoringApi().GetPublishedMessageAsync(messageId)
.GetAwaiter().GetResult(); .GetAwaiter().GetResult();
msg.Origin = StringSerializer.DeSerialize(msg.Content);
msg.Origin = serializer.Deserialize(msg.Content);
client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg); client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg);
}); });
Routes.AddPublishBatchCommand( Routes.AddPublishBatchCommand(
@@ -113,7 +113,7 @@ namespace DotNetCore.CAP.Dashboard
{ {
var msg = client.Storage.GetMonitoringApi().GetReceivedMessageAsync(messageId) var msg = client.Storage.GetMonitoringApi().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult(); .GetAwaiter().GetResult();
msg.Origin = StringSerializer.DeSerialize(msg.Content);
msg.Origin = serializer.Deserialize(msg.Content);
client.RequestServices.GetService<ISubscribeDispatcher>().DispatchAsync(msg); client.RequestServices.GetService<ISubscribeDispatcher>().DispatchAsync(msg);
}); });


@@ -135,9 +135,9 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddRazorPage("/nodes/node/(?<Id>.+)", x => new NodePage(x.UriMatch.Groups["Id"].Value)); Routes.AddRazorPage("/nodes/node/(?<Id>.+)", x => new NodePage(x.UriMatch.Groups["Id"].Value));


#endregion Razor pages and commands #endregion Razor pages and commands
}


public static RouteCollection Routes { get; }
return Routes;
}


internal static string GetContentFolderNamespace(string contentFolder) internal static string GetContentFolderNamespace(string contentFolder)
{ {


+ 7
- 5
src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs View File

@@ -19,10 +19,12 @@ namespace DotNetCore.CAP.InMemoryStorage
internal class InMemoryStorage : IDataStorage internal class InMemoryStorage : IDataStorage
{ {
private readonly IOptions<CapOptions> _capOptions; private readonly IOptions<CapOptions> _capOptions;
private readonly ISerializer _serializer;


public InMemoryStorage(IOptions<CapOptions> capOptions)
public InMemoryStorage(IOptions<CapOptions> capOptions, ISerializer serializer)
{ {
_capOptions = capOptions; _capOptions = capOptions;
_serializer = serializer;
} }


public static ConcurrentDictionary<string, MemoryMessage> PublishedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>(); public static ConcurrentDictionary<string, MemoryMessage> PublishedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>();
@@ -49,7 +51,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
@@ -104,7 +106,7 @@ namespace DotNetCore.CAP.InMemoryStorage
Origin = mdMessage.Origin, Origin = mdMessage.Origin,
Group = group, Group = group,
Name = name, Name = name,
Content = StringSerializer.Serialize(mdMessage.Origin),
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries, Retries = mdMessage.Retries,
Added = mdMessage.Added, Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt, ExpiresAt = mdMessage.ExpiresAt,
@@ -152,7 +154,7 @@ namespace DotNetCore.CAP.InMemoryStorage


foreach (var message in ret) foreach (var message in ret)
{ {
message.Origin = StringSerializer.DeSerialize(message.Content);
message.Origin = _serializer.Deserialize(message.Content);
} }


return Task.FromResult(ret); return Task.FromResult(ret);
@@ -169,7 +171,7 @@ namespace DotNetCore.CAP.InMemoryStorage


foreach (var message in ret) foreach (var message in ret)
{ {
message.Origin = StringSerializer.DeSerialize(message.Content);
message.Origin = _serializer.Deserialize(message.Content);
} }


return Task.FromResult(ret); return Task.FromResult(ret);


+ 8
- 6
src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs View File

@@ -11,7 +11,6 @@ using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring; using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization; using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MongoDB.Driver; using MongoDB.Driver;


@@ -23,16 +22,19 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoClient _client; private readonly IMongoClient _client;
private readonly IMongoDatabase _database; private readonly IMongoDatabase _database;
private readonly IOptions<MongoDBOptions> _options; private readonly IOptions<MongoDBOptions> _options;
private readonly ISerializer _serializer;


public MongoDBDataStorage( public MongoDBDataStorage(
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options, IOptions<MongoDBOptions> options,
IMongoClient client)
IMongoClient client,
ISerializer serializer)
{ {
_capOptions = capOptions; _capOptions = capOptions;
_options = options; _options = options;
_client = client; _client = client;
_database = _client.GetDatabase(_options.Value.DatabaseName); _database = _client.GetDatabase(_options.Value.DatabaseName);
_serializer = serializer;
} }


public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
@@ -67,7 +69,7 @@ namespace DotNetCore.CAP.MongoDB
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
@@ -130,7 +132,7 @@ namespace DotNetCore.CAP.MongoDB
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
}; };
var content = StringSerializer.Serialize(mdMessage.Origin);
var content = _serializer.Serialize(mdMessage.Origin);


var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection); var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);


@@ -184,7 +186,7 @@ namespace DotNetCore.CAP.MongoDB
return queryResult.Select(x => new MediumMessage return queryResult.Select(x => new MediumMessage
{ {
DbId = x.Id.ToString(), DbId = x.Id.ToString(),
Origin = StringSerializer.DeSerialize(x.Content),
Origin = _serializer.Deserialize(x.Content),
Retries = x.Retries, Retries = x.Retries,
Added = x.Added Added = x.Added
}).ToList(); }).ToList();
@@ -205,7 +207,7 @@ namespace DotNetCore.CAP.MongoDB
return queryResult.Select(x => new MediumMessage return queryResult.Select(x => new MediumMessage
{ {
DbId = x.Id.ToString(), DbId = x.Id.ToString(),
Origin = StringSerializer.DeSerialize(x.Content),
Origin = _serializer.Deserialize(x.Content),
Retries = x.Retries, Retries = x.Retries,
Added = x.Added Added = x.Added
}).ToList(); }).ToList();


+ 7
- 4
src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs View File

@@ -22,17 +22,20 @@ namespace DotNetCore.CAP.MySql
private readonly IOptions<MySqlOptions> _options; private readonly IOptions<MySqlOptions> _options;
private readonly IOptions<CapOptions> _capOptions; private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer; private readonly IStorageInitializer _initializer;
private readonly ISerializer _serializer;
private readonly string _pubName; private readonly string _pubName;
private readonly string _recName; private readonly string _recName;


public MySqlDataStorage( public MySqlDataStorage(
IOptions<MySqlOptions> options, IOptions<MySqlOptions> options,
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
IStorageInitializer initializer)
IStorageInitializer initializer,
ISerializer serializer)
{ {
_options = options; _options = options;
_capOptions = capOptions; _capOptions = capOptions;
_initializer = initializer; _initializer = initializer;
_serializer = serializer;
_pubName = initializer.GetPublishedTableName(); _pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName(); _recName = initializer.GetReceivedTableName();
} }
@@ -52,7 +55,7 @@ namespace DotNetCore.CAP.MySql
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
@@ -122,7 +125,7 @@ namespace DotNetCore.CAP.MySql
new MySqlParameter("@Id", mdMessage.DbId), new MySqlParameter("@Id", mdMessage.DbId),
new MySqlParameter("@Name", name), new MySqlParameter("@Name", name),
new MySqlParameter("@Group", group), new MySqlParameter("@Group", group),
new MySqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new MySqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)),
new MySqlParameter("@Retries", mdMessage.Retries), new MySqlParameter("@Retries", mdMessage.Retries),
new MySqlParameter("@Added", mdMessage.Added), new MySqlParameter("@Added", mdMessage.Added),
new MySqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), new MySqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
@@ -194,7 +197,7 @@ namespace DotNetCore.CAP.MySql
messages.Add(new MediumMessage messages.Add(new MediumMessage
{ {
DbId = reader.GetInt64(0).ToString(), DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Origin = _serializer.Deserialize(reader.GetString(1)),
Retries = reader.GetInt32(2), Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3) Added = reader.GetDateTime(3)
}); });


+ 7
- 4
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs View File

@@ -22,17 +22,20 @@ namespace DotNetCore.CAP.PostgreSql
private readonly IOptions<CapOptions> _capOptions; private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer; private readonly IStorageInitializer _initializer;
private readonly IOptions<PostgreSqlOptions> _options; private readonly IOptions<PostgreSqlOptions> _options;
private readonly ISerializer _serializer;
private readonly string _pubName; private readonly string _pubName;
private readonly string _recName; private readonly string _recName;


public PostgreSqlDataStorage( public PostgreSqlDataStorage(
IOptions<PostgreSqlOptions> options, IOptions<PostgreSqlOptions> options,
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
IStorageInitializer initializer)
IStorageInitializer initializer,
ISerializer serializer)
{ {
_capOptions = capOptions; _capOptions = capOptions;
_initializer = initializer; _initializer = initializer;
_options = options; _options = options;
_serializer = serializer;
_pubName = initializer.GetPublishedTableName(); _pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName(); _recName = initializer.GetReceivedTableName();
} }
@@ -53,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
@@ -121,7 +124,7 @@ namespace DotNetCore.CAP.PostgreSql
new NpgsqlParameter("@Id", long.Parse(mdMessage.DbId)), new NpgsqlParameter("@Id", long.Parse(mdMessage.DbId)),
new NpgsqlParameter("@Name", name), new NpgsqlParameter("@Name", name),
new NpgsqlParameter("@Group", group), new NpgsqlParameter("@Group", group),
new NpgsqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new NpgsqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)),
new NpgsqlParameter("@Retries", mdMessage.Retries), new NpgsqlParameter("@Retries", mdMessage.Retries),
new NpgsqlParameter("@Added", mdMessage.Added), new NpgsqlParameter("@Added", mdMessage.Added),
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
@@ -199,7 +202,7 @@ namespace DotNetCore.CAP.PostgreSql
messages.Add(new MediumMessage messages.Add(new MediumMessage
{ {
DbId = reader.GetInt64(0).ToString(), DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Origin = _serializer.Deserialize(reader.GetString(1)),
Retries = reader.GetInt32(2), Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3) Added = reader.GetDateTime(3)
}); });


+ 7
- 4
src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs View File

@@ -22,17 +22,20 @@ namespace DotNetCore.CAP.SqlServer
private readonly IOptions<CapOptions> _capOptions; private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<SqlServerOptions> _options; private readonly IOptions<SqlServerOptions> _options;
private readonly IStorageInitializer _initializer; private readonly IStorageInitializer _initializer;
private readonly ISerializer _serializer;
private readonly string _pubName; private readonly string _pubName;
private readonly string _recName; private readonly string _recName;


public SqlServerDataStorage( public SqlServerDataStorage(
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
IOptions<SqlServerOptions> options, IOptions<SqlServerOptions> options,
IStorageInitializer initializer)
IStorageInitializer initializer,
ISerializer serializer)
{ {
_options = options; _options = options;
_initializer = initializer; _initializer = initializer;
_capOptions = capOptions; _capOptions = capOptions;
_serializer = serializer;
_pubName = initializer.GetPublishedTableName(); _pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName(); _recName = initializer.GetReceivedTableName();
} }
@@ -52,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer
{ {
DbId = content.GetId(), DbId = content.GetId(),
Origin = content, Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now, Added = DateTime.Now,
ExpiresAt = null, ExpiresAt = null,
Retries = 0 Retries = 0
@@ -120,7 +123,7 @@ namespace DotNetCore.CAP.SqlServer
new SqlParameter("@Id", mdMessage.DbId), new SqlParameter("@Id", mdMessage.DbId),
new SqlParameter("@Name", name), new SqlParameter("@Name", name),
new SqlParameter("@Group", group), new SqlParameter("@Group", group),
new SqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new SqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)),
new SqlParameter("@Retries", mdMessage.Retries), new SqlParameter("@Retries", mdMessage.Retries),
new SqlParameter("@Added", mdMessage.Added), new SqlParameter("@Added", mdMessage.Added),
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
@@ -200,7 +203,7 @@ namespace DotNetCore.CAP.SqlServer
messages.Add(new MediumMessage messages.Add(new MediumMessage
{ {
DbId = reader.GetInt64(0).ToString(), DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Origin = _serializer.Deserialize(reader.GetString(1)),
Retries = reader.GetInt32(2), Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3) Added = reader.GetDateTime(3)
}); });


+ 1
- 1
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -195,7 +195,7 @@ namespace DotNetCore.CAP.Internal


if (message.HasException()) if (message.HasException())
{ {
var content = StringSerializer.Serialize(message);
var content = _serializer.Serialize(message);


_storage.StoreReceivedExceptionMessage(name, group, content); _storage.StoreReceivedExceptionMessage(name, group, content);




+ 6
- 4
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs View File

@@ -8,10 +8,10 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal; using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;


namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
@@ -19,11 +19,13 @@ namespace DotNetCore.CAP.Internal
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly ISerializer _serializer;
private readonly ConcurrentDictionary<int, ObjectMethodExecutor> _executors; private readonly ConcurrentDictionary<int, ObjectMethodExecutor> _executors;


public SubscribeInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
public SubscribeInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider, ISerializer serializer)
{ {
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_serializer = serializer;
_logger = loggerFactory.CreateLogger<SubscribeInvoker>(); _logger = loggerFactory.CreateLogger<SubscribeInvoker>();
_executors = new ConcurrentDictionary<int, ObjectMethodExecutor>(); _executors = new ConcurrentDictionary<int, ObjectMethodExecutor>();
} }
@@ -57,9 +59,9 @@ namespace DotNetCore.CAP.Internal
{ {
if (message.Value != null) if (message.Value != null)
{ {
if (message.Value is JToken jToken) //reading from storage
if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
{ {
executeParameters[i] = jToken.ToObject(parameterDescriptors[i].ParameterType);
executeParameters[i] = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType);
} }
else else
{ {


+ 8
- 2
src/DotNetCore.CAP/Messages/Message.cs View File

@@ -9,16 +9,22 @@ namespace DotNetCore.CAP.Messages
{ {
public class Message public class Message
{ {
/// <summary>
/// System.Text.Json requires that class explicitly has a parameterless constructor
/// and public properties have a setter.
/// </summary>
public Message() {}

public Message(IDictionary<string, string> headers, [CanBeNull] object value) public Message(IDictionary<string, string> headers, [CanBeNull] object value)
{ {
Headers = headers ?? throw new ArgumentNullException(nameof(headers)); Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Value = value; Value = value;
} }


public IDictionary<string, string> Headers { get; }
public IDictionary<string, string> Headers { get; set; }


[CanBeNull] [CanBeNull]
public object Value { get; }
public object Value { get; set; }
} }


public static class MessageExtensions public static class MessageExtensions


+ 27
- 2
src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs View File

@@ -6,6 +6,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


namespace DotNetCore.CAP.Serialization namespace DotNetCore.CAP.Serialization
{ {
@@ -37,5 +38,29 @@ namespace DotNetCore.CAP.Serialization
var json = Encoding.UTF8.GetString(transportMessage.Body); var json = Encoding.UTF8.GetString(transportMessage.Body);
return Task.FromResult(new Message(transportMessage.Headers, JsonConvert.DeserializeObject(json, valueType))); return Task.FromResult(new Message(transportMessage.Headers, JsonConvert.DeserializeObject(json, valueType)));
} }
}
}

public string Serialize(Message message)
{
return JsonConvert.SerializeObject(message);
}

public Message Deserialize(string json)
{
return JsonConvert.DeserializeObject<Message>(json);
}

public object Deserialize(object value, Type valueType)
{
if (value is JToken jToken)
{
return jToken.ToObject(valueType);
}
throw new NotSupportedException("Type is not of type JToken");
}

public bool IsJsonType(object jsonObject)
{
return jsonObject is JToken;
}
}
}

+ 31
- 1
src/DotNetCore.CAP/Serialization/ISerializer.cs View File

@@ -16,8 +16,38 @@ namespace DotNetCore.CAP.Serialization
Task<TransportMessage> SerializeAsync(Message message); Task<TransportMessage> SerializeAsync(Message message);


/// <summary> /// <summary>
/// Deserializes the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// Deserialize the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// </summary> /// </summary>
Task<Message> DeserializeAsync(TransportMessage transportMessage, [CanBeNull] Type valueType); Task<Message> DeserializeAsync(TransportMessage transportMessage, [CanBeNull] Type valueType);
/// <summary>
/// Serializes the given <see cref="Message"/> into a string
/// </summary>
string Serialize(Message message);

/// <summary>
/// Deserialize the given string into a <see cref="Message"/>
/// </summary>
Message Deserialize(string json);

/// <summary>
/// Deserialize the given object with the given Type into an object
/// </summary>
object Deserialize(object value, Type valueType);

/// <summary>
/// Check if the given object is of Json type, e.g. JToken or JsonElement
/// depending on the type of serializer implemented
/// </summary>
/// <example>
/// <code>
/// // Example implementation for System.Text.Json
/// public bool IsJsonType(object jsonObject)
/// {
/// return jsonObject is JsonElement;
/// }
/// </code>
/// </example>
bool IsJsonType(object jsonObject);
} }
} }

+ 3
- 1
test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs View File

@@ -3,6 +3,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Xunit; using Xunit;


@@ -15,10 +16,11 @@ namespace DotNetCore.CAP.MySql.Test


public MySqlStorageConnectionTest() public MySqlStorageConnectionTest()
{ {
var serializer = GetService<ISerializer>();
var options = GetService<IOptions<MySqlOptions>>(); var options = GetService<IOptions<MySqlOptions>>();
var capOptions = GetService<IOptions<CapOptions>>(); var capOptions = GetService<IOptions<CapOptions>>();
var initializer = GetService<IStorageInitializer>(); var initializer = GetService<IStorageInitializer>();
_storage = new MySqlDataStorage(options, capOptions, initializer);
_storage = new MySqlDataStorage(options, capOptions, initializer, serializer);
} }


[Fact] [Fact]


+ 3
- 1
test/DotNetCore.CAP.MySql.Test/TestHost.cs View File

@@ -1,5 +1,6 @@
using System; using System;
using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;


namespace DotNetCore.CAP.MySql.Test namespace DotNetCore.CAP.MySql.Test
@@ -34,8 +35,9 @@ namespace DotNetCore.CAP.MySql.Test
{ {
x.ConnectionString = ConnectionString; x.ConnectionString = ConnectionString;
}); });
services.AddSingleton<MySqlDataStorage>();
services.AddSingleton<MySqlDataStorage>();
services.AddSingleton<IStorageInitializer,MySqlStorageInitializer>(); services.AddSingleton<IStorageInitializer,MySqlStorageInitializer>();
services.AddSingleton<ISerializer, JsonUtf8Serializer>();
_services = services; _services = services;
} }




+ 62
- 0
test/DotNetCore.CAP.Test/MessageTest.cs View File

@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
namespace DotNetCore.CAP.Test
{
public class MessageTest
{
private readonly IServiceProvider _provider;
public MessageTest()
{
var services = new ServiceCollection();
ServiceCollectionExtensions.ServiceCollection = services;
services.AddSingleton<ISerializer, JsonUtf8Serializer>();
_provider = services.BuildServiceProvider();
}
[Fact]
public void Serialize_then_Deserialize_Message_With_Utf8JsonSerializer()
{
// Given
var givenMessage = new Message(
headers: new Dictionary<string, string>() {
{ "cap-msg-name", "authentication.users.update"},
{ "cap-msg-type", "User" },
{ "cap-corr-seq", "0"},
{ "cap-msg-group","service.v1"}
},
value: new MessageValue("test@test.com", "User"));
// When
var serializer = _provider.GetRequiredService<ISerializer>();
var json = serializer.Serialize(givenMessage);
var deserializedMessage = serializer.Deserialize(json);
// Then
Assert.True(serializer.IsJsonType(deserializedMessage.Value));
var result = serializer.Deserialize(deserializedMessage.Value, typeof(MessageValue)) as MessageValue;
Assert.NotNull(result);
Assert.Equal(result.Email, ((MessageValue)givenMessage.Value).Email);
Assert.Equal(result.Name, ((MessageValue)givenMessage.Value).Name);
}
}
public class MessageValue
{
public MessageValue(string email, string name)
{
Email = email;
Name = name;
}
public string Email { get; }
public string Name { get; }
}
}

+ 2
- 0
test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs View File

@@ -4,6 +4,7 @@ using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Xunit; using Xunit;


@@ -17,6 +18,7 @@ namespace DotNetCore.CAP.Test
{ {
var serviceCollection = new ServiceCollection(); var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging(); serviceCollection.AddLogging();
serviceCollection.AddSingleton<ISerializer, JsonUtf8Serializer>();
serviceCollection.AddSingleton<ISubscribeInvoker, SubscribeInvoker>(); serviceCollection.AddSingleton<ISubscribeInvoker, SubscribeInvoker>();
_serviceProvider = serviceCollection.BuildServiceProvider(); _serviceProvider = serviceCollection.BuildServiceProvider();
} }


Loading…
Cancel
Save