Пользовательские функции обработки данных¶
Статья описывает создание функций обработки данных.
Юниверс MDM поддерживают два вида функций: обогащения и валидации данных.
Зависимости¶
implementation "org.unidata.mdm:org.unidata.mdm.dq.core:${platformVersion}"
Необходимо указывать зависимости в имплементации модуля:
@Override
public Collection<Dependency> getDependencies() {
return Arrays.asList(
// system, core, etc
new Dependency("org.unidata.mdm.dq.core", "6.0")
);
}
Интерфейсы¶
Пользовательские функции должны реализовывать org.unidata.mdm.dq.core.type.cleanse.CleanseFunction
.
/**
* Returns CleanseFunction definition in terms of
* - name
* - display name
* - description
* - list of input ports
* - list of output ports
* Если функция возвращает конфигурацию, то она считается "автоконфигурированной" и
* конфигурация для этого элемента недоступна в пользовательском интерфейсе (устанавливается только для чтения).
*
* @return configuration or null
*/
@Nullable
default CleanseFunctionConfiguration configure() {
return null;
}
/**
* Выполняет функцию очистки в заданном контексте.
* @param ctx the context
* @return the result or null
*/
@Nullable
default CleanseFunctionResult execute(CleanseFunctionContext ctx) {
return null;
}
/**
* @return true, если данная функция имеет значения по умолчанию (name, display name, description)
*/
default boolean hasDefaults() {
return false;
}
Примечание
Реализация CleanseFunctionConfiguration configure() не обязательна, но если конфигурация переопределена, то параметры функции в пользовательском интерфейсе не редактируются.
CleanseFunctionConfiguration¶
Класс CleanseFunctionConfiguration
описывает основные параметры конфигурации:
List<CleansePortConfiguration> input |
входные порты |
List<CleansePortConfiguration> output |
выходные порты |
Set<CleanseFunctionExecutionScope> supported |
|
Совет
См. подробнее о режимах обработки функции
Внутренний класс CleansePortConfiguration
содержит поля:
name |
Системное имя порта |
displayName |
Отображаемое имя порта |
description |
Описание порта |
filteringMode |
|
valueTypes |
Тип атрибута, на которое назначается правило. org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType |
required |
Обязателен для заполнения |
Если планируется работа с правилом обработки данных через UI, необходимо реализовать интерфейс org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionDefaults и обязательно переопределить CleanseFunction::hasDefaults() на return true;
public class EnrichCleanseFunction implements CleanseFunction, CleanseFunctionDefaults {
private static final String INPUT_PORT_1_NAME = SDKSpringModule.MODULE_ID + ".cleanse.function.enrich.input.port1" +
".name";
private static final String INPUT_PORT_1_DESCRIPTION = SDKSpringModule.MODULE_ID
+ ".cleanse.function.enrich.input.port1.description";
private static final String OUTPUT_PORT_1_NAME = SDKSpringModule.MODULE_ID
+ ".cleanse.function.enrich.output.port1.name";
private static final String OUTPUT_PORT_1_DESCRIPTION = SDKSpringModule.MODULE_ID
+ ".cleanse.function.enrich.output.port1.description";
@Override
public CleanseFunctionConfiguration configure() {
return CleanseFunctionConfiguration.configuration()
.supports(CleanseFunctionExecutionScope.LOCAL, CleanseFunctionExecutionScope.GLOBAL)
.input(
// Конфигурация входного порта
CleanseFunctionConfiguration.port()
.name(INPUT_PORT_1)
.displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
.description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
.filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
.inputTypes(CleanseFunctionPortInputType.SIMPLE)
.valueTypes(CleanseFunctionPortValueType.STRING)
.required(true)
.build())
.output(CleanseFunctionConfiguration.port()
.name(OUTPUT_PORT_1)
.displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
.description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
.filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
.inputTypes(CleanseFunctionPortInputType.SIMPLE)
.valueTypes(CleanseFunctionPortValueType.STRING)
.required(true)
.build())
.build();
}
@Override
public CleanseFunctionResult execute(CleanseFunctionContext ctx) {
CleanseFunctionInputParam stringParam = ctx.getInputParam(INPUT_PORT_1);
CleanseFunctionResult result = new CleanseFunctionResult();
if (Objects.isNull(stringParam)) {
return result;
}
String strValue = stringParam.toSingletonValue();
if (strValue == null || strValue.isEmpty()) {
return result;
}
result.putOutputParam(CleanseFunctionOutputParam.of(OUTPUT_PORT_1, strValue + "+3"));
return result;
}
@Override
default boolean hasDefaults() {
return true;
}
Примечание
Некоторые функции обработки данных в системе наследуют абстрактный класс org.unidata.mdm.dq.core.type.function.AbstractBasicCleanseFunction. Он предназначен для системных функций, недоступен для редактирования, обратная совместимость не гарантируется.
CleanseFunctionResult¶
Метод CleanseFunction::execute возвращает экземпляр объекта com.universe.mdm.sdk.spring.service.impl.function.CleanseFunctionResult
. Ошибки валидации передаются в метод CleanseFunction::addSpot. Результат обогащения передается в выходные порты CleanseFunction::putOutputParam
Регистрация в системе¶
Регистрация функции в системе доступна следующими способами:
Загрузка функции через интерфейс раздела "Функции".
Регистрация через код после старта системы.
@Component
public class RegisterImpl implements AfterPlatformStartup {
@Override
public void afterPlatformStartup() {
List<Class<?>> classes = List.of(
// Пользовательская функция обработки данных
EnrichCleanseFunction.class
);
try {
LibraryUtils.upsertAsLibrary(
classes,
Collections.emptyList(),
// Уникальное имя библиотеки, в которую будут добавлены классы функций
"sdk-cleanse-functions.jar",
// Версия
"1.0.0",
// Отображаемое на ui имя
"sdk");
} catch (Exception e) {
// Пользовательский вариант обработки
}
}
}
Реализация своей функции¶
Ниже приведен пример простой cleanse функции, которая принимает имя пользователя из INPUT_PORT_1 и выводит полное имя пользователя в OUTPUT_PORT_1.
В примере отображается:
Самостоятельная конфигурация (
configure() call
);Добавление полей (userService, module, two configuration variables);
Использование входных/выходных параметров
package tests;
import java.util.Objects;
import org.springframework.beans.factory.annotation.Autowired;
import org.unidata.mdm.core.dto.UserWithPasswordDTO;
import org.unidata.mdm.core.service.UserService;
import org.unidata.mdm.data.configuration.DataConfigurationConstants;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.annotation.ModuleRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;
import org.unidata.mdm.system.type.module.Module;
/**
* @author Mikhail Mikhailov on Feb 16, 2021
*/
public class TestCleanseFunction implements CleanseFunction {
@Autowired
private UserService userService;
@ModuleRef(DataModule.MODULE_ID)
private Module dataModule;
@ConfigurationRef(DataConfigurationConstants.PROPERTY_DATA_NODES)
private ConfigurationValue<String> nodes;
@ConfigurationRef(DataConfigurationConstants.PROPERTY_DATA_SHARDS)
private ConfigurationValue<Long> shards;
/**
* This function configuration.
*/
private static final CleanseFunctionConfiguration CONFIGURATION
= CleanseFunctionConfiguration.configuration()
.supports(CleanseFunctionExecutionScope.LOCAL)
.input(CleanseFunctionConfiguration.port()
.name(CleanseConstants.INPUT_PORT_1)
.displayName("User name")
.description("User name to resolve")
.filteringMode(CleanseFunctionPortFilteringMode.MODE_ONCE)
.inputTypes(CleanseFunctionPortInputType.SIMPLE)
.valueTypes(CleanseFunctionPortValueType.STRING)
.required(true)
.build())
.output(CleanseFunctionConfiguration.port()
.name(CleanseConstants.OUTPUT_PORT_1)
.displayName("Full name")
.description("Resolved full name or null")
.filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
.inputTypes(CleanseFunctionPortInputType.SIMPLE)
.valueTypes(CleanseFunctionPortValueType.STRING)
.required(true)
.build())
.build();
/**
* Constructor.
*/
public TestCleanseFunction() {
super();
}
/**
* {@inheritDoc}
*/
@Override
public CleanseFunctionConfiguration configure() {
return CONFIGURATION;
}
/**
* {@inheritDoc}
*/
@Override
public CleanseFunctionResult execute(CleanseFunctionContext ctx) {
CleanseFunctionResult output = new CleanseFunctionResult();
CleanseFunctionInputParam param1 = ctx.getInputParam(CleanseConstants.INPUT_PORT_1);
String result = null;
if (param1 != null && !param1.isEmpty() && param1.isSingleton()) {
String input = param1.toSingletonValue();
UserWithPasswordDTO uwp = userService.getUserByName(input);
if (Objects.nonNull(uwp)) {
result = uwp.getFullName();
}
}
output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, result));
return output;
}
}
Пример пометки атрибута с ошибкой при валидации¶
Модуль: org.unidata.mdm.dq.core
Пример добавления атрибута с ошибкой в отчет
public class SomeValidationFunction extends AbstractBasicCleanseFunction {
/**
* Executes a cleanse function in the given context.
*/
@Override
public CleanseFunctionResult execute(CleanseFunctionContext ctx) {
//Создаем результат выполнения функции валидации
CleanseFunctionResult output = new CleanseFunctionResult();
//Достаем параметр из порта 1
CleanseFunctionInputParam param1 = ctx.getInputParam(CleanseConstants.INPUT_PORT_1);
//Тут, например, проверили, что есть такой параметр и туда пришел атрибут
if (param1 == null || param1.isEmpty()) {
//Такого порта нет или атрибут туда не пришел - отправили в результат false
output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.FALSE));
} else {
boolean[] isValid;
for (int i = 0; i < param1.getAttributes().size(); i++) {
//Достаем атрибут из порта
Attribute attribute = param1.getAttributes().get(i);
//Проверка...
//Если не прошла валидацию, то добавляем в отчет атрибут
if (!isValid[i]) {
output.addSpot(new DataQualitySpot(param1, attribute.toLocalPath(), attribute));//порт, путь к атрибуту, само значение атрибута
}
}
//Отправили в результат итог
output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, BooleanUtils.and(isValid)));
}
return output;
}
}
Результат выполнения функции валидации¶
org.unidata.mdm.dq.core.dto.CleanseFunctionResult
Указаны только сигнатуры методов, отвечающих за указание атрибута с ошибкой
/**
* Результат выполнения функции валидации
*/
public class CleanseFunctionResult {
/**
* Возвращает весь список путей к атрибутам, в которых случилась ошибка валидации
*
* @return failed paths collection
*/
public List<DataQualitySpot> getSpots();
/**
* Добавляет в список новое место с ошибкой валидации
*
* @param failure the failure to add
*/
public void addSpot(DataQualitySpot failure);
/**
* Добавляет список мест с ошибкой валидации к списку результата
*
* @param failures the spots to add
*/
public void addSpots(Collection<DataQualitySpot> failures);
/**
* Возвращает true, если результат содержит места с ошибкой валидации
*
* @return true, if this result contains failed validation paths.
*/
public boolean hasSpots();
}
Отметка об атрибуте с ошибкой¶
org.unidata.mdm.dq.core.type.io.DataQualitySpot
Указаны только методы для создания объекта ошибки в функции валидации
/**
* Место с проблемой/ошибкой - входящий атрибут не найден/отсутствует/некорректный/невалидный
*/
public class DataQualitySpot {
/**
* Входящий параметр функции валидации
*/
private final CleanseFunctionInputParam input;
/**
* Локальный путь к атрибуту в записи, в котором возникла ошибка функции валидации
*/
private final String path;
/**
* Значение атрибута, может быть null в случае отсутствия проверяемого атрибута
*/
private final Attribute attribute;
/**
* Для отображения неполных путей. Используется, когда нет атрибута
* Например, сюда можно комплексный атрибут, внутри которого произошла ошибка.
* Или саму запись, если нет более конкретного атрибута
*/
private final DataRecord container;
/**
* Constructor.
* @param path the path
*/
public DataQualitySpot(String path);
/**
* Constructor.
* @param input the input param
* @param path the path
* @param attribute the attribute
*/
public DataQualitySpot(CleanseFunctionInputParam input, String path, Attribute attribute);
/**
* Constructor.
* @param input the input param
* @param path the path
* @param container the container
*/
public DataQualitySpot(CleanseFunctionInputParam input, String path, DataRecord container);
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return new StringBuilder()
.append(input.getPortName())
.append(" > ")
.append(path)
.append(" = ")
.append(attribute)
.toString();
}
}