Browse Source

project init.

master
yangxiaodong 8 years ago
commit
b2f192bbf5
16 changed files with 586 additions and 0 deletions
  1. +51
    -0
      .gitattributes
  2. +30
    -0
      .gitignore
  3. +34
    -0
      Cap.sln
  4. +21
    -0
      LICENSE
  5. +3
    -0
      README.md
  6. +3
    -0
      global.json
  7. +30
    -0
      src/Cap.Consistency/BuilderExtensions.cs
  8. +21
    -0
      src/Cap.Consistency/Cap.Consistency.xproj
  9. +48
    -0
      src/Cap.Consistency/ConsistenceBuilder.cs
  10. +147
    -0
      src/Cap.Consistency/ConsistentMessageManager.cs
  11. +55
    -0
      src/Cap.Consistency/IConsistentMessageStore.cs
  12. +7
    -0
      src/Cap.Consistency/KafkaConsistenceMarkerService.cs
  13. +76
    -0
      src/Cap.Consistency/OperateResult.cs
  14. +19
    -0
      src/Cap.Consistency/Properties/AssemblyInfo.cs
  15. +25
    -0
      src/Cap.Consistency/ServiceCollectionExtensions.cs
  16. +16
    -0
      src/Cap.Consistency/project.json

+ 51
- 0
.gitattributes View File

@@ -0,0 +1,51 @@
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain

*.jpg binary
*.png binary
*.gif binary

*.cs text=auto diff=csharp
*.vb text=auto
*.resx text=auto
*.c text=auto
*.cpp text=auto
*.cxx text=auto
*.h text=auto
*.hxx text=auto
*.py text=auto
*.rb text=auto
*.java text=auto
*.html text=auto
*.htm text=auto
*.css text=auto
*.scss text=auto
*.sass text=auto
*.less text=auto
*.js text=auto
*.lisp text=auto
*.clj text=auto
*.sql text=auto
*.php text=auto
*.lua text=auto
*.m text=auto
*.asm text=auto
*.erl text=auto
*.fs text=auto
*.fsx text=auto
*.hs text=auto

*.csproj text=auto
*.vbproj text=auto
*.fsproj text=auto
*.dbproj text=auto
*.sln text=auto eol=crlf
*.sh eol=lf

+ 30
- 0
.gitignore View File

@@ -0,0 +1,30 @@
[Oo]bj/
[Bb]in/
TestResults/
.nuget/
_ReSharper.*/
packages/
artifacts/
PublishProfiles/
*.user
*.suo
*.cache
*.docstates
_ReSharper.*
nuget.exe
*net45.csproj
*net451.csproj
*k10.csproj
*.psess
*.vsp
*.pidb
*.userprefs
*DS_Store
*.ncrunchsolution
*.*sdf
*.ipch
*.sln.ide
project.lock.json
.vs
.build/
.testPublish/

+ 34
- 0
Cap.sln View File

@@ -0,0 +1,34 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{57A8A8E5-5715-41BF-A0A6-46B819933FBC}"
ProjectSection(SolutionItems) = preProject
global.json = global.json
EndProjectSection
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Cap.Consistency", "src\Cap.Consistency\Cap.Consistency.xproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
EndGlobal

+ 21
- 0
LICENSE View File

@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2016 Savorboard

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

+ 3
- 0
README.md View File

@@ -0,0 +1,3 @@
# kafkabus

a .net core message consistence middleware , and now developing...

+ 3
- 0
global.json View File

@@ -0,0 +1,3 @@
{
"projects": ["src"]
}

+ 30
- 0
src/Cap.Consistency/BuilderExtensions.cs View File

@@ -0,0 +1,30 @@
using Cap.Consistency;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// KafkaConsostence extensions for <see cref="IApplicationBuilder"/>
/// </summary>
public static class BuilderExtensions
{
/// <summary>
/// Enables KafkaConsistence for the current application
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance this method extends.</returns>
public static IApplicationBuilder UseKafkaConsistence(this IApplicationBuilder app) {
if (app == null) {
throw new ArgumentNullException(nameof(app));
}

var marker = app.ApplicationServices.GetService<KafkaConsistenceMarkerService>();
if (marker == null) {
throw new InvalidOperationException("AddKafkaConsistence must be called on the service collection.");
}

return app;
}
}
}

+ 21
- 0
src/Cap.Consistency/Cap.Consistency.xproj View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>

<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>e8af8611-0ea4-4b19-bc48-87c57a87dc66</ProjectGuid>
<RootNamespace>KafkaConsistence</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>

<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>

+ 48
- 0
src/Cap.Consistency/ConsistenceBuilder.cs View File

