You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

messaging.md 6.1 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # Message
  2. The data sent by using the `ICapPublisher` interface is called `Message`.
  3. ## Compensating transaction
  4. Wiki :
  5. [Compensating transaction](https://en.wikipedia.org/wiki/Compensating_transaction)
  6. In some cases, consumers need to return the execution value to tell the publisher, so that the publisher can implement some compensation actions, usually we called message compensation.
  7. Usually you can notify the upstream by republishing a new message in the consumer code. CAP provides a simple way to do this. You can specify `callbackName` parameter when publishing message, usually this only applies to point-to-point consumption. The following is an example.
  8. For example, in an e-commerce application, the initial status of the order is pending, and the status is marked as succeeded when the product quantity is successfully deducted, otherwise it is failed.
  9. ```C#
  10. // ============= Publisher =================
  11. _capBus.Publish("place.order.qty.deducted",
  12. contentObj: new { OrderId = 1234, ProductId = 23255, Qty = 1 },
  13. callbackName: "place.order.mark.status");
  14. // publisher using `callbackName` to subscribe consumer result
  15. [CapSubscribe("place.order.mark.status")]
  16. public void MarkOrderStatus(JToken param)
  17. {
  18. var orderId = param.Value<int>("OrderId");
  19. var isSuccess = param.Value<bool>("IsSuccess");
  20. if(isSuccess)
  21. //mark order status to succeeded
  22. else
  23. //mark order status to failed
  24. }
  25. // ============= Consumer ===================
  26. [CapSubscribe("place.order.qty.deducted")]
  27. public object DeductProductQty(JToken param)
  28. {
  29. var orderId = param.Value<int>("OrderId");
  30. var productId = param.Value<int>("ProductId");
  31. var qty = param.Value<int>("Qty");
  32. //business logic
  33. return new { OrderId = orderId, IsSuccess = true };
  34. }
  35. ```
  36. ## Heterogeneous system integration
  37. In version 3.0+, we reconstructed the message structure. We used the Header in the message protocol in the message queue to transmit some additional information, so that we can do it in the Body without modifying or packaging the user’s original The message data format and content are sent.
  38. This approach is reasonable. It helps to better integrate in heterogeneous systems. Compared with previous versions, users do not need to know the message structure used inside CAP to complete the integration work.
  39. Now we divide the message into Header and Body for transmission.
  40. The data in the body is the content of the original message sent by the user, that is, the content sent by calling the Publish method. We do not perform any packaging, but send it to the message queue after serialization.
  41. In the Header, we need to pass some additional information so that the CAP can extract the key features for operation when the message is received.
  42. The following is the content that needs to be written into the header of the message when sending a message in a heterogeneous system:
  43. Key | DataType | Description
  44. -- | --| --
  45. cap-msg-id | string | Message Id, Generated by snowflake algorithm, can also be guid
  46. cap-msg-name | string | The name of the message
  47. cap-msg-type | string | The type of message, `typeof(T).FullName`(not required)
  48. cap-senttime | string | sending time (not required)
  49. ### Custom headers
  50. To consume messages sent without CAP headers, both Kafka and RabbitMQ consumers can inject a minimal set of headers using custom headers as shown below:
  51. ```C#
  52. container.AddCap(x =>
  53. {
  54. x.UseRabbitMQ(z =>
  55. {
  56. z.ExchangeName = "TestExchange";
  57. z.CustomHeaders = e => new List<KeyValuePair<string, string>>
  58. {
  59. new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  60. new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageName, e.RoutingKey)
  61. };
  62. });
  63. });
  64. ```
  65. After adding `cap-msg-id` and `cap-msg-name`, CAP consumers receive messages sent directly from the RabbitMQ management tool.
  66. ## Scheduling
  67. After CAP receives a message, it sends the message to Transport(RabitMq, Kafka...), which is transported by transport.
  68. When you send message using the `ICapPublisher` interface, CAP will dispatch message to the corresponding Transport. Currently, bulk messaging is not supported.
  69. For more information on transports, see [Transports](../transport/general.md) section.
  70. ## Storage
  71. CAP will store the message after receiving it. For more information on storage, see the [Storage](../storage/general.md) section.
  72. ## Retry
  73. Retrying plays an important role in the overall CAP architecture design, CAP retry messages that fail to send or fail to execute. There are several retry strategies used throughout the CAP design process.
  74. ### Send retry
  75. During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50,CAP will stop retrying.
  76. You can adjust the total number of retries by setting `FailedRetryCount` in CapOptions.
  77. It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry.
  78. ### Consumption retry
  79. The consumer method is executed when the Consumer receives the message and will retry when an exception occurs. This retry strategy is the same as the send retry.
  80. ## Data Cleanup
  81. There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later.
  82. Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later.
  83. By default, the data of the message in the table is deleted **every hour** to avoid performance degradation caused by too much data. The cleanup strategy `ExpiresAt` is performed when field is not empty and is less than the current time.
  84. That is to say, the message with the status Failed (by default they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up.