Поток выполнения операций с одной записью
Статья описывает работу с одиночными запросами, пакетные потоки выполнения конфигурируются через batch pipeline.
Общие условия для всех сегментов и потоков выполнения
Экземпляры потоков выполнения и сегментов должны иметь уникальный ID. В системе контракт формата имени: Module.MODULE_ID + [имя_точки].
Пример: SDKSpringModule.MODULE_ID + [START_EXAMPLE_POINT] => com.universe.mdm.sdk.spring[START_EXAMPLE_POINT]
Рассматриваемые интерфейсы находятся в пакете org.unidata.mdm.system.type.pipeline.
Подразумевается, что:
Класс Start соответствует
org.unidata.mdm.system.type.pipeline.StartКласс Finish соответствует
org.unidata.mdm.system.type.pipeline.Finish
Каждый сегмент потока выполнения реализует org.unidata.mdm.system.type.pipeline.Segment.
Каждый сегмент является компонентом org.springframework.stereotype.Component.
Самостоятельная реализация Segment не требуется. Система Universe содержит абстрактные реализации для всех типов SegmentType.
Типы сегментов
Доступные типы сегментов в потоке выполнения определены в org.unidata.mdm.system.type.pipeline.SegmentType.
Абстрактные классы для реализации находятся в пакете org.unidata.mdm.system.type.pipeline, имя класса совпадает с типом SegmentType.
SegmentType |
Обязательный для потока |
Описание |
|---|---|---|
START |
Обязательный |
Стартовая точка потока выполнения |
FINISH |
Обязательный |
Завершающая точка потока выполнения |
POINT |
Нет |
Элемент потока выполнения |
CONNECTOR |
Нет |
Используется, если необходимо встроить поток выполнения в другой поток. Например, после сохранения записи вызвать сохранение связей |
SPLITTER |
Нет |
На данный момент не используется |
SELECTOR |
Нет |
Позволяет выбрать один из потоков выполнения в зависимости от параметра из контекста |
FALLBACK |
Нет |
Выполняет код, если в потоке выполнения выброшена ошибка, но сама ошибка не подавляется |
Примеры реализации сегментов
Для реализации потока выполнения дополнительно потребуются 2 класса:
Контекст выполнения - реализация
org.unidata.mdm.system.type.pipeline.PipelineInput.
Примечание
По контракту системы контекст выполнения неизменяемый. Если необходимо изменять/накапливать информацию в процессе потока выполнения, можно реализовать свой org.unidata.mdm.system.context.StorageCapableContext или наследовать org.unidata.mdm.system.context.CommonRequestContext.
Результат выполнения - реализация
org.unidata.mdm.system.type.pipeline.PipelineOutput.
Примечание
Система Universe предоставляет реализации VoidPipelineOutput, если результат выполнения потока не важен. Например, результат обновления модели org.unidata.mdm.core.service.segments.ModelRefreshFinishExecutor.
В методе getStartTypeId задается ID класса-наследника Start.
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineInputContext:
public class SDKPipelineInputContext extends CommonRequestContext implements PipelineInput {
public static final StorageId SID_SDK_POINT = new StorageId("SID_SDK_POINT");
private LocalDateTime startDateTime;
protected SDKPipelineInputContext(Builder b) {
super(b);
this.startDateTime = b.startDateTime;
}
public LocalDateTime getStartDateTime() {
return startDateTime;
}
@Override
public String getStartTypeId() {
return SDKPipelineStartExecutor.SEGMENT_ID;
}
public static final class Builder extends CommonRequestContextBuilder<Builder> {
private LocalDateTime startDateTime;
private Builder() {
super();
}
public SDKPipelineInputContext.Builder startDateTime(LocalDateTime startDateTime) {
this.startDateTime = startDateTime;
return this;
}
@Override
public SDKPipelineInputContext build() {
return new SDKPipelineInputContext(this);
}
}
}
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineOutput:
public class SDKPipelineOutput implements PipelineOutput, ExecutionResult {
private LocalDateTime startTime;
private LocalDateTime completeTime;
public void setCompleteTime(LocalDateTime completeTime) {
this.completeTime = completeTime;
}
public void setStartTime(LocalDateTime startTime) {
this.startTime = startTime;
}
}
Сегмент START
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineStartExecutor.
Конфигурация в json: сегмент должен идти первым в конфигурации потока.
{
"segmentType":"START",
// SDKPipelineStartExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
}
Пример реализации сегмента:
@Component(SDKPipelineStartExecutor.SEGMENT_ID)
public class SDKPipelineStartExecutor extends Start<SDKPipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_START]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.start.description";
protected SDKPipelineStartExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineInput.class, SDKPipelineOutput.class);
}
@Override
public void start(SDKPipelineInput ctx) {
// prepare context or else
}
@Override
public String subject(SDKPipelineInput ctx) {
// No subject for this type of pipelines
// This may be storage id in the future
return null;
}
}
Сегмент FINISH
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineFinishExecutor.
Конфигурация в json: сегмент должен идти последним в конфигурации потока.
{
"segmentType":"FINISH",
// SDKPipelineFinishExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
}
Пример реализации сегмента:
@Component(SDKPipelineFinishExecutor.SEGMENT_ID)
public class SDKPipelineFinishExecutor extends Finish<SDKPipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FINISH]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.finish.description";
protected SDKPipelineFinishExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class);
}
@Override
public SDKPipelineOutput finish(SDKPipelineInput ctx) {
SDKPipelineOutput sdkPipelineOutput = new SDKPipelineOutput();
sdkPipelineOutput.setCompleteTime(LocalDateTime.now());
return sdkPipelineOutput;
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInput.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент POINT
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinePointExecutor.
Конфигурация в json:
{
"segmentType":"POINT",
// SDKPipelinePointExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
}
Пример реализации сегмента:
@Component(SDKPipelinePointExecutor.SEGMENT_ID)
public class SDKPipelinePointExecutor extends Point<SDKPipelineInputContext> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_POINT]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.point.description";
protected SDKPipelinePointExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Override
public void point(SDKPipelineInputContext ctx) {
ctx.putToStorage(SDKPipelineInputContext.SID_SDK_POINT, true);
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент CONNECTOR
Сегмент типа Connector позволяет включить один поток выполнения в другой.
Конфигурация в json:
{
"segmentType":"CONNECTOR",
"id":"com.universe.mdm.sdk.spring.service.pipeline[SDK_PIPELINE_CONNECTOR]"
},
Пример коннектора, при котором подключенный поток выполняется всегда:
@Component(SDKPipelineConnectorExecutor.SEGMENT_ID)
public class SDKPipelineConnectorExecutor extends Connector<PipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_CONNECTOR]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.connector.description";
public SDKPipelineConnectorExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Autowired
private ExecutionService executionService;
@Override
public SDKPipelineOutput connect(PipelineInput ctx) {
return execute(ctx, null);
}
@Override
public SDKPipelineOutput connect(PipelineInput ctx, Pipeline p) {
return execute(ctx, p);
}
private SDKPipelineOutput execute(PipelineInput ctx, Pipeline p) {
// Если нет дополнительных условий, поток всегда должен быть запущен
SDKPipelineInputContext sdkCtx =
SDKPipelineInputContext.builder().startDateTime(LocalDateTime.now()).build();
return executionService.execute(sdkCtx);
}
}
Условие запуска потока в коннекторе можно передать через "фрагменты". Для этого контекст основного потока должен реализовать InputFragmentContainer, контекст подключенного потока реализовать InputFragment и должен быть задан идентификатор фрагмента.
public class SDKPipelineInputContext implements PipelineInput, InputFragment<SDKPipelineInputContext> {
public static final FragmentId<SDKPipelineInputContext> FRAGMENT_ID
= new FragmentId<>("SDK_PIPELINE_CONTEXT");
@Override
public FragmentId<SDKPipelineInputContext> fragmentId() {
return FRAGMENT_ID;
}
Тогда метод connect(...) из примера выше может выглядеть следующим образом:
@Override
public SDKPipelineOutput connect(PipelineInput ctx) {
InputFragmentContainer target = (InputFragmentContainer) ctx;
SDKPipelineInputContext payload = target.fragment(SDKPipelineInputContext.FRAGMENT_ID);
if (Objects.isNull(payload)) {
return null;
}
return execute(ctx, null);
}
Сегмент SELECTOR
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineSelectorPointExecutor.
Конфигурация в json:
{
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
"segmentType":"SELECTOR",
"outcomeSegments" : {
"[FIRST]" : [
{
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]",
"segmentType":"POINT"
}
],
"[SECOND]" : [
{
"segmentType":"POINT",
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
}
]
}
Пример реализации (см. класс Outcome):
@Component(SDKPipelineSelectorPointExecutor.SEGMENT_ID)
public class SDKPipelineSelectorPointExecutor extends Selector<SDKPipelineInputContext, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_SELECTOR]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.selector.description";
public static final Random random = new Random();
protected static final Outcome FIRST_MODE = new Outcome(
"[FIRST]",
SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
SDKPipelineInputContext.class,
SDKPipelineOutput.class);
protected static final Outcome SECOND_MODE = new Outcome(
"[SECOND]",
SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
SDKPipelineInputContext.class,
SDKPipelineOutput.class);
public SDKPipelineSelectorPointExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class, FIRST_MODE, SECOND_MODE);
}
@Override
public Outcome select(SDKPipelineInputContext ctx) {
if (random.nextBoolean()) {
return FIRST_MODE;
}
return SECOND_MODE;
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент FALLBACK
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinesFallbackExecutor.
Конфигурация в json:
{
"segmentType":"FALLBACK",
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
},
Пример реализации:
@Component(SDKPipelinesFallbackExecutor.SEGMENT_ID)
public class SDKPipelinesFallbackExecutor extends Fallback<SDKPipelineInputContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(SDKPipelinesFallbackExecutor.class);
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FALLBACK]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.fallback.description";
public SDKPipelinesFallbackExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Override
public void accept(SDKPipelineInputContext ctx, Throwable throwable) {
LOGGER.error("sdk pipeline fall ", throwable);
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Регистрация точек в системе и конфигурация потока выполнения
Необходимо в классе-наследнике
AbstractModuleв методеstartвызватьaddSegments():
public class SDKSpringModule extends AbstractModule {
@Autowired
private SDKSpringConfiguration configuration;
@Override
public void start() {
// ...
addSegments(configuration.getBeansOfType(Segment.class).values());
// ...
}
Добавить в файл с описанием потоков конфигурации.
{
"startId": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]",
"subjectId": "",
"description": "Example pipeline",
"segments": [
{
"segmentType": "START",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
},
{
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
"segmentType": "SELECTOR",
"outcomeSegments": {
"[FIRST]": [
{
"segmentType": "POINT",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
}
],
"[SECOND]": [
{
"segmentType": "POINT",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
}
]
}
},
{
"segmentType": "FALLBACK",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
},
{
"segmentType": "FINISH",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
}
]
}
Или сконфигурировать поток выполнения через код. В примере показана конфигурация потока из трех точек: start, point, finish.
@Component
public class RegisterImpl implements AfterPlatformStartup {
@Autowired
private PipelineService pipelineService;
@Override
public void afterPlatformStartup() {
Segment segmentStart = PipelineUtils.findSegment(SDKPipelineStartExecutor.SEGMENT_ID);
Segment segmentFinish = PipelineUtils.findSegment(SDKPipelineFinishExecutor.SEGMENT_ID);
Segment segmentPoint = PipelineUtils.findSegment(SDKPipelinePointExecutor.SEGMENT_ID);
Pipeline pipeline = Pipeline.start((Start<?, ?>) segmentStart, false);
pipeline.with((Point<?>) segmentPoint);
pipeline.end((Finish<?, ?>) segmentFinish);
pipelineService.save(pipeline);
}
}
Добавление точки в существующий поток выполнения
Создайте компонент - класс-наследник нужного SegmentType типа.
Зарегистрируйте сегмент в системе (см. выше).
Измените пайплайн в реализации
AfterPlatformStartup::afterPlatformStartupили добавьте в поток выполнения через UI.Например, добавьте новую точку типа POINT в поток выполнения обновления модели данных. Этот поток типизирован
org.unidata.mdm.meta.context.UploadModelRequestContext.
Создайте компонент
extends Point<UploadModelRequestContext>.Задайте уникальный идентификатор точки SEGMENT_ID.
Зафиксируйте имя компонента уникальным SEGMENT_ID
@Component(NewPoint.SEGMENT_ID).Задайте локализацию имени точки SEGMENT_DESCRIPTION.
Задайте конструктор:
public NewPoint() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
Ограничьте поддерживаемые контексты:
@Override
public boolean supports(Start<?, ?> start) {
return UploadModelRequestContext.class.isAssignableFrom(start.getInputTypeClass());
}
Реализуйте логику в методе
point(UploadModelRequestContext ctx)