Статус обработки задачи с очередью сообщений

Я работаю над системой импорта данных продукта, которая загружает данные о продукте из внешних источников, переводит ее в правильную схему и сохраняет результаты – по существу, систему ETL. Основным типом сообщения, которое обрабатывает система, является «ImportProductCommand», который указывает продукт для импорта и источник. Однако команды импорта редко отправляются индивидуально. Типичным бизнес-требованием является импорт целого набора продуктов из данного источника. В настоящее время это выражается как сообщение «ImportProductsCommand», которое может указывать несколько импортируемых продуктов. Обработчик команд потребляет это сообщение, преобразует его в отдельные сообщения «ImportProductCommand» и отправляет их в очередь для обработки. Пользователь индивидуальных запросов импорта публикует «ProductImportedEvent» или «ProductImportFailedEvent». Когда получено сообщение «ImportProductsCommand», служба назначает маркер GUID для сообщения, помещает сообщение в очередь и возвращает токен. Затем токен используется как идентификатор корреляции, так что отдельные запросы импорта могут быть связаны с запросом на импорт партии. Учитывая эту инфраструктуру, можно определить количество событий, связанных с данным токеном, и, следовательно, количество импортированных продуктов или неудачный импорт. Отсутствует явное событие, указывающее, что пакетный импорт завершен. Обработчик отдельных запросов импорта явно не знает, что он является частью запроса на импорт партии. Разумеется, это можно понять, зная, сколько продуктов нужно импортировать и подсчитывать количество событий импорта, связанных с определенным идентификатором корреляции. В настоящее время реализация использует систему очереди сообщений для обработки перезапуска и сбоев процесса, но менее явна о запросе импорта пакета. В целом, запросы, на которые должна отвечать система:

  • Является ли данный пакетный импорт выполненным?
  • Сколько отдельных импортных товаров приходится на определенный импорт партии?
  • Сколько индивидуальных заказов было завершено?
  • Сколько из них было ошибочным?

Каковы некоторые рекомендации или предлагаемые подходы к поддержке этих запросов и по-прежнему используют систему очередности сообщений для обеспечения устойчивости? В настоящее время все, что связывает все это, является упомянутым выше маркером, но нет явной записи для представления объекта запроса пакетного импорта, и если бы это было так, то отдельный процессор запросов импорта должен был знать о таком объекте для обновления соответственно.

Все это реализовано с использованием C #, NServiceBus и размещено как приложение IIS для WCF.

Это может быть реализовано как NServiceBus Saga . ImportProductsCommand должен обрабатываться Saga (ImportProductsSaga ), а данные Saga могут иметь количество импортируемых товаров, поскольку оно отправляет ImportProductCommand . ImportProductsSaga должен обрабатывать ProductImportedEvent и ProductImportFailedEvent . На каждом из этих событий, обработанных в ImportProductsSaga , увеличивайте ProductImported или ProdctsFailedToImport . Также проверьте сумму (ProductsImported + ProdctsFailedToImport) равным ProdctsToBeImported, если да, заполните сагу.

Данные ImportProductsSaga должны отслеживать No of ImportProductCommand, отправлять и получать ответ, и вы можете рассчитать ожидающий ответ и т. Д. Данные Saga выглядят следующим образом:

public class ImportProductsSataData{ public Guid Id {get; set} public int ProdctsToBeImported {get; set} public int ProdctsImported {get; set} public int ProdctsFailedToImport {get; set} }