Processing status of message queue task

I'm working on a product data import system that loads product data from external sources, translates it into the correct schema, and stores the results; essentially an ETL system. The main message type the system processes is "ImportProductCommand", which specifies the product to import and the source. However, import commands are rarely sent individually. A typical business requirement is to import a whole range of products from a given source. This is currently expressed as an "ImportProductsCommand" message, which can specify multiple products to import. The command handler consumes this message, splits it into separate "ImportProductCommand" messages, and posts them to the queue for processing. The consumer of the individual import requests publishes either a "ProductImportedEvent" or a "ProductImportFailedEvent".

When the "ImportProductsCommand" message is received, the service assigns a GUID token to the message, queues the message, and returns the token. The token is then used as a correlation identifier so that individual import requests can be associated with a batch import request. Given this infrastructure, it is possible to determine the number of events associated with a given token, and therefore the number of imported products or failed imports. There is no explicit event indicating that the batch import has completed. The individual import request handler does not explicitly know that it is part of a batch import request. Of course, completion can be inferred by knowing how many products need to be imported and counting the number of import events associated with a specific correlation identifier.

The implementation currently relies on the message queuing system to handle process restarts and crashes, but it is less explicit about the batch import request itself. In general, the queries that the system should be able to answer are as follows:

  • Is this batch import done?
  • How many individual imports are left for a given batch import?
  • How many individual imports have completed?
  • How many of them failed?

What are some best practices or suggested approaches to support these queries while still using message queuing for resiliency? Currently the only thing tying everything together is the token mentioned above. There is no explicit entity that represents a batch import request, and if there were, the individual import request handler would have to know about it in order to update it accordingly.

This is all implemented using C# and NServiceBus, and hosted as a WCF application in IIS.

1 answer


This can be implemented as an NServiceBus saga. The ImportProductsCommand should be handled by the saga (ImportProductsSaga), and the saga data can record the number of products to be imported as the saga sends out the ImportProductCommand messages. ImportProductsSaga should also handle ProductImportedEvent and ProductImportFailedEvent. For each of these events, the saga increments ProductsImported or ProductsFailedToImport, and then checks whether the sum (ProductsImported + ProductsFailedToImport) equals ProductsToBeImported; if it does, the saga is complete.

The ImportProductsSaga data should track the number of ImportProductCommand messages sent and the number of responses received, from which you can calculate the number of pending responses and so on. The saga data looks like this:



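   using System;

   // Saga data: one instance per batch import request.
   public class ImportProductsSagaData {
       public Guid Id { get; set; }
       // Number of ImportProductCommand messages sent for this batch.
       public int ProductsToBeImported { get; set; }
       // Number of ProductImportedEvent messages received so far.
       public int ProductsImported { get; set; }
       // Number of ProductImportFailedEvent messages received so far.
       public int ProductsFailedToImport { get; set; }
   }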
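To make the counting concrete, here is a minimal sketch of the bookkeeping such a saga would do, written as a plain C# class with no NServiceBus dependency. The class name BatchImportProgress and its members are hypothetical names chosen for illustration, not part of NServiceBus or of the system described in the question. In a real saga, the two Record methods would be called from the handlers for ProductImportedEvent and ProductImportFailedEvent, and the counters would live in the saga data.

```csharp
using System;

// Hypothetical, library-agnostic illustration of the saga's counting logic.
public class BatchImportProgress
{
    // Correlation token assigned when the batch request was received.
    public Guid Token { get; }
    public int ProductsToBeImported { get; }
    public int ProductsImported { get; private set; }
    public int ProductsFailedToImport { get; private set; }

    public BatchImportProgress(Guid token, int productsToBeImported)
    {
        if (productsToBeImported < 0)
            throw new ArgumentOutOfRangeException(nameof(productsToBeImported));
        Token = token;
        ProductsToBeImported = productsToBeImported;
    }

    // "How many individual imports are left for a given batch import?"
    public int Remaining =>
        ProductsToBeImported - ProductsImported - ProductsFailedToImport;

    // "Is this batch import done?"
    public bool IsComplete => Remaining == 0;

    // Call when a success event arrives; returns true once the batch is done.
    public bool RecordImported()
    {
        if (IsComplete) return true; // ignore events beyond the expected count
        ProductsImported++;
        return IsComplete;
    }

    // Call when a failure event arrives; returns true once the batch is done.
    public bool RecordFailed()
    {
        if (IsComplete) return true; // ignore events beyond the expected count
        ProductsFailedToImport++;
        return IsComplete;
    }
}
```

When either Record method returns true, the saga can publish a batch-completed event and mark itself as complete. Note that this sketch only ignores events that arrive after the expected total has been reached; it does not detect a duplicate delivery for the same product, which would need per-product tracking.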

      