@@ -0,0 +1,48 @@
using Microsoft.Extensions.DependencyInjection;
using System;

namespace Cap.Consistency
{
/// <summary>
/// Creates a new instance of <see cref="ConsistenceBuilder"/>.
/// </summary>
/// <param name="message">The <see cref="Type"/> to use for the message.</param>
/// <param name="services">The <see cref="IServiceCollection"/> to attach to.</param>
public class ConsistenceBuilder
{
public ConsistenceBuilder(Type message, IServiceCollection service) {
MessageType = message;
Services = service;
}

/// <summary>
/// Gets the <see cref="IServiceCollection"/> services are attached to.
/// </summary>
/// <value>
/// The <see cref="IServiceCollection"/> services are attached to.
/// </value>
public IServiceCollection Services { get; private set; }

/// <summary>
/// Gets the <see cref="Type"/> used for messages.
/// </summary>
/// <value>
/// The <see cref="Type"/> used for messages.
/// </value>
public Type MessageType { get; private set; }

/// <summary>
/// Adds a <see cref="IRoleStore{TRole}"/> for the <seealso cref="RoleType"/>.
/// </summary>
/// <typeparam name="T">The role type held in the store.</typeparam>
/// <returns>The current <see cref="IdentityBuilder"/> instance.</returns>
public virtual ConsistenceBuilder AddMessageStore<T>() where T : class {
return AddScoped(typeof(IConsistentMessageStore<>).MakeGenericType(MessageType), typeof(T));
}

private ConsistenceBuilder AddScoped(Type serviceType, Type concreteType) {
Services.AddScoped(serviceType, concreteType);
return this;
}
}
}

+ 147
- 0
src/Cap.Consistency/ConsistentMessageManager.cs View File

@@ -0,0 +1,147 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Cap.Consistency
{
/// <summary>
/// Provides the APIs for managing message in a persistence store.
/// </summary>
/// <typeparam name="TMessage">The type encapsulating a message.</typeparam>
public class ConsistentMessageManager<TMessage> : IDisposable where TMessage : class
{
private bool _disposed;
private readonly HttpContext _context;
private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None;

/// <summary>
/// Constructs a new instance of <see cref="ConsistentMessageManager{TMessage}"/>.
/// </summary>
/// <param name="store">The persistence store the manager will operate over.</param>
/// <param name="services">The <see cref="IServiceProvider"/> used to resolve services.</param>
/// <param name="logger">The logger used to log messages, warnings and errors.</param>
public ConsistentMessageManager(IConsistentMessageStore<TMessage> store,
IServiceProvider services,
ILogger<ConsistentMessageManager<TMessage>> logger) {
if (store == null) {
throw new ArgumentNullException(nameof(store));
}

Store = store;
Logger = logger;

if (services != null) {
_context = services.GetService<IHttpContextAccessor>()?.HttpContext;
}
}

/// <summary>
/// Gets or sets the persistence store the manager operates over.
/// </summary>
/// <value>The persistence store the manager operates over.</value>
protected internal IConsistentMessageStore<TMessage> Store { get; set; }

/// <summary>
/// Gets the <see cref="ILogger"/> used to log messages from the manager.
/// </summary>
/// <value>
/// The <see cref="ILogger"/> used to log messages from the manager.
/// </value>
protected internal virtual ILogger Logger { get; set; }

/// <summary>
/// Creates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to create.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> CreateAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct

return Store.CreateAsync(message, CancellationToken);
}

/// <summary>
/// Updates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to update.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> UpdateMessageAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct

return Store.UpdateAsync(message, CancellationToken);
}

/// <summary>
/// Deletes the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to delete.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> DeleteMessageAsync(TMessage message) {
ThrowIfDisposed();

if (message == null) {
throw new ArgumentNullException(nameof(message));
}

return Store.DeleteAsync(message, CancellationToken);
}

/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="userId">The message ID to search for.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the user matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<TMessage> FindByIdAsync(string messageId) {
ThrowIfDisposed();
return Store.FindByIdAsync(messageId, CancellationToken);
}

/// <summary>
/// Gets the message identifier for the specified <paramref name="user"/>.
/// </summary>
/// <param name="user">The message whose identifier should be retrieved.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the identifier for the specified <paramref name="message"/>.</returns>
public virtual async Task<string> GetMessageIdAsync(TMessage message) {
ThrowIfDisposed();
return await Store.GetMessageIdAsync(message, CancellationToken);
}

public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Releases the unmanaged resources used by the message manager and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing) {
if (disposing && !_disposed) {
Store.Dispose();
_disposed = true;
}
}

protected void ThrowIfDisposed() {
if (_disposed) {
throw new ObjectDisposedException(GetType().Name);
}
}
}
}

+ 55
- 0
src/Cap.Consistency/IConsistentMessageStore.cs View File

@@ -0,0 +1,55 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Cap.Consistency
{
/// <summary>
/// Provides an abstraction for a store which manages kafka consistent message.
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public interface IConsistentMessageStore<TMessage> : IDisposable where TMessage : class
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<TMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);

/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="IdentityResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="IdentityResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="IdentityResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GetMessageIdAsync(TMessage message, CancellationToken cancellationToken);
}
}

+ 7
- 0
src/Cap.Consistency/KafkaConsistenceMarkerService.cs View File

@@ -0,0 +1,7 @@
namespace Cap.Consistency
{
/// <summary>
/// Used to verify KafkaConsistence was called on a ServiceCollection
/// </summary>
public class KafkaConsistenceMarkerService { }
}

+ 76
- 0
src/Cap.Consistency/OperateResult.cs View File

@@ -0,0 +1,76 @@
using System.Collections.Generic;
using System.Linq;

namespace Cap.Consistency
{
/// <summary>
/// Represents the result of an consistent message operation.
/// </summary>
public class OperateResult
{
private static readonly OperateResult _success = new OperateResult { Succeeded = true };
private List<OperateError> _errors = new List<OperateError>();

/// <summary>
/// Flag indicating whether if the operation succeeded or not.
/// </summary>
public bool Succeeded { get; set; }

/// <summary>
/// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors
/// that occurred during the operation.
/// </summary>
/// <value>An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s.</value>
public IEnumerable<OperateError> Errors => _errors;

/// <summary>
/// Returns an <see cref="IdentityResult"/> indicating a successful identity operation.
/// </summary>
/// <returns>An <see cref="IdentityResult"/> indicating a successful operation.</returns>
public static OperateResult Success => _success;

/// <summary>
/// Creates an <see cref="IdentityResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.
/// </summary>
/// <param name="errors">An optional array of <see cref="IdentityError"/>s which caused the operation to fail.</param>
/// <returns>An <see cref="IdentityResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors) {
var result = new OperateResult { Succeeded = false };
if (errors != null) {
result._errors.AddRange(errors);
}
return result;
}

/// <summary>
/// Converts the value of the current <see cref="IdentityResult"/> object to its equivalent string representation.
/// </summary>
/// <returns>A string representation of the current <see cref="IdentityResult"/> object.</returns>
/// <remarks>
/// If the operation was successful the ToString() will return "Succeeded" otherwise it returned
/// "Failed : " followed by a comma delimited list of error codes from its <see cref="Errors"/> collection, if any.
/// </remarks>
public override string ToString() {
return Succeeded ?
"Succeeded" :
string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
}
}


/// <summary>
/// Encapsulates an error from the operate subsystem.
/// </summary>
public class OperateError
{
/// <summary>
/// Gets or sets ths code for this error.
/// </summary>
public string Code { get; set; }

/// <summary>
/// Gets or sets the description for this error.
/// </summary>
public string Description { get; set; }
}
}

+ 19
- 0
src/Cap.Consistency/Properties/AssemblyInfo.cs View File

@@ -0,0 +1,19 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("KafkaConsistence")]
[assembly: AssemblyTrademark("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("e8af8611-0ea4-4b19-bc48-87c57a87dc66")]

+ 25
- 0
src/Cap.Consistency/ServiceCollectionExtensions.cs View File

@@ -0,0 +1,25 @@
using Cap.Consistency;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="IServiceCollection"/> for configuring kafka consistence services.
/// </summary>
public static class ServiceCollectionExtensions
{

/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <returns>An <see cref="IServiceCollection"/> for application services.</returns>
public static ConsistenceBuilder AddKafkaConsistence<TMessage>(this IServiceCollection services)
where TMessage : class {

services.TryAddSingleton<KafkaConsistenceMarkerService>();

return new ConsistenceBuilder(typeof(TMessage), services);
}
}
}

+ 16
- 0
src/Cap.Consistency/project.json View File

@@ -0,0 +1,16 @@
{
"version": "1.0.0-*",

"dependencies": {
"Microsoft.AspNetCore.Http.Abstractions": "1.1.0-*",
"Microsoft.Extensions.DependencyInjection.Abstractions": "1.1.0",
"Microsoft.Extensions.Logging.Abstractions": "1.1.0-*",
"NETStandard.Library": "1.6.1"
},

"frameworks": {
"netstandard1.6": {
"imports": "dnxcore50"
}
}
}

Loading…
Cancel
Save