Browse Source

replace dapper with ado.net (#583)

* replace dapper with ado.net

* replace dapper with ado.net

* replace dapper with ado.net

* modify nuget reference

Co-authored-by: wandone\xlw <123456>
master
xiangxiren 4 years ago
committed by GitHub
parent
commit
37a4d6a0fe
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 984 additions and 516 deletions
  1. +4
    -1
      samples/Sample.Kafka.PostgreSql/Sample.Kafka.PostgreSql.csproj
  2. +1
    -0
      samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
  3. +1
    -0
      samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj
  4. +1
    -2
      src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
  5. +101
    -100
      src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
  6. +95
    -0
      src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs
  7. +126
    -65
      src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
  8. +3
    -4
      src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs
  9. +0
    -1
      src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
  10. +98
    -100
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  11. +95
    -0
      src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs
  12. +129
    -67
      src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
  13. +3
    -4
      src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs
  14. +0
    -1
      src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
  15. +101
    -102
      src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
  16. +95
    -0
      src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs
  17. +127
    -65
      src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs
  18. +3
    -4
      src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs
  19. +1
    -0
      test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj

+ 4
- 1
samples/Sample.Kafka.PostgreSql/Sample.Kafka.PostgreSql.csproj View File

@@ -5,7 +5,10 @@
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />


+ 1
- 0
samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj View File

@@ -5,6 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="3.0.0-rc1.final" />
</ItemGroup>
<ItemGroup>


+ 1
- 0
samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj View File

@@ -5,6 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="3.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>


+ 1
- 2
src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj View File

@@ -12,10 +12,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.0" />
<PackageReference Include="MySqlConnector" Version="0.60.3" />
<PackageReference Include="MySqlConnector" Version="0.69.0" />
</ItemGroup>

<ItemGroup>


+ 101
- 100
src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs View File

@@ -6,7 +6,6 @@ using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -23,6 +22,8 @@ namespace DotNetCore.CAP.MySql
private readonly IOptions<MySqlOptions> _options;
private readonly IOptions<CapOptions> _capOptions;
private readonly IStorageInitializer _initializer;
private readonly string _pubName;
private readonly string _recName;

public MySqlDataStorage(
IOptions<MySqlOptions> options,
@@ -32,41 +33,20 @@ namespace DotNetCore.CAP.MySql
_options = options;
_capOptions = capOptions;
_initializer = initializer;
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}

public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

var sql = $"UPDATE `{_initializer.GetPublishedTableName()}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}

public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_pubName, message, state);

var sql = $"UPDATE `{_initializer.GetReceivedTableName()}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_recName, message, state);

public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
var sql = $"INSERT INTO `{_initializer.GetPublishedTableName()}`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql = $"INSERT INTO `{_pubName}`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)" +
$" VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

var message = new MediumMessage
{
@@ -78,21 +58,21 @@ namespace DotNetCore.CAP.MySql
Retries = 0
};

var po = new
object[] sqlParams =
{
Id = message.DbId,
Name = name,
message.Content,
message.Retries,
message.Added,
message.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
new MySqlParameter("@Id", message.DbId),
new MySqlParameter("@Name", name),
new MySqlParameter("@Content", message.Content),
new MySqlParameter("@Retries", message.Retries),
new MySqlParameter("@Added", message.Added),
new MySqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value),
new MySqlParameter("@StatusName", nameof(StatusName.Scheduled)),
};

if (dbTransaction == null)
{
using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}
else
{
@@ -103,7 +83,7 @@ namespace DotNetCore.CAP.MySql
}

var conn = dbTrans?.Connection;
conn.Execute(sql, po, dbTrans);
conn.ExecuteNonQuery(sql, dbTrans, sqlParams);
}

return message;
@@ -111,26 +91,23 @@ namespace DotNetCore.CAP.MySql

public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
object[] sqlParams =
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = @group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
new MySqlParameter("@Id", SnowflakeId.Default().NextId().ToString()),
new MySqlParameter("@Name", name),
new MySqlParameter("@Group", group),
new MySqlParameter("@Content", content),
new MySqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new MySqlParameter("@Added", DateTime.Now),
new MySqlParameter("@ExpiresAt", DateTime.Now.AddDays(15)),
new MySqlParameter("@StatusName", nameof(StatusName.Failed))
};

StoreReceivedMessage(sqlParams);
}

public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql = $@"INSERT INTO `{_initializer.GetReceivedTableName()}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
@@ -139,79 +116,103 @@ namespace DotNetCore.CAP.MySql
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new

object[] sqlParams =
{
Id = mdMessage.DbId,
Group = @group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
new MySqlParameter("@Id", mdMessage.DbId),
new MySqlParameter("@Name", name),
new MySqlParameter("@Group", group),
new MySqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new MySqlParameter("@Retries", mdMessage.Retries),
new MySqlParameter("@Added", mdMessage.Added),
new MySqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
new MySqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

StoreReceivedMessage(sqlParams);
return mdMessage;
}

public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

return await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;",
new { timeout, batchCount });
return connection.ExecuteNonQuery(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;", null,
new MySqlParameter("@timeout", timeout), new MySqlParameter("@batchCount", batchCount));
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT * FROM `{_initializer.GetPublishedTableName()}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var sql = $"SELECT * FROM `{_pubName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_initializer.GetReceivedTableName()}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
$"SELECT * FROM `{_recName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

return await GetMessagesOfNeedRetryAsync(sql);
}

public IMonitoringApi GetMonitoringApi()
{
return new MySqlMonitoringApi(_options, _initializer);
}

private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE `{tableName}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

var result = new List<MediumMessage>();
object[] sqlParams =
{
new MySqlParameter("@Id", message.DbId),
new MySqlParameter("@Retries", message.Retries),
new MySqlParameter("@ExpiresAt", message.ExpiresAt),
new MySqlParameter("@StatusName", state.ToString("G"))
};

await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private void StoreReceivedMessage(object[] sqlParams)
{
var sql = $@"INSERT INTO `{_recName}`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) " +
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using var connection = new MySqlConnection(_options.Value.ConnectionString);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
{
List<MediumMessage> result;
await using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
result.Add(new MediumMessage
result = connection.ExecuteReader(sql, reader =>
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(4)),
Retries = reader.GetInt32(5),
Added = reader.GetDateTime(6)
var messages = new List<MediumMessage>();
while (reader.Read())
{
messages.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3)
});
}

return messages;
});
}

return result;
}

public IMonitoringApi GetMonitoringApi()
{
return new MySqlMonitoringApi(_options, _initializer);
}
}
}

+ 95
- 0
src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs View File

@@ -0,0 +1,95 @@
using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.MySql
{
internal static class IDbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

if (transaction != null)
{
command.Transaction = transaction;
}

return command.ExecuteNonQuery();
}

public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var reader = command.ExecuteReader();

T result = default;
if (readerFunc != null)
{
result = readerFunc(reader);
}

return result;
}

public static T ExecuteScalar<T>(this IDbConnection connection, string sql, params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var objValue = command.ExecuteScalar();

T result = default;
if (objValue != null)
{
var returnType = typeof(T);
var converter = TypeDescriptor.GetConverter(returnType);
if (converter.CanConvertFrom(objValue.GetType()))
{
result = (T)converter.ConvertFrom(objValue);
}
else
{
result = (T)Convert.ChangeType(objValue, returnType);
}
}

return result;
}
}
}

+ 126
- 65
src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs View File

@@ -3,10 +3,8 @@

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -18,13 +16,13 @@ namespace DotNetCore.CAP.MySql
{
internal class MySqlMonitoringApi : IMonitoringApi
{
private readonly IOptions<MySqlOptions> _options;
private readonly MySqlOptions _options;
private readonly string _pubName;
private readonly string _recName;

public MySqlMonitoringApi(IOptions<MySqlOptions> options, IStorageInitializer initializer)
{
_options = options;
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}
@@ -32,41 +30,53 @@ namespace DotNetCore.CAP.MySql
public StatisticsDto GetStatistics()
{
var sql = $@"
set transaction isolation level read committed;
select count(Id) from `{_pubName}` where StatusName = N'Succeeded';
select count(Id) from `{_recName}` where StatusName = N'Succeeded';
select count(Id) from `{_pubName}` where StatusName = N'Failed';
select count(Id) from `{_recName}` where StatusName = N'Failed';";

var statistics = UseConnection(connection =>
set transaction isolation level read committed;
SELECT
(
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Succeeded'
) AS PublishedSucceeded,
(
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Succeeded'
) AS ReceivedSucceeded,
(
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Failed'
) AS PublishedFailed,
(
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Failed'
) AS ReceivedFailed;";

StatisticsDto statistics;
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
statistics = connection.ExecuteReader(sql, reader =>
{
stats.PublishedSucceeded = multi.ReadSingle<int>();
stats.ReceivedSucceeded = multi.ReadSingle<int>();

stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
}
var statisticsDto = new StatisticsDto();

while (reader.Read())
{
statisticsDto.PublishedSucceeded = reader.GetInt32(0);
statisticsDto.ReceivedSucceeded = reader.GetInt32(1);
statisticsDto.PublishedFailed = reader.GetInt32(2);
statisticsDto.ReceivedFailed = reader.GetInt32(3);
}

return statisticsDto;
});
}

return stats;
});
return statistics;
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Failed));
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Succeeded));
}

public IList<MessageDto> Messages(MessageQueryDto queryDto)
@@ -96,52 +106,70 @@ select count(Id) from `{_recName}` where StatusName = N'Failed';";
var sqlQuery =
$"select * from `{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";

return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
object[] sqlParams =
{
new MySqlParameter("@StatusName", queryDto.StatusName ?? string.Empty),
new MySqlParameter("@Group", queryDto.Group ?? string.Empty),
new MySqlParameter("@Name", queryDto.Name ?? string.Empty),
new MySqlParameter("@Content", $"%{queryDto.Content}%"),
new MySqlParameter("@Offset", queryDto.CurrentPage * queryDto.PageSize),
new MySqlParameter("@Limit", queryDto.PageSize)
};

using var connection = new MySqlConnection(_options.ConnectionString);
return connection.ExecuteReader(sqlQuery, reader =>
{
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
var messages = new List<MessageDto>();

while (reader.Read())
{
var index = 0;
messages.Add(new MessageDto
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
ExpiresAt = reader.GetDateTime(index++),
StatusName = reader.GetString(index)
});
}

return messages;
}, sqlParams);
}

public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Failed));
}

public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Succeeded));
}

public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_recName, nameof(StatusName.Failed));
}

public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded));
}

private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
private int GetNumberOfMessage(string tableName, string statusName)
{
var sqlQuery = $"select count(Id) from `{tableName}` where StatusName = @state";

var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}

private T UseConnection<T>(Func<IDbConnection, T> action)
{
return action(new MySqlConnection(_options.Value.ConnectionString));
using var connection = new MySqlConnection(_options.ConnectionString);
return connection.ExecuteScalar<int>(sqlQuery, new MySqlParameter("@state", statusName));
}

private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
private Dictionary<DateTime, int> GetHourlyTimelineStats(string tableName, string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
@@ -153,11 +181,10 @@ select count(Id) from `{_recName}` where StatusName = N'Failed';";

var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);

return GetTimelineStats(connection, tableName, statusName, keyMaps);
return GetTimelineStats(tableName, statusName, keyMaps);
}

private Dictionary<DateTime, int> GetTimelineStats(
IDbConnection connection,
string tableName,
string statusName,
IDictionary<string, DateTime> keyMaps)
@@ -170,12 +197,30 @@ select aggr.* from (
from `{tableName}`
where StatusName = @statusName
group by date_format(`Added`,'%Y-%m-%d-%H')
) aggr where `Key` in @keys;";
) aggr where `Key` >= @minKey and `Key` <= @maxKey;";

var valuesMap = connection.Query<TimelineCounter>(
sqlQuery,
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
object[] sqlParams =
{
new MySqlParameter("@statusName", statusName),
new MySqlParameter("@minKey", keyMaps.Keys.Min()),
new MySqlParameter("@maxKey", keyMaps.Keys.Max())
};

Dictionary<string, int> valuesMap;
using (var connection = new MySqlConnection(_options.ConnectionString))
{
valuesMap = connection.ExecuteReader(sqlQuery, reader =>
{
var dictionary = new Dictionary<string, int>();

while (reader.Read())
{
dictionary.Add(reader.GetString(0), reader.GetInt32(1));
}

return dictionary;
}, sqlParams);
}

foreach (var key in keyMaps.Keys)
{
@@ -195,19 +240,35 @@ select aggr.* from (
return result;
}

public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{_pubName}` WHERE `Id`={id};";
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id);

await using var connection = new MySqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id);

public async Task<MediumMessage> GetReceivedMessageAsync(long id)
private async Task<MediumMessage> GetMessageAsync(string tableName, long id)
{
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{_recName}` WHERE Id={id};";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};";

await using var connection = new MySqlConnection(_options.ConnectionString);
var mediumMessae = connection.ExecuteReader(sql, reader =>
{
MediumMessage message = null;

while (reader.Read())
{
message = new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Content = reader.GetString(1),
Added = reader.GetDateTime(2),
ExpiresAt = reader.GetDateTime(3),
Retries = reader.GetInt32(4)
};
}

return message;
});

return mediumMessae;
}
}



+ 3
- 4
src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs View File

@@ -3,7 +3,6 @@

using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -43,9 +42,9 @@ namespace DotNetCore.CAP.MySql

var sql = CreateDbTablesScript();
await using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
connection.ExecuteNonQuery(sql);
await Task.CompletedTask;

_logger.LogDebug("Ensuring all create database tables script are applied.");
}


+ 0
- 1
src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj View File

@@ -12,7 +12,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.0" />
<PackageReference Include="Npgsql" Version="4.1.1" />


+ 98
- 100
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs View File

@@ -6,7 +6,6 @@ using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -38,33 +37,11 @@ namespace DotNetCore.CAP.PostgreSql
_recName = initializer.GetReceivedTableName();
}

public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {_pubName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = long.Parse(message.DbId),
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_pubName, message, state);

public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {_recName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = long.Parse(message.DbId),
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_recName, message, state);

public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
@@ -82,21 +59,21 @@ namespace DotNetCore.CAP.PostgreSql
Retries = 0
};

var po = new
object[] sqlParams =
{
Id = long.Parse(message.DbId),
Name = name,
message.Content,
message.Retries,
message.Added,
message.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
new NpgsqlParameter("@Id", long.Parse(message.DbId)),
new NpgsqlParameter("@Name", name),
new NpgsqlParameter("@Content", message.Content),
new NpgsqlParameter("@Retries", message.Retries),
new NpgsqlParameter("@Added", message.Added),
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

if (dbTransaction == null)
{
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}
else
{
@@ -105,7 +82,7 @@ namespace DotNetCore.CAP.PostgreSql
dbTrans = dbContextTrans.GetDbTransaction();

var conn = dbTrans?.Connection;
conn.Execute(sql, po, dbTrans);
conn.ExecuteNonQuery(sql, dbTrans, sqlParams);
}

return message;
@@ -113,30 +90,23 @@ namespace DotNetCore.CAP.PostgreSql

public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql =
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
object[] sqlParams =
{
Id = SnowflakeId.Default().NextId(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
new NpgsqlParameter("@Id", SnowflakeId.Default().NextId()),
new NpgsqlParameter("@Name", name),
new NpgsqlParameter("@Group", group),
new NpgsqlParameter("@Content", content),
new NpgsqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new NpgsqlParameter("@Added", DateTime.Now),
new NpgsqlParameter("@ExpiresAt", DateTime.Now.AddDays(15)),
new NpgsqlParameter("@StatusName", nameof(StatusName.Failed))
};

StoreReceivedMessage(sqlParams);
}

public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql =
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
@@ -145,19 +115,20 @@ namespace DotNetCore.CAP.PostgreSql
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new

object[] sqlParams =
{
Id = long.Parse(mdMessage.DbId),
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
new NpgsqlParameter("@Id", long.Parse(mdMessage.DbId)),
new NpgsqlParameter("@Name", name),
new NpgsqlParameter("@Group", group),
new NpgsqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new NpgsqlParameter("@Retries", mdMessage.Retries),
new NpgsqlParameter("@Added", mdMessage.Added),
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

StoreReceivedMessage(sqlParams);
return mdMessage;
}

@@ -165,10 +136,11 @@ namespace DotNetCore.CAP.PostgreSql
CancellationToken token = default)
{
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var count = connection.ExecuteNonQuery(
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", null,
new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount));

return await connection.ExecuteAsync(
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);",
new { timeout, batchCount });
return await Task.FromResult(count);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
@@ -177,21 +149,7 @@ namespace DotNetCore.CAP.PostgreSql
var sql =
$"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}

return result;
return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
@@ -200,27 +158,67 @@ namespace DotNetCore.CAP.PostgreSql
var sql =
$"SELECT * FROM {_recName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
return await GetMessagesOfNeedRetryAsync(sql);
}

public IMonitoringApi GetMonitoringApi()
{
return new PostgreSqlMonitoringApi(_options, _initializer);
}

private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";

object[] sqlParams =
{
new NpgsqlParameter("@Id", long.Parse(message.DbId)),
new NpgsqlParameter("@Retries", message.Retries),
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt),
new NpgsqlParameter("@StatusName", state.ToString("G"))
};

using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);

await Task.CompletedTask;
}

private void StoreReceivedMessage(object[] sqlParams)
{
var sql =
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
{
List<MediumMessage> result;
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
result.Add(new MediumMessage
result = connection.ExecuteReader(sql, reader =>
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(4)),
Retries = reader.GetInt32(5),
Added = reader.GetDateTime(6)
var messages = new List<MediumMessage>();
while (reader.Read())
{
messages.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3)
});
}

return messages;
});
}

return result;
}

public IMonitoringApi GetMonitoringApi()
{
return new PostgreSqlMonitoringApi(_options, _initializer);
return await Task.FromResult(result);
}
}
}

+ 95
- 0
src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs View File

@@ -0,0 +1,95 @@
using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.PostgreSql
{
internal static class IDbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

if (transaction != null)
{
command.Transaction = transaction;
}

return command.ExecuteNonQuery();
}

public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var reader = command.ExecuteReader();

T result = default;
if (readerFunc != null)
{
result = readerFunc(reader);
}

return result;
}

public static T ExecuteScalar<T>(this IDbConnection connection, string sql, params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var objValue = command.ExecuteScalar();

T result = default;
if (objValue != null)
{
var returnType = typeof(T);
var converter = TypeDescriptor.GetConverter(returnType);
if (converter.CanConvertFrom(objValue.GetType()))
{
result = (T)converter.ConvertFrom(objValue);
}
else
{
result = (T)Convert.ChangeType(objValue, returnType);
}
}

return result;
}
}
}

+ 129
- 67
src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs View File

@@ -3,10 +3,8 @@

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -18,56 +16,57 @@ namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlMonitoringApi : IMonitoringApi
{
private readonly IOptions<PostgreSqlOptions> _options;
private readonly PostgreSqlOptions _options;
private readonly string _pubName;
private readonly string _recName;

public PostgreSqlMonitoringApi(IOptions<PostgreSqlOptions> options,IStorageInitializer initializer)
public PostgreSqlMonitoringApi(IOptions<PostgreSqlOptions> options, IStorageInitializer initializer)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_pubName = initializer.GetPublishedTableName();
_recName = initializer.GetReceivedTableName();
}

public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql =
$"SELECT \"Id\" AS \"DbId\",* FROM {_pubName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";

using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id);

public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql =
$"SELECT \"Id\" AS \"DbId\",* FROM {_recName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id);

public StatisticsDto GetStatistics()
{
var sql = $@"
select count(""Id"") from {_pubName} where ""StatusName"" = N'Succeeded';
select count(""Id"") from {_recName} where ""StatusName"" = N'Succeeded';
select count(""Id"") from {_pubName} where ""StatusName"" = N'Failed';
select count(""Id"") from {_recName} where ""StatusName"" = N'Failed';";

var statistics = UseConnection(connection =>
SELECT
(
SELECT COUNT(""Id"") FROM {_pubName} WHERE ""StatusName"" = N'Succeeded'
) AS ""PublishedSucceeded"",
(
SELECT COUNT(""Id"") FROM {_recName} WHERE ""StatusName"" = N'Succeeded'
) AS ""ReceivedSucceeded"",
(
SELECT COUNT(""Id"") FROM {_pubName} WHERE ""StatusName"" = N'Failed'
) AS ""PublishedFailed"",
(
SELECT COUNT(""Id"") FROM {_recName} WHERE ""StatusName"" = N'Failed'
) AS ""ReceivedFailed"";";

StatisticsDto statistics;
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
statistics = connection.ExecuteReader(sql, reader =>
{
stats.PublishedSucceeded = multi.ReadSingle<int>();
stats.ReceivedSucceeded = multi.ReadSingle<int>();

stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
}
var statisticsDto = new StatisticsDto();

while (reader.Read())
{
statisticsDto.PublishedSucceeded = reader.GetInt32(0);
statisticsDto.ReceivedSucceeded = reader.GetInt32(1);
statisticsDto.PublishedFailed = reader.GetInt32(2);
statisticsDto.ReceivedFailed = reader.GetInt32(3);
}

return statisticsDto;
});
}

return stats;
});
return statistics;
}

@@ -87,67 +86,85 @@ select count(""Id"") from {_recName} where ""StatusName"" = N'Failed';";
var sqlQuery =
$"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";

return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
object[] sqlParams =
{
new NpgsqlParameter("@StatusName", queryDto.StatusName ?? string.Empty),
new NpgsqlParameter("@Group", queryDto.Group ?? string.Empty),
new NpgsqlParameter("@Name", queryDto.Name ?? string.Empty),
new NpgsqlParameter("@Content", $"%{queryDto.Content}%"),
new NpgsqlParameter("@Offset", queryDto.CurrentPage * queryDto.PageSize),
new NpgsqlParameter("@Limit", queryDto.PageSize)
};

using var connection = new NpgsqlConnection(_options.ConnectionString);
return connection.ExecuteReader(sqlQuery, reader =>
{
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
var messages = new List<MessageDto>();

while (reader.Read())
{
var index = 0;
messages.Add(new MessageDto
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
ExpiresAt = reader.GetDateTime(index++),
StatusName = reader.GetString(index)
});
}

return messages;
}, sqlParams);
}

public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Failed));
}

public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Succeeded));
}

public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_recName, nameof(StatusName.Failed));
}

public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded));
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Succeeded));
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Failed));
}

private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
private int GetNumberOfMessage(string tableName, string statusName)
{
var sqlQuery =
$"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)";

var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
using var connection = new NpgsqlConnection(_options.ConnectionString);
var count = connection.ExecuteScalar<int>(sqlQuery, new NpgsqlParameter("@state", statusName));
return count;
}

private T UseConnection<T>(Func<IDbConnection, T> action)
{
return action(new NpgsqlConnection(_options.Value.ConnectionString));
}

private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
private Dictionary<DateTime, int> GetHourlyTimelineStats(string tableName, string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
@@ -159,11 +176,10 @@ select count(""Id"") from {_recName} where ""StatusName"" = N'Failed';";

var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);

return GetTimelineStats(connection, tableName, statusName, keyMaps);
return GetTimelineStats(tableName, statusName, keyMaps);
}

private Dictionary<DateTime, int> GetTimelineStats(
IDbConnection connection,
string tableName,
string statusName,
IDictionary<string, DateTime> keyMaps)
@@ -177,11 +193,30 @@ with aggr as (
where ""StatusName"" = @statusName
group by to_char(""Added"", 'yyyy-MM-dd-HH')
)
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxKey;";

var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName })
.ToList()
.ToDictionary(x => x.Key, x => x.Count);
object[] sqlParams =
{
new NpgsqlParameter("@statusName", statusName),
new NpgsqlParameter("@minKey", keyMaps.Keys.Min()),
new NpgsqlParameter("@maxKey", keyMaps.Keys.Max())
};

Dictionary<string, int> valuesMap;
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
valuesMap = connection.ExecuteReader(sqlQuery, reader =>
{
var dictionary = new Dictionary<string, int>();

while (reader.Read())
{
dictionary.Add(reader.GetString(0), reader.GetInt32(1));
}

return dictionary;
}, sqlParams);
}

foreach (var key in keyMaps.Keys)
{
@@ -198,6 +233,33 @@ select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";

return result;
}

private async Task<MediumMessage> GetMessageAsync(string tableName, long id)
{
var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED";

using var connection = new NpgsqlConnection(_options.ConnectionString);
var mediumMessae = connection.ExecuteReader(sql, reader =>
{
MediumMessage message = null;

while (reader.Read())
{
message = new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Content = reader.GetString(1),
Added = reader.GetDateTime(2),
ExpiresAt = reader.GetDateTime(3),
Retries = reader.GetInt32(4)
};
}

return message;
});

return await Task.FromResult(mediumMessae);
}
}

internal class TimelineCounter


+ 3
- 4
src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs View File

@@ -3,7 +3,6 @@

using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -40,9 +39,9 @@ namespace DotNetCore.CAP.PostgreSql

var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
connection.ExecuteNonQuery(sql);
await Task.CompletedTask;

_logger.LogDebug("Ensuring all create database tables script are applied.");
}


+ 0
- 1
src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj View File

@@ -13,7 +13,6 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="1.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.0" />


+ 101
- 102
src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs View File

@@ -6,7 +6,6 @@ using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -38,33 +37,11 @@ namespace DotNetCore.CAP.SqlServer
_recName = initializer.GetReceivedTableName();
}

public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {_pubName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_pubName, message, state);

public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {_recName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
using var connection = new SqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_recName, message, state);

public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
{
@@ -81,21 +58,21 @@ namespace DotNetCore.CAP.SqlServer
Retries = 0
};

var po = new
object[] sqlParams =
{
Id = message.DbId,
Name = name,
message.Content,
message.Retries,
message.Added,
message.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
new SqlParameter("@Id", message.DbId),
new SqlParameter("@Name", name),
new SqlParameter("@Content", message.Content),
new SqlParameter("@Retries", message.Retries),
new SqlParameter("@Added", message.Added),
new SqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value),
new SqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

if (dbTransaction == null)
{
using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, po);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}
else
{
@@ -104,7 +81,7 @@ namespace DotNetCore.CAP.SqlServer
dbTrans = dbContextTrans.GetDbTransaction();

var conn = dbTrans?.Connection;
conn.Execute(sql, po, dbTrans);
conn.ExecuteNonQuery(sql, dbTrans, sqlParams);
}

return message;
@@ -112,30 +89,23 @@ namespace DotNetCore.CAP.SqlServer

public void StoreReceivedExceptionMessage(string name, string group, string content)
{
var sql =
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new
object[] sqlParams =
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
new SqlParameter("@Id", SnowflakeId.Default().NextId().ToString()),
new SqlParameter("@Name", name),
new SqlParameter("@Group", group),
new SqlParameter("@Content", content),
new SqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new SqlParameter("@Added", DateTime.Now),
new SqlParameter("@ExpiresAt", DateTime.Now.AddDays(15)),
new SqlParameter("@StatusName", nameof(StatusName.Failed))
};

StoreReceivedMessage(sqlParams);
}

public MediumMessage StoreReceivedMessage(string name, string group, Message message)
{
var sql =
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

var mdMessage = new MediumMessage
{
DbId = SnowflakeId.Default().NextId().ToString(),
@@ -144,19 +114,20 @@ namespace DotNetCore.CAP.SqlServer
ExpiresAt = null,
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.Execute(sql, new

object[] sqlParams =
{
Id = mdMessage.DbId,
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
new SqlParameter("@Id", mdMessage.DbId),
new SqlParameter("@Name", name),
new SqlParameter("@Group", group),
new SqlParameter("@Content", StringSerializer.Serialize(mdMessage.Origin)),
new SqlParameter("@Retries", mdMessage.Retries),
new SqlParameter("@Added", mdMessage.Added),
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
new SqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

StoreReceivedMessage(sqlParams);
return mdMessage;
}

@@ -164,62 +135,90 @@ namespace DotNetCore.CAP.SqlServer
CancellationToken token = default)
{
using var connection = new SqlConnection(_options.Value.ConnectionString);
return await connection.ExecuteAsync(
$"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;",
new { timeout, batchCount });
var count = connection.ExecuteNonQuery(
$"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;", null,
new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount));

return await Task.FromResult(count);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT TOP (200) * FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
var sql = $"SELECT TOP (200) Id, Content, Retries, Added FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

var result = new List<MediumMessage>();
using var connection = new SqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}

return result;
return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"SELECT TOP (200) Id, Content, Retries, Added FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

var result = new List<MediumMessage>();
return await GetMessagesOfNeedRetryAsync(sql);
}

public IMonitoringApi GetMonitoringApi()
{
return new SqlServerMonitoringApi(_options, _initializer);
}

private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {tableName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";

object[] sqlParams =
{
new SqlParameter("@Id", message.DbId),
new SqlParameter("@Retries", message.Retries),
new SqlParameter("@ExpiresAt", message.ExpiresAt),
new SqlParameter("@StatusName", state.ToString("G"))
};

using var connection = new SqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);

await Task.CompletedTask;
}

private void StoreReceivedMessage(object[] sqlParams)
{
var sql =
$"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using var connection = new SqlConnection(_options.Value.ConnectionString);
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
{
List<MediumMessage> result;
using (var connection = new SqlConnection(_options.Value.ConnectionString))
{
result.Add(new MediumMessage
result = connection.ExecuteReader(sql, reader =>
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(4)),
Retries = reader.GetInt32(5),
Added = reader.GetDateTime(6)
var messages = new List<MediumMessage>();
while (reader.Read())
{
messages.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(1)),
Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3)
});
}

return messages;
});
}

return result;
}

public IMonitoringApi GetMonitoringApi()
{
return new SqlServerMonitoringApi(_options, _initializer);
return await Task.FromResult(result);
}
}
}

+ 95
- 0
src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs View File

@@ -0,0 +1,95 @@
using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.SqlServer
{
internal static class IDbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

if (transaction != null)
{
command.Transaction = transaction;
}

return command.ExecuteNonQuery();
}

public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var reader = command.ExecuteReader();

T result = default;
if (readerFunc != null)
{
result = readerFunc(reader);
}

return result;
}

public static T ExecuteScalar<T>(this IDbConnection connection, string sql, params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

using var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = sql;
foreach (var param in sqlParams)
{
command.Parameters.Add(param);
}

var objValue = command.ExecuteScalar();

T result = default;
if (objValue != null)
{
var returnType = typeof(T);
var converter = TypeDescriptor.GetConverter(returnType);
if (converter.CanConvertFrom(objValue.GetType()))
{
result = (T)converter.ConvertFrom(objValue);
}
else
{
result = (T)Convert.ChangeType(objValue, returnType);
}
}

return result;
}
}
}

+ 127
- 65
src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs View File

@@ -3,10 +3,8 @@

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
@@ -32,41 +30,53 @@ namespace DotNetCore.CAP.SqlServer
public StatisticsDto GetStatistics()
{
var sql = $@"
set transaction isolation level read committed;
select count(Id) from {_pubName} with (nolock) where StatusName = N'Succeeded';
select count(Id) from {_recName} with (nolock) where StatusName = N'Succeeded';
select count(Id) from {_pubName} with (nolock) where StatusName = N'Failed';
select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';";

var statistics = UseConnection(connection =>
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT
(
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Succeeded'
) AS PublishedSucceeded,
(
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Succeeded'
) AS ReceivedSucceeded,
(
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Failed'
) AS PublishedFailed,
(
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Failed'
) AS ReceivedFailed;";

StatisticsDto statistics;
using (var connection = new SqlConnection(_options.ConnectionString))
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
statistics = connection.ExecuteReader(sql, reader =>
{
stats.PublishedSucceeded = multi.ReadSingle<int>();
stats.ReceivedSucceeded = multi.ReadSingle<int>();

stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
}
var statisticsDto = new StatisticsDto();

while (reader.Read())
{
statisticsDto.PublishedSucceeded = reader.GetInt32(0);
statisticsDto.ReceivedSucceeded = reader.GetInt32(1);
statisticsDto.PublishedFailed = reader.GetInt32(2);
statisticsDto.ReceivedFailed = reader.GetInt32(3);
}

return statisticsDto;
});
}

return stats;
});
return statistics;
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Failed));
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? _pubName : _recName;
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded)));
return GetHourlyTimelineStats(tableName, nameof(StatusName.Succeeded));
}

public IList<MessageDto> Messages(MessageQueryDto queryDto)
@@ -91,67 +101,75 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';";
var sqlQuery =
$"select * from {tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only";

return UseConnection(conn => conn.Query<MessageDto>(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new
object[] sqlParams =
{
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
Content = "%" + queryDto.Content + "%",
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
new SqlParameter("@StatusName", queryDto.StatusName ?? string.Empty),
new SqlParameter("@Group", queryDto.Group ?? string.Empty),
new SqlParameter("@Name", queryDto.Name ?? string.Empty),
new SqlParameter("@Content", $"%{queryDto.Content}%"),
new SqlParameter("@Offset", queryDto.CurrentPage * queryDto.PageSize),
new SqlParameter("@Limit", queryDto.PageSize)
};

using var connection = new SqlConnection(_options.ConnectionString);
return connection.ExecuteReader(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, reader =>
{
var messages = new List<MessageDto>();

while (reader.Read())
{
var index = 0;
messages.Add(new MessageDto
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
ExpiresAt = reader.GetDateTime(index++),
StatusName = reader.GetString(index)
});
}

return messages;
}, sqlParams);
}

public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Failed));
}

public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_pubName, nameof(StatusName.Succeeded));
}

public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed)));
return GetNumberOfMessage(_recName, nameof(StatusName.Failed));
}

public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded)));
return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded));
}

public async Task<MediumMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT Id as DbId, * FROM {_pubName} WITH (readpast) WHERE Id={id}";
using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id);

public async Task<MediumMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT Id as DbId,* FROM {_recName} WITH (readpast) WHERE Id={id}";
using var connection = new SqlConnection(_options.ConnectionString);
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql);
}
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id);

private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
private int GetNumberOfMessage(string tableName, string statusName)
{
var sqlQuery =
$"select count(Id) from {tableName} with (nolock) where StatusName = @state";

var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}

private T UseConnection<T>(Func<IDbConnection, T> action)
{
return action(new SqlConnection(_options.ConnectionString));
using var connection = new SqlConnection(_options.ConnectionString);
return connection.ExecuteScalar<int>(sqlQuery, new SqlParameter("@state", statusName));
}

private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
private Dictionary<DateTime, int> GetHourlyTimelineStats(string tableName, string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
@@ -163,11 +181,10 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';";

var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);

return GetTimelineStats(connection, tableName, statusName, keyMaps);
return GetTimelineStats(tableName, statusName, keyMaps);
}

private Dictionary<DateTime, int> GetTimelineStats(
IDbConnection connection,
string tableName,
string statusName,
IDictionary<string, DateTime> keyMaps)
@@ -191,11 +208,30 @@ with aggr as (
where StatusName = @statusName
group by FORMAT(Added,'yyyy-MM-dd-HH')
)
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <= @maxKey;";

object[] sqlParams =
{
new SqlParameter("@statusName", statusName),
new SqlParameter("@minKey", keyMaps.Keys.Min()),
new SqlParameter("@maxKey", keyMaps.Keys.Max())
};

var valuesMap = connection
.Query<TimelineCounter>(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
Dictionary<string, int> valuesMap;
using (var connection = new SqlConnection(_options.ConnectionString))
{
valuesMap = connection.ExecuteReader(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, reader =>
{
var dictionary = new Dictionary<string, int>();

while (reader.Read())
{
dictionary.Add(reader.GetString(0), reader.GetInt32(1));
}

return dictionary;
}, sqlParams);
}

foreach (var key in keyMaps.Keys)
{
@@ -211,8 +247,34 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";

return result;
}
}

private async Task<MediumMessage> GetMessageAsync(string tableName, long id)
{
var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}";

using var connection = new SqlConnection(_options.ConnectionString);
var mediumMessae = connection.ExecuteReader(sql, reader =>
{
MediumMessage message = null;

while (reader.Read())
{
message = new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Content = reader.GetString(1),
Added = reader.GetDateTime(2),
ExpiresAt = reader.GetDateTime(3),
Retries = reader.GetInt32(4)
};
}

return message;
});

return await Task.FromResult(mediumMessae);
}
}

internal class TimelineCounter
{


+ 3
- 4
src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs View File

@@ -3,7 +3,6 @@

using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Persistence;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
@@ -40,9 +39,9 @@ namespace DotNetCore.CAP.SqlServer

var sql = CreateDbTablesScript(_options.Value.Schema);
using (var connection = new SqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
connection.ExecuteNonQuery(sql);
await Task.CompletedTask;

_logger.LogDebug("Ensuring all create database tables script are applied.");
}


+ 1
- 0
test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj View File

@@ -11,6 +11,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />


Loading…
Cancel
Save