Поток выполнения операций с одной записью¶
Статья описывает работу с одиночными запросами, пакетные потоки выполнения конфигурируются через batch pipeline.
Общие условия для всех сегментов и потоков выполнения¶
Экземпляры потоков выполнения и сегментов должны иметь уникальный ID. В системе контракт формата имени: Module.MODULE_ID + [имя_точки].
Пример: SDKSpringModule.MODULE_ID + [START_EXAMPLE_POINT] => com.universe.mdm.sdk.spring[START_EXAMPLE_POINT]
Рассматриваемые интерфейсы находятся в пакете org.unidata.mdm.system.type.pipeline
.
Подразумевается, что:
Класс Start соответствует
org.unidata.mdm.system.type.pipeline.Start
Класс Finish соответствует
org.unidata.mdm.system.type.pipeline.Finish
Каждый сегмент потока выполнения реализует org.unidata.mdm.system.type.pipeline.Segment
.
Каждый сегмент является компонентом org.springframework.stereotype.Component
.
Самостоятельная реализация Segment не требуется. Система Universe содержит абстрактные реализации для всех типов SegmentType.
Типы сегментов¶
Доступные типы сегментов в потоке выполнения определены в org.unidata.mdm.system.type.pipeline.SegmentType
.
Абстрактные классы для реализации находятся в пакете org.unidata.mdm.system.type.pipeline
, имя класса совпадает с типом SegmentType.
SegmentType |
Обязательный для потока |
Описание |
---|---|---|
START |
Обязательный |
Стартовая точка потока выполнения |
FINISH |
Обязательный |
Завершающая точка потока выполнения |
POINT |
Нет |
Элемент потока выполнения |
CONNECTOR |
Нет |
Используется, если необходимо встроить поток выполнения в другой поток. Например, после сохранения записи вызвать сохранение связей |
SPLITTER |
Нет |
На данный момент не используется |
SELECTOR |
Нет |
Позволяет выбрать один из потоков выполнения в зависимости от параметра из контекста |
FALLBACK |
Нет |
Выполняет код, если в потоке выполнения выброшена ошибка, но сама ошибка не подавляется |
Примеры реализации сегментов¶
Для реализации потока выполнения дополнительно потребуются 2 класса:
Контекст выполнения - реализация
org.unidata.mdm.system.type.pipeline.PipelineInput
.
Примечание
По контракту системы контекст выполнения неизменяемый. Если необходимо изменять/накапливать информацию в процессе потока выполнения, можно реализовать свой org.unidata.mdm.system.context.StorageCapableContext или наследовать org.unidata.mdm.system.context.CommonRequestContext
Результат выполнения - реализация
org.unidata.mdm.system.type.pipeline.PipelineOutput
.
Примечание
Система Universe предоставляет реализации VoidPipelineOutput, если результат выполнения потока не важен. Например, результат обновления модели org.unidata.mdm.core.service.segments.ModelRefreshFinishExecutor
В методе getStartTypeId
задается ID класса-наследника Start.
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineInputContext
:
public class SDKPipelineInputContext extends CommonRequestContext implements PipelineInput {
public static final StorageId SID_SDK_POINT = new StorageId("SID_SDK_POINT");
private LocalDateTime startDateTime;
protected SDKPipelineInputContext(Builder b) {
super(b);
this.startDateTime = b.startDateTime;
}
public LocalDateTime getStartDateTime() {
return startDateTime;
}
@Override
public String getStartTypeId() {
return SDKPipelineStartExecutor.SEGMENT_ID;
}
public static final class Builder extends CommonRequestContextBuilder<Builder> {
private LocalDateTime startDateTime;
private Builder() {
super();
}
public SDKPipelineInputContext.Builder startDateTime(LocalDateTime startDateTime) {
this.startDateTime = startDateTime;
return this;
}
@Override
public SDKPipelineInputContext build() {
return new SDKPipelineInputContext(this);
}
}
}
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineOutput
:
public class SDKPipelineOutput implements PipelineOutput, ExecutionResult {
private LocalDateTime startTime;
private LocalDateTime completeTime;
public void setCompleteTime(LocalDateTime completeTime) {
this.completeTime = completeTime;
}
public void setStartTime(LocalDateTime startTime) {
this.startTime = startTime;
}
}
Сегмент START¶
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineStartExecutor
.
Конфигурация в json: сегмент должен идти первым в конфигурации потока.
{
"segmentType":"START",
// SDKPipelineStartExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
}
Пример реализации сегмента:
@Component(SDKPipelineStartExecutor.SEGMENT_ID)
public class SDKPipelineStartExecutor extends Start<SDKPipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_START]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.start.description";
protected SDKPipelineStartExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineInput.class, SDKPipelineOutput.class);
}
@Override
public void start(SDKPipelineInput ctx) {
// prepare context or else
}
@Override
public String subject(SDKPipelineInput ctx) {
// No subject for this type of pipelines
// This may be storage id in the future
return null;
}
}
Сегмент FINISH¶
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineFinishExecutor
.
Конфигурация в json: сегмент должен идти последним в конфигурации потока.
{
"segmentType":"FINISH",
// SDKPipelineFinishExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
}
Пример реализации сегмента:
@Component(SDKPipelineFinishExecutor.SEGMENT_ID)
public class SDKPipelineFinishExecutor extends Finish<SDKPipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FINISH]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".sdk.pipeline.finish.description";
protected SDKPipelineFinishExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class);
}
@Override
public SDKPipelineOutput finish(SDKPipelineInput ctx) {
SDKPipelineOutput sdkPipelineOutput = new SDKPipelineOutput();
sdkPipelineOutput.setCompleteTime(LocalDateTime.now());
return sdkPipelineOutput;
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInput.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент POINT¶
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinePointExecutor
.
Конфигурация в json:
{
"segmentType":"POINT",
// SDKPipelinePointExecutor.SEGMENT_ID
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
}
Пример реализации сегмента:
@Component(SDKPipelinePointExecutor.SEGMENT_ID)
public class SDKPipelinePointExecutor extends Point<SDKPipelineInputContext> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_POINT]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.point.description";
protected SDKPipelinePointExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Override
public void point(SDKPipelineInputContext ctx) {
ctx.putToStorage(SDKPipelineInputContext.SID_SDK_POINT, true);
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент CONNECTOR¶
Сегмент типа Connector позволяет включить один поток выполнения в другой.
Конфигурация в json:
{
"segmentType":"CONNECTOR",
"id":"com.universe.mdm.sdk.spring.service.pipeline[SDK_PIPELINE_CONNECTOR]"
},
Пример коннектора, при котором подключенный поток выполняется всегда:
@Component(SDKPipelineConnectorExecutor.SEGMENT_ID)
public class SDKPipelineConnectorExecutor extends Connector<PipelineInput, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_CONNECTOR]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.connector.description";
public SDKPipelineConnectorExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Autowired
private ExecutionService executionService;
@Override
public SDKPipelineOutput connect(PipelineInput ctx) {
return execute(ctx, null);
}
@Override
public SDKPipelineOutput connect(PipelineInput ctx, Pipeline p) {
return execute(ctx, p);
}
private SDKPipelineOutput execute(PipelineInput ctx, Pipeline p) {
// Если нет дополнительных условий, поток всегда должен быть запущен
SDKPipelineInputContext sdkCtx =
SDKPipelineInputContext.builder().startDateTime(LocalDateTime.now()).build();
return executionService.execute(sdkCtx);
}
}
Условие запуска потока в коннекторе можно передать через "фрагменты". Для этого контекст основного потока должен реализовать InputFragmentContainer
, контекст подключенного потока реализовать InputFragment
и должен быть задан идентификатор фрагмента.
public class SDKPipelineInputContext implements PipelineInput, InputFragment<SDKPipelineInputContext> {
public static final FragmentId<SDKPipelineInputContext> FRAGMENT_ID
= new FragmentId<>("SDK_PIPELINE_CONTEXT");
@Override
public FragmentId<SDKPipelineInputContext> fragmentId() {
return FRAGMENT_ID;
}
Тогда метод connect(...) из примера выше может выглядеть следующим образом:
@Override
public SDKPipelineOutput connect(PipelineInput ctx) {
InputFragmentContainer target = (InputFragmentContainer) ctx;
SDKPipelineInputContext payload = target.fragment(SDKPipelineInputContext.FRAGMENT_ID);
if (Objects.isNull(payload)) {
return null;
}
return execute(ctx, null);
}
Сегмент SELECTOR¶
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelineSelectorPointExecutor
.
Конфигурация в json:
{
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
"segmentType":"SELECTOR",
"outcomeSegments" : {
"[FIRST]" : [
{
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]",
"segmentType":"POINT"
}
],
"[SECOND]" : [
{
"segmentType":"POINT",
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
}
]
}
Пример реализации (см. класс Outcome):
@Component(SDKPipelineSelectorPointExecutor.SEGMENT_ID)
public class SDKPipelineSelectorPointExecutor extends Selector<SDKPipelineInputContext, SDKPipelineOutput> {
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_SELECTOR]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.selector.description";
public static final Random random = new Random();
protected static final Outcome FIRST_MODE = new Outcome(
"[FIRST]",
SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
SDKPipelineInputContext.class,
SDKPipelineOutput.class);
protected static final Outcome SECOND_MODE = new Outcome(
"[SECOND]",
SDKSpringModule.MODULE_ID + ".pipeline.selector.first.outcome.description",
SDKPipelineInputContext.class,
SDKPipelineOutput.class);
public SDKPipelineSelectorPointExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION, SDKPipelineOutput.class, FIRST_MODE, SECOND_MODE);
}
@Override
public Outcome select(SDKPipelineInputContext ctx) {
if (random.nextBoolean()) {
return FIRST_MODE;
}
return SECOND_MODE;
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Сегмент FALLBACK¶
Пример класса com.universe.mdm.sdk.spring.service.pipeline.SDKPipelinesFallbackExecutor
.
Конфигурация в json:
{
"segmentType":"FALLBACK",
"id":"com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
},
Пример реализации:
@Component(SDKPipelinesFallbackExecutor.SEGMENT_ID)
public class SDKPipelinesFallbackExecutor extends Fallback<SDKPipelineInputContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(SDKPipelinesFallbackExecutor.class);
public static final String SEGMENT_ID = SDKSpringModule.MODULE_ID + "[SDK_PIPELINE_FALLBACK]";
public static final String SEGMENT_DESCRIPTION = SDKSpringModule.MODULE_ID + ".pipeline.fallback.description";
public SDKPipelinesFallbackExecutor() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
@Override
public void accept(SDKPipelineInputContext ctx, Throwable throwable) {
LOGGER.error("sdk pipeline fall ", throwable);
}
@Override
public boolean supports(Start<?, ?> start) {
return SDKPipelineInputContext.class.isAssignableFrom(start.getInputTypeClass());
}
}
Регистрация точек в системе и конфигурация потока выполнения¶
Необходимо в классе-наследнике
AbstractModule
в методе start вызвать addSegments():
public class SDKSpringModule extends AbstractModule {
@Autowired
private SDKSpringConfiguration configuration;
@Override
public void start() {
addSegments(configuration.getBeansOfType(Segment.class).values());
}
Добавить в (файл в описанием потоков) потоков конфигурацию.
{
"startId": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]",
"subjectId": "",
"description": "Example pipeline",
"segments": [
{
"segmentType": "START",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_START]"
},
{
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SELECTOR]",
"segmentType": "SELECTOR",
"outcomeSegments": {
"[FIRST]": [
{
"segmentType": "POINT",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_POINT]"
}
],
"[SECOND]": [
{
"segmentType": "POINT",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_SECOND_POINT]"
}
]
}
},
{
"segmentType": "FALLBACK",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FALLBACK]"
},
{
"segmentType": "FINISH",
"id": "com.universe.mdm.sdk.spring[SDK_PIPELINE_FINISH]"
}
]
}
Или сконфигурировать поток выполнения через код. В примере показана конфигурация потока из трех точек, start, point, finish
@Component
public class RegisterImpl implements AfterPlatformStartup {
@Autowired
private PipelineService pipelineService;
@Override
public void afterPlatformStartup() {
Segment segmentStart = PipelineUtils.findSegment(SDKPipelineStartExecutor.SEGMENT_ID);
Segment segmentFinish = PipelineUtils.findSegment(SDKPipelineFinishExecutor.SEGMENT_ID);
Segment segmentPoint = PipelineUtils.findSegment(SDKPipelinePointExecutor.SEGMENT_ID);
Pipeline pipeline = Pipeline.start((Start<?, ?>) segmentStart, false);
pipeline.with((Point<?>) segmentPoint);
pipeline.end((Finish<?, ?>) segmentFinish);
pipelineService.save(pipeline);
}
}
Добавление точки в существующий поток выполнения¶
Создайте компонент - класс-наследник нужного SegmentType типа.
Зарегистрируйте сегмент в системе (см. выше).
Измените пайплайн в реализации
AfterPlatformStartup::afterPlatformStartup
или добавьте в поток выполнения через UI.Например, добавьте новую точку типа POINT в поток выполнения обновления модели данных. Этот поток типизирован
org.unidata.mdm.meta.context.UploadModelRequestContext
.
Создайте компонент
extends Point<UploadModelRequestContext>
.Задайте уникальный идентификатор точки SEGMENT_ID.
Зафиксируйте имя компонента уникальным SEGMENT_ID
@Component(NewPoint.SEGMENT_ID)
.Задайте локализацию имени точки SEGMENT_DESCRIPTION.
Задайте конструктор:
public NewPoint() {
super(SEGMENT_ID, SEGMENT_DESCRIPTION);
}
Ограничьте поддерживаемые контексты:
@Override
public boolean supports(Start<?, ?> start) {
return UploadModelRequestContext.class.isAssignableFrom(start.getInputTypeClass());
}
Реализуйте логику в методе
point(UploadModelRequestContext ctx)