Поток выполнения операций с одной записью

Статья описывает работу с одиночными запросами, пакетные потоки выполнения конфигурируются через batch pipeline.

Общие условия для всех сегментов и потоков выполнения

Экземпляры потоков выполнения и сегментов должны иметь уникальный ID. В системе контракт формата имени: Module.MODULE_ID + [имя_точки].

Пример: SDKSpringModule.MODULE_ID + [START_EXAMPLE_POINT] => com.universe.mdm.sdk.spring[START_EXAMPLE_POINT]

Рассматриваемые интерфейсы находятся в пакете org.unidata.mdm.system.type.pipeline.

Подразумевается, что:

  • Класс Start соответствует org.unidata.mdm.system.type.pipeline.Start

  • Класс Finish соответствует org.unidata.mdm.system.type.pipeline.Finish

Каждый сегмент потока выполнения реализует org.unidata.mdm.system.type.pipeline.Segment.

Каждый сегмент является компонентом org.springframework.stereotype.Component.

Самостоятельная реализация Segment не требуется. Система Universe содержит абстрактные реализации для всех типов SegmentType.

Типы сегментов

Доступные типы сегментов в потоке выполнения определены в org.unidata.mdm.system.type.pipeline.SegmentType.

Абстрактные классы для реализации находятся в пакете org.unidata.mdm.system.type.pipeline, имя класса совпадает с типом SegmentType.

SegmentType

Обязательный для потока

Описание

START

Обязательный

Стартовая точка потока выполнения

FINISH

Обязательный

Завершающая точка потока выполнения

POINT

Нет

Элемент потока выполнения

CONNECTOR

Нет

Используется, если необходимо встроить поток выполнения в другой поток. Например, после сохранения записи вызвать сохранение связей

SPLITTER

Нет

на данный момент не используется

SELECTOR

Нет

Позволяет выбрать один из потоков выполнения в зависимости от параметра из контекста

FALLBACK

Нет

Выполняет код, если в потоке выполнения выброшена ошибка, но сама ошибка не подавляется

Примеры реализации сегментов

Для реализации потока выполнения дополнительно потребуются 2 класса:

  1. Контекст выполнения - реализация org.unidata.mdm.system.type.pipeline.PipelineInput.

Примечание

По контракту системы контекст выполнения неизменяемый. Если необходимо изменять/накапливать информацию в процессе потока выполнения, можно реализовать свой org.unidata.mdm.system.context.StorageCapableContext или наследовать org.unidata.mdm.system.context.CommonRequestContext.

  1. Результат выполнения - реализация org.unidata.mdm.system.type.pipeline.PipelineOutput`.

Примечание

Система Universe предоставляет реализации VoidPipelineOutput, если результат выполнения потока не важен. Например, результат обновления модели org.unidata.mdm.core.service.segments.ModelRefreshFinishExecutor.

В методе getStartTypeId задается ID класса-наследника Start.

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineInputContext:

public class SDKPipelineInputContext extends CommonRequestContext implements PipelineInput {

    public static final StorageId SID_SDK_POINT = new StorageId("SID_SDK_POINT");

    private LocalDateTime startDateTime;

    protected SDKPipelineInputContext(Builder b) {
        super(b);
        this.startDateTime = b.startDateTime;
    }

    public LocalDateTime getStartDateTime() {
        return startDateTime;
    }

    @Override
    public String getStartTypeId() {
        return SDKPipelineStartExecutor.SEGMENT_ID;
    }

    public static final class Builder extends CommonRequestContextBuilder<Builder> {

        private LocalDateTime startDateTime;

        private Builder() {
            super();
        }

        public SDKPipelineInputContext.Builder startDateTime(LocalDateTime startDateTime) {
            this.startDateTime = startDateTime;
            return this;
        }

        @Override
        public SDKPipelineInputContext build() {
            return new SDKPipelineInputContext(this);
        }
    }
}

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineOutput:

public class SDKPipelineOutput implements PipelineOutput, ExecutionResult {

    private LocalDateTime startTime;
    private LocalDateTime completeTime;

    public void setCompleteTime(LocalDateTime completeTime) {
        this.completeTime = completeTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }
}

Сегмент START

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineStartExecutor.

Конфигурация в json: сегмент должен идти первым в конфигурации потока.

{
  "segmentType":"START",
    // SDKPipelineStartExecutor.SEGMENT_ID
  "id":  "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
}

Пример реализации сегмента:

@Component(SDKPipelineStartExecutor.SEGMENT_ID)
public class SDKPipelineStartExecutor extends Start<SDKPipelineInput, SDKPipelineOutput> {

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_START]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.start.description";

    protected SDKPipelineStartExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineInput.class, SDKPipelineOutput.class);
    }

    @Override
    public void start(SDKPipelineInput ctx) {
        // prepare context or else
    }

    @Override
    public String subject(SDKPipelineInput ctx) {
        // No subject for this type of pipelines
        // This may be storage id in the future
        return null;
    }
}

Сегмент FINISH

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineFinishExecutor.

Конфигурация в json: сегмент должен идти последним в конфигурации потока.

{
  "segmentType":"FINISH",
    // SDKPipelineFinishExecutor.SEGMENT_ID
  "id":  "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
}

Пример реализации сегмента:

@Component(SDKPipelineFinishExecutor.SEGMENT_ID)
public class SDKPipelineFinishExecutor extends Finish<SDKPipelineInput, SDKPipelineOutput> {

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FINISH]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.finish.description";

    protected SDKPipelineFinishExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class);
    }

    @Override
    public SDKPipelineOutput finish(SDKPipelineInput ctx) {
        SDKPipelineOutput sdkPipelineOutput = new SDKPipelineOutput();
        sdkPipelineOutput.setCompleteTime(LocalDateTime.now());
        return sdkPipelineOutput;
    }

    @Override
    public boolean supports(Start<?, ?> start) {
        return SDKPipelineInput.class.isAssignableFrom(start.getInputTypeClass());
    }
}

Сегмент POINT

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinePointExecutor.

Конфигурация в json:

{
      "segmentType":"POINT",
       // SDKPipelinePointExecutor.SEGMENT_ID
      "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
}

Пример реализации сегмента:

@Component(SDKPipelinePointExecutor.SEGMENT_ID)
public class SDKPipelinePointExecutor extends Point<SDKPipelineInputContext> {

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_POINT]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.point.description";

    protected SDKPipelinePointExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }

    @Override
    public void point(SDKPipelineInputContext ctx) {
        ctx.putToStorage(SDKPipelineInputContext.SID_SDK_POINT, true);
    }

    @Override
    public boolean supports(Start<?, ?> start) {
        return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
    }
}

Сегмент CONNECTOR

Сегмент типа Connector позволяет включить один поток выполнения в другой.

Конфигурация в json:

{
    "segmentType":"CONNECTOR",
    "id":"com.universe.mdm.sdk.spring.service.pipeline[SDK_PIPELINE_CONNECTOR]"
},

Пример коннектора, при котором подключенный поток выполняется всегда:

@Component(SDKPipelineConnectorExecutor.SEGMENT_ID)
public class SDKPipelineConnectorExecutor extends Connector<PipelineInput, SDKPipelineOutput> {


    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_CONNECTOR]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.connector.description";

    public SDKPipelineConnectorExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }

    @Autowired
    private ExecutionService executionService;

    @Override
    public SDKPipelineOutput connect(PipelineInput ctx) {
        return execute(ctx, null);
    }

    @Override
    public SDKPipelineOutput connect(PipelineInput ctx, Pipeline p) {
        return execute(ctx, p);
    }

    private SDKPipelineOutput execute(PipelineInput ctx, Pipeline p) {
        // Если нет дополнительных условий, поток всегда должен быть запущен
        SDKPipelineInputContext sdkCtx =
                SDKPipelineInputContext.builder().startDateTime(LocalDateTime.now()).build();
        return executionService.execute(sdkCtx);
    }

}

Условие запуска потока в коннекторе можно передать через "фрагменты". Для этого контекст основного потока должен реализовать InputFragmentContainer, контекст подключенного потока реализовать InputFragment и должен быть задан идентификатор фрагмента.

public class SDKPipelineInputContext implements PipelineInput, InputFragment<SDKPipelineInputContext> {

    public static final FragmentId<SDKPipelineInputContext> FRAGMENT_ID
        = new FragmentId<>("SDK_PIPELINE_CONTEXT");

    @Override
    public FragmentId<SDKPipelineInputContext> fragmentId() {
        return FRAGMENT_ID;
    }

Тогда метод connect(...) из примера выше может выглядеть следующим образом:

@Override
public SDKPipelineOutput connect(PipelineInput ctx) {

    InputFragmentContainer target = (InputFragmentContainer) ctx;
    SDKPipelineInputContext payload = target.fragment(SDKPipelineInputContext.FRAGMENT_ID);
    if (Objects.isNull(payload)) {
        return null;
    }

    return execute(ctx, null);
}

Сегмент SELECTOR

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineSelectorPointExecutor.

Конфигурация в json:

{
      "id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
      "segmentType":"SELECTOR",
      "outcomeSegments" : {
        "[FIRST]" : [
          {
            "id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]",
            "segmentType":"POINT"
          }
        ],
        "[SECOND]" : [
          {
            "segmentType":"POINT",
            "id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
          }
        ]
      }

Пример реализации (см. класс Outcome):

@Component(SDKPipelineSelectorPointExecutor.SEGMENT_ID)
public class SDKPipelineSelectorPointExecutor extends Selector<SDKPipelineInputContext, SDKPipelineOutput> {

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_SELECTOR]";

    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.selector.description";

    public static final Random random = new Random();

    protected static final Outcome FIRST_MODE = new Outcome(
            "[FIRST]",
            SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
            SDKPipelineInputContext.class,
            SDKPipelineOutput.class);

    protected static final Outcome SECOND_MODE = new Outcome(
            "[SECOND]",
            SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
            SDKPipelineInputContext.class,
            SDKPipelineOutput.class);

    public SDKPipelineSelectorPointExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class, FIRST_MODE, SECOND_MODE);
    }

    @Override
    public Outcome select(SDKPipelineInputContext ctx) {
        if (random.nextBoolean()) {
            return FIRST_MODE;
        }
        return SECOND_MODE;
    }

    @Override
    public boolean supports(Start<?, ?> start) {
        return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
    }
}

Сегмент FALLBACK

Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinesFallbackExecutor.

Конфигурация в json:

{
  "segmentType":"FALLBACK",
  "id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
},

Пример реализации:

@Component(SDKPipelinesFallbackExecutor.SEGMENT_ID)
public class SDKPipelinesFallbackExecutor extends Fallback<SDKPipelineInputContext> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SDKPipelinesFallbackExecutor.class);

    public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FALLBACK]";
    public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.fallback.description";

    public SDKPipelinesFallbackExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }

    @Override
    public void accept(SDKPipelineInputContext ctx, Throwable throwable) {
        LOGGER.error("sdk pipeline fall ", throwable);
    }

    @Override
    public boolean supports(Start<?, ?> start) {
        return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
    }
}

Регистрация точек в системе и конфигурация потока выполнения

  1. Необходимо в классе-наследнике AbstractModule в методе start вызвать addSegments():

public class SDKSpringModule extends AbstractModule {

    @Autowired
    private SDKSpringConfiguration configuration;

    @Override
    public void start() {
        // ...
        addSegments(configuration.getBeansOfType(Segment.class).values());
        // ...
    }
  1. Добавить в (файл в описанием потоков) потоков конфигурацию.

{
  "startId": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]",
  "subjectId": "",
  "description": "Example pipeline",
  "segments": [
    {
      "segmentType": "START",
      "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
    },
    {
      "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
      "segmentType": "SELECTOR",
      "outcomeSegments": {
        "[FIRST]": [
          {
            "segmentType": "POINT",
            "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
          }
        ],
        "[SECOND]": [
          {
            "segmentType": "POINT",
            "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
          }
        ]
      }
    },
    {
      "segmentType": "FALLBACK",
      "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
    },
    {
      "segmentType": "FINISH",
      "id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
    }
  ]
}
  1. Или сконфигурировать поток выполнения через код. В примере показана конфигурация потока из трех точек: start, point, finish.

@Component
public class RegisterImpl implements AfterPlatformStartup {

    @Autowired
    private PipelineService pipelineService;

    @Override
    public void afterPlatformStartup() {
        Segment segmentStart = PipelineUtils.findSegment(SDKPipelineStartExecutor.SEGMENT_ID);
        Segment segmentFinish = PipelineUtils.findSegment(SDKPipelineFinishExecutor.SEGMENT_ID);
        Segment segmentPoint = PipelineUtils.findSegment(SDKPipelinePointExecutor.SEGMENT_ID);

        Pipeline pipeline = Pipeline.start((Start<?, ?>) segmentStart, false);
        pipeline.with((Point<?>) segmentPoint);
        pipeline.end((Finish<?, ?>) segmentFinish);

        pipelineService.save(pipeline);
    }
}

Добавление точки в существующий поток выполнения

  1. Создайте компонент - класс-наследник нужного SegmentType типа.

  2. Зарегистрируйте сегмент в системе (см. выше).

  3. Измените пайплайн в реализации AfterPlatformStartup::afterPlatformStartup или добавьте в поток выполнения через UI.

    • Например, добавьте новую точку типа POINT в поток выполнения обновления модели данных. Этот поток типизирован org.unidata.mdm.meta.context.UploadModelRequestContext.

  4. Создайте компонент extends Point<UploadModelRequestContext>.

  5. Задайте уникальный идентификатор точки SEGMENT_ID.

  6. Зафиксируйте имя компонента уникальным SEGMENT_ID @Component(NewPoint.SEGMENT_ID).

  7. Задайте локализацию имени точки SEGMENT_DESCRIPTION.

  8. Задайте конструктор:

public NewPoint() {
    super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
  1. Ограничьте поддерживаемые контексты:

@Override
public boolean supports(Start<?, ?> start) {
    return UploadModelRequestContext.class.isAssignableFrom(start.getInputTypeClass());
}
  1. Реализуйте логику в методе point(UploadModelRequestContext ctx)