Поток выполнения над коллекцией записей¶
Описание потоков выполнения¶
В системе есть поднятие "аккумулятора записей", это позволяет производить операции над набором контекстов. Обычный поток выполнения обрабатывает, например, одну запись: валидирует запись, затем сохраняет запись в хранилище, индексирует запись, то при помощи аккумулятора можно выполнить сначала валидацию всех записей в аккумуляторе, затем сохранение и индексацию. Такой подход экономит ресурсы.
Пакет с интерфейсами верхнего уровня для аккумуляторов org.unidata.mdm.system.type.batch
.
Для обработки записей используются наследники org.unidata.mdm.data.type.apply.batch.AbstractBatchSetAccumulator
.
Например, .org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator
.
Точки потоков выполнения и контексты для аккумуляторов наследуют реализации обычных точек. org.unidata.mdm.system.type.pipeline.batch.BatchedConnector
наследует org.unidata.mdm.system.type.pipeline.Connector
, org.unidata.mdm.system.type.pipeline.batch.BatchedPipelineInput
наследует org.unidata.mdm.system.type.pipeline.PipelineInput
Пример (требуется модуль org.unidata.mdm.data)¶
Рассмотрим точку из модуля-примера com.universe.mdm.sdk.spring.service.pipeline.batch.SDKRecordsEventExecutor
.
Класс наследует BatchedPoint<AbstractBatchSetAccumulator>, фиксирует поддерживаемые контексты в методе supports.
В методе point(RecordUpsertBatchSetAccumulator accumulator) обрабатываются данные из аккумулятора
public class SDKRecordsEventExecutor extends BatchedPoint<RecordUpsertBatchSetAccumulator> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_BATCH_RECORD_EVENT]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.batch.record.point.description";
public SDKRecordsEventExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Override
public boolean supports(Start<?, ?> start) {
return RecordUpsertBatchSetAccumulator.class.isAssignableFrom(start.getInputTypeClass());
}
@Override
public void point(RecordUpsertBatchSetAccumulator accumulator) {
final RecordUpsertBatchSetStatistics statistics = accumulator.statistics();
if (statistics == null) {
return;
}
for (final UpsertRequestContext ctx : statistics.getInserted()) {
// your logic
}
for (final UpsertRequestContext ctx : statistics.getFailed()) {
// your logic
}
}
}
Код для подключения потока выполнения в другой поток реализуется аналогично простому Connector