Потоки выполнения

Поток выполнения - способ обработки данных, где этапы обработки данных выполняются последовательно.

Интерфейсы, позволяющие определить процесс обработки, находятся в пакете org.unidata.mdm.system.type.pipeline.

У архитектуры потоков выполнения есть 3 основных интерфейса:

  • Поток выполнения (Execution Pipelines),

  • Точки интеграции (Pipeline Segments)

  • Соединения между потоками выполнения (Pipeline Connectors).

Execution Pipelines

Execution Pipelines это способ организации обработки данных. Конечный результат, который должен быть достигнут в результате внедрения потоков выполнения — это возможность строить и (пере)использовать гибкие конвейеры операций над данными, состоящие из так называемых точек расширения (Integration Points), экспортируемых модулями системы и доступных для конфигурации из пользовательского интерфейса.

Pipeline определяется в конфигурации и хранится в модели данных.

Execution Pipelines типизированы. Типы Pipelines публикуются модулями, являющимися потенциально конечными точками в обработке данных. Публикация типа означает имплементацию интерфейса com.unidata.mdm.pipeline.PipelineType и возврат этого типа или нескольких из Collection<PipelineType> com.unidata.mdm.module.Module.getPipelineTypes().

Например, модуль com.unidata.mdm.dq может публиковать тип DQ_CLEANSE. Модуль com.unidata.mdm.data публикует несколько Pipelines — DATA_UPSERT, DATA_DELETE, DATA_GET, DATA_MERGE и т.д. Сторонние модули также могут публиковать свои Pipelines.

Типы точек расширения

Модули публикуют списки точек расширения (Integration Points), которые могут поддерживать только определенные типы Pipelines (это, например, специализированный код апсерта, который не имеет смысла в контексте модификаций записей).

В этом случае вызов com.unidata.mdm.module.IntegrationPoint.getSupportedPipelineTypes() должен вернуть список типов Pipelines и поддерживать любые типы Pipelines, запускающиеся с определенным типом контекста данных.

В этом случае вызов com.unidata.mdm.module.IntegrationPoint.getSupportedPipelineTypes() должен вернуть пустой список.

Точки расширения возвращают тип обработки запроса (уровень доступа к данным) — INPUT_PROCESSOR или OUTPUT_PROCESSOR.

Этот тип указывает на тип контекста данных, с которым точка интеграции может работать.

Разница между ними в том, что INPUT_PROCESSOR может писать в текущий ModificationBox (т.е. вносить потенциально персистентные изменения в состояние записи), а OUTPUT_PROCESSOR не может.

Код точек расширения работает, соответственно, с типами PipelineAwareInputContext и PipelineAwareOutputContext. Таким образом, для некоторого типа Pipeline, возвращающего из com.unidata.mdm.pipeline.PipelineType.getIntegrationPointTypeLevel() INPUT_PROCESSOR могут быть выбраны точки расширения имеющие тип как INPUT_PROCESSOR, так и OUTPUT_PROCESSOR. Для типа OUTPUT_PROCESSOR требуется строгое соответствие типов точек расширения.

Это соответствие должно быть обеспечено при конфигурации. ExecutionService должен пропускать точки расширения неподходящие к типу Pipeline при запуске Pipeline с предупреждением.

Контекст выполнения

Контексты выполнения, такие как UpsertRequestContext или DeleteRequestContext, помечаются интерфейсами поддержки Pipelines. Это PipelineAwareInputContext и PipelineAwareOutputContext. Например, GetRequestContext или GetRelationRequestContext могут быть помечены только PipelineAwareOutputContext, т.к. не имеют ModificationBox и не меняют персистентного состояния записи.

Хранение конфигураций

Типы потоков выполнения и типы точек интеграции существуют только во время выполнения и являются статическими инстанциями классов, имплементирующих PipelineType, PointType или ConnectorType соответственно. Экземпляры потоков выполнения и сегментов (сконфигурированные потоки и сегменты) существуют как часть конфигурации и сохраняются в персистентное хранилище.

Для сохранения конфигурации потоков и сегментов предполагается в силу простоты использовать JSON. Доступ к представлению пертинентной конфигурации должен осуществляться не напрямую, а через прослойку классов — адаптеров.

Пример конфигурации экземпляра потока выполнения:

{
"pipeline": {

"name" : "default_data_upsert", "type" : "RECORD_UPSERT", "segments" : [

{

"name" : "default_data_prepare", "type" : "RECORD_PREPARE", "kind" : "point"

}, {

"name" : "default_dq_origin", "type" : "DQ_RECORD_ORIGIN", "kind" : "point"

}, {

"name" : "default_data_timeline", "type" : "RECORD_TIMELINE", "kind" : "point"

}, {

"name" : "default_dq_etalon", "type" : "DQ_RECORD_ETALON", "kind" : "point"

}, {

"name" : "default_data_collect", "type" : "RECORD_COLLECT", "kind" : "point"

}, {

"name" : "default_data_apply", "type" : "RECORD_APPLY", "kind" : "point"

}, {

"name" : "default_rel_connect", "type" : "RELATION_CONNECT", "kind" : "connector", "connectedPipeline" : "RELATION_UPSERT", "collection" : true

}, {

"name" : "default_clsf_connect", "type" : "CLASSIFIER_CONNECT", "kind" : "connector", "connectedPipeline" : "CLASSIFIER_UPSERT", "collection" : true

}

]

}

}

Доступ к конфигурации потоков и сегментов, их непосредственная конфигурация и сохранение конфигураций в персистентном хранилище, рассылка нотификаций об изменении конфигурации, а также реакция на такие изменения на других узлах кластера лежит в PipelineService.

PipelineService принимает участие в системном событии контейнера Context Refresh после старта MetaModelService с целью подъема конфигураций и установки приоритетов экземпляров потоков для конкретных реестров. После изменения и успешного сохранения конфигурации PipelineService отсылает сообщение с описанием события с помощью стандартного механизма EventService, для того, чтобы другие узлы кластера могли загрузить эти изменения.

Экземпляры потоков выполнения и сегментов должны также иметь уникальный ID. Для экземпляров сегментов возможно автоматическое использование схемы "ID экземпляра потока выполнения"."ID сегмента".

Создание пользовательского потока выполнения

  1. Контекст выполнения, реализация org.unidata.mdm.system.type.pipeline.PipelineInput.

  2. Результат выполнения, реализация org.unidata.mdm.system.type.pipeline.PipelineOutput.

Примечание

Система Universe предоставляет реализации VoidPipelineOutput, если результат выполнения поток не важен. Например, результат обновления модели org.unidata.mdm.core.service.segments.ModelRefreshFinishExecutor

  1. Стартовая точка потока выполнения extends org.unidata.mdm.system.type.pipeline.Start<C extends PipelineInput, R extends PipelineOutput>.

  2. Завершающая точка выполнения org.unidata.mdm.system.type.pipeline.Finish<C extends PipelineInput, R extends PipelineOutput>.

Дополнительные возможности:

  • Connector - если необходимо встроить поток в уже существующий.