Поток выполнения над коллекцией записей
Описание потоков выполнения
В системе есть поднятие "аккумулятора записей", это позволяет производить операции над набором контекстов. Обычный поток выполнения обрабатывает, например, одну запись: валидирует запись, затем сохраняет запись в хранилище, индексирует запись, то при помощи аккумулятора можно выполнить сначала валидацию всех записей в аккумуляторе, затем сохранение и индексацию. Такой подход экономит ресурсы.
Пакет с интерфейсами верхнего уровня для аккумуляторов org.unidata.mdm.system.type.batch. Для обработки записей используются наследники org.unidata.mdm.data.type.apply.batch.AbstractBatchSetAccumulator. Например, .org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator.
Точки потоков выполнения и контексты для аккумуляторов наследуют реализации обычных точек. org.unidata.mdm.system.type.pipeline.batch.BatchedConnector наследует org.unidata.mdm.system.type.pipeline.Connector, org.unidata.mdm.system.type.pipeline.batch.BatchedPipelineInput наследует org.unidata.mdm.system.type.pipeline.PipelineInput
Пример (требуется модуль org.unidata.mdm.data)
Рассмотрим точку из модуля-примера com.universe.mdm.sdk.spring.service.pipeline.batch.SDKRecordsEventExecutor.
Класс наследует BatchedPoint<AbstractBatchSetAccumulator>, фиксирует поддерживаемые контексты в методе supports.
В методе point(RecordUpsertBatchSetAccumulator accumulator) обрабатываются данные из аккумулятора.
public class SDKRecordsEventExecutor extends BatchedPoint<RecordUpsertBatchSetAccumulator> {
    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_BATCH_RECORD_EVENT]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.batch.record.point.description";
    public SDKRecordsEventExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    @Override
    public boolean supports(Start<?, ?> start) {
        return RecordUpsertBatchSetAccumulator.class.isAssignableFrom(start.getInputTypeClass());
    }
    @Override
    public void point(RecordUpsertBatchSetAccumulator accumulator) {
        final RecordUpsertBatchSetStatistics statistics = accumulator.statistics();
        if (statistics == null) {
            return;
        }
        for (final UpsertRequestContext ctx : statistics.getInserted()) {
            // your logic
        }
        for (final UpsertRequestContext ctx : statistics.getFailed()) {
            // your logic
        }
    }
}
Код для подключения потока выполнения в другой поток реализуется аналогично простому Connector.