Поток выполнения над коллекцией записей

Описание потоков выполнения

В системе есть поднятие "аккумулятора записей", это позволяет производить операции над набором контекстов. Обычный поток выполнения обрабатывает, например, одну запись: валидирует запись, затем сохраняет запись в хранилище, индексирует запись, то при помощи аккумулятора можно выполнить сначала валидацию всех записей в аккумуляторе, затем сохранение и индексацию. Такой подход экономит ресурсы.

Пакет с интерфейсами верхнего уровня для аккумуляторов org.unidata.mdm.system.type.batch. Для обработки записей используются наследники org.unidata.mdm.data.type.apply.batch.AbstractBatchSetAccumulator. Например, .org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator.

Точки потоков выполнения и контексты для аккумуляторов наследуют реализации обычных точек. org.unidata.mdm.system.type.pipeline.batch.BatchedConnector наследует org.unidata.mdm.system.type.pipeline.Connector, org.unidata.mdm.system.type.pipeline.batch.BatchedPipelineInput наследует org.unidata.mdm.system.type.pipeline.PipelineInput

Пример (требуется модуль org.unidata.mdm.data)

Рассмотрим точку из модуля-примера com.universe.mdm.sdk.spring.service.pipeline.batch.SDKRecordsEventExecutor.

Класс наследует BatchedPoint<AbstractBatchSetAccumulator>, фиксирует поддерживаемые контексты в методе supports.

В методе point(RecordUpsertBatchSetAccumulator accumulator) обрабатываются данные из аккумулятора.

public class SDKRecordsEventExecutor extends BatchedPoint<RecordUpsertBatchSetAccumulator> {

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_BATCH_RECORD_EVENT]";

    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.batch.record.point.description";

    public SDKRecordsEventExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }

    @Override
    public boolean supports(Start<?, ?> start) {
        return RecordUpsertBatchSetAccumulator.class.isAssignableFrom(start.getInputTypeClass());
    }


    @Override
    public void point(RecordUpsertBatchSetAccumulator accumulator) {
        final RecordUpsertBatchSetStatistics statistics = accumulator.statistics();
        if (statistics == null) {
            return;
        }


        for (final UpsertRequestContext ctx : statistics.getInserted()) {

            // your logic
        }
        for (final UpsertRequestContext ctx : statistics.getFailed()) {
            // your logic
        }

    }

}

Код для подключения потока выполнения в другой поток реализуется аналогично простому Connector.