Пользовательские функции обработки данных

Статья описывает создание функций обработки данных.

Юниверс MDM поддерживают два вида функций: обогащения и валидации данных.

Зависимости

implementation "org.unidata.mdm:org.unidata.mdm.dq.core:${platformVersion}"

Необходимо указывать зависимости в имплементации модуля:

@Override
public Collection<Dependency> getDependencies() {
    return Arrays.asList(
        // system, core, etc
        new Dependency("org.unidata.mdm.dq.core", "6.0")
    );
}

Интерфейсы

Пользовательские функции должны реализовывать org.unidata.mdm.dq.core.type.cleanse.CleanseFunction.

/**
 * Returns CleanseFunction definition in terms of
 * - name
 * - display name
 * - description
 * - list of input ports
 * - list of output ports
 * Если функция возвращает конфигурацию, то она считается "автоконфигурированной" и
 * конфигурация для этого элемента недоступна в пользовательском интерфейсе (устанавливается только для чтения).
 *
 * @return configuration or null
 */
@Nullable
default CleanseFunctionConfiguration configure() {
    return null;
}
/**
 * Выполняет функцию очистки в заданном контексте.
 * @param ctx the context
 * @return the result or null
 */
@Nullable
default CleanseFunctionResult execute(CleanseFunctionContext ctx) {
    return null;
}
/**
 * @return true, если данная функция имеет значения по умолчанию (name, display name, description)
 */
default boolean hasDefaults() {
    return false;
}

Примечание

Реализация CleanseFunctionConfiguration configure() не обязательна, но если конфигурация переопределена, то параметры функции в пользовательском интерфейсе не редактируются.

CleanseFunctionConfiguration

Класс CleanseFunctionConfiguration описывает основные параметры конфигурации:

List<CleansePortConfiguration> input

входные порты

List<CleansePortConfiguration> output

выходные порты

Set<CleanseFunctionExecutionScope> supported

Режим обработки имеет два значения:
  • LOCAL - функция поддерживает выполнение над набором атрибутов из одной записи, соответствующей заданному выражению UPath.

  • GLOBAL - функция поддерживает выполнение над набором атрибутов, собранных рекурсивной фильтрацией по полному дереву в любой (под)записи, соответствующей заданному выражению UPath.

Совет

См. подробнее о режимах обработки функции

Внутренний класс CleansePortConfiguration содержит поля:

name

Системное имя порта

displayName

Отображаемое имя порта

description

Описание порта

filteringMode

Режим фильтрации значений org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode:
  • MODE_ALL - Все стандартные отфильтрованные значения.

  • MODE_ALL_WITH_INCOMPLETE - Все отфильтрованные значения и неполные пути.

  • MODE_ONCE - Первое значение, попавшее в выборку.

valueTypes

Тип атрибута, на которое назначается правило. org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType

required

Обязателен для заполнения

Если планируется работа с правилом обработки данных через UI, необходимо реализовать интерфейс org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionDefaults и обязательно переопределить CleanseFunction::hasDefaults() на return true;

Простой пример функции обогащения данных: добавляет "+3" к строковому атрибуту, если значение атрибута не пустое
public class EnrichCleanseFunction implements CleanseFunction, CleanseFunctionDefaults  {
    private static final String INPUT_PORT_1_NAME = SDKSpringModule.MODULE_ID + ".cleanse.function.enrich.input.port1" +
        ".name";
private static final String INPUT_PORT_1_DESCRIPTION = SDKSpringModule.MODULE_ID
            + ".cleanse.function.enrich.input.port1.description";
private static final String OUTPUT_PORT_1_NAME = SDKSpringModule.MODULE_ID
            + ".cleanse.function.enrich.output.port1.name";
private static final String OUTPUT_PORT_1_DESCRIPTION = SDKSpringModule.MODULE_ID
            + ".cleanse.function.enrich.output.port1.description";

    @Override
    public CleanseFunctionConfiguration configure() {
        return CleanseFunctionConfiguration.configuration()
                .supports(CleanseFunctionExecutionScope.LOCAL, CleanseFunctionExecutionScope.GLOBAL)
                .input(
                    // Конфигурация входного порта
                    CleanseFunctionConfiguration.port()
                        .name(INPUT_PORT_1)
                        .displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
                        .description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
                        .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                        .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                        .valueTypes(CleanseFunctionPortValueType.STRING)
                        .required(true)
                        .build())
                .output(CleanseFunctionConfiguration.port()
                        .name(OUTPUT_PORT_1)
                        .displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
                        .description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
                        .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                        .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                        .valueTypes(CleanseFunctionPortValueType.STRING)
                        .required(true)
                        .build())
                .build();
    }

    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        CleanseFunctionInputParam stringParam = ctx.getInputParam(INPUT_PORT_1);

        CleanseFunctionResult result = new CleanseFunctionResult();

        if (Objects.isNull(stringParam)) {
            return result;
        }

        String strValue = stringParam.toSingletonValue();

        if (strValue == null || strValue.isEmpty()) {
            return result;
        }

        result.putOutputParam(CleanseFunctionOutputParam.of(OUTPUT_PORT_1, strValue + "+3"));

        return result;
    }

    @Override
    default boolean hasDefaults() {
        return true;
    }

Примечание

Некоторые функции обработки данных в системе наследуют абстрактный класс org.unidata.mdm.dq.core.type.function.AbstractBasicCleanseFunction. Он предназначен для системных функций, недоступен для редактирования, обратная совместимость не гарантируется.

CleanseFunctionResult

Метод CleanseFunction::execute возвращает экземпляр объекта com.universe.mdm.sdk.spring.service.impl.function.CleanseFunctionResult. Ошибки валидации передаются в метод CleanseFunction::addSpot. Результат обогащения передается в выходные порты CleanseFunction::putOutputParam

Регистрация в системе

Регистрация функции в системе доступна следующими способами:

  1. Загрузка функции через интерфейс раздела "Функции".

  2. Регистрация через код после старта системы.

Регистрация функции через код
@Component
public class RegisterImpl implements AfterPlatformStartup {

    @Override
    public void afterPlatformStartup() {

        List<Class<?>> classes = List.of(
            // Пользовательская функция обработки данных
            EnrichCleanseFunction.class
        );

        try {
            LibraryUtils.upsertAsLibrary(
                    classes,
                    Collections.emptyList(),
                    // Уникальное имя библиотеки, в которую будут добавлены классы функций
                    "sdk-cleanse-functions.jar",
                    // Версия
                    "1.0.0",
                    // Отображаемое на ui имя
                    "sdk");

        } catch (Exception e) {
            // Пользовательский вариант обработки
        }
    }
}

Реализация своей функции

Ниже приведен пример простой cleanse функции, которая принимает имя пользователя из INPUT_PORT_1 и выводит полное имя пользователя в OUTPUT_PORT_1.

В примере отображается:

  • Самостоятельная конфигурация (configure() call);

  • Добавление полей (userService, module, two configuration variables);

  • Использование входных/выходных параметров

package tests;

import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.unidata.mdm.core.dto.UserWithPasswordDTO;
import org.unidata.mdm.core.service.UserService;
import org.unidata.mdm.data.configuration.DataConfigurationConstants;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.annotation.ModuleRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;
import org.unidata.mdm.system.type.module.Module;

/**
* @author Mikhail Mikhailov on Feb 16, 2021
*/
public class TestCleanseFunction implements CleanseFunction {

    @Autowired
    private UserService userService;

    @ModuleRef(DataModule.MODULE_ID)
    private Module dataModule;

    @ConfigurationRef(DataConfigurationConstants.PROPERTY_DATA_NODES)
    private ConfigurationValue<String> nodes;

    @ConfigurationRef(DataConfigurationConstants.PROPERTY_DATA_SHARDS)
    private ConfigurationValue<Long> shards;

    /**
    * This function configuration.
    */
    private static final CleanseFunctionConfiguration CONFIGURATION
        = CleanseFunctionConfiguration.configuration()
            .supports(CleanseFunctionExecutionScope.LOCAL)
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_1)
                    .displayName("User name")
                    .description("User name to resolve")
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ONCE)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_1)
                    .displayName("Full name")
                    .description("Resolved full name or null")
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .build();

    /**
    * Constructor.
    */
    public TestCleanseFunction() {
        super();
    }

    /**
    * {@inheritDoc}
    */
    @Override
    public CleanseFunctionConfiguration configure() {
        return CONFIGURATION;
    }

    /**
    * {@inheritDoc}
    */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        CleanseFunctionResult output = new CleanseFunctionResult();
        CleanseFunctionInputParam param1 = ctx.getInputParam(CleanseConstants.INPUT_PORT_1);

        String result = null;
        if (param1 != null && !param1.isEmpty() && param1.isSingleton()) {

            String input = param1.toSingletonValue();
            UserWithPasswordDTO uwp = userService.getUserByName(input);
            if (Objects.nonNull(uwp)) {
                result = uwp.getFullName();
            }
        }

        output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, result));

        return output;
    }
}

Пример пометки атрибута с ошибкой при валидации

Модуль: org.unidata.mdm.dq.core

Пример добавления атрибута с ошибкой в отчет

public class SomeValidationFunction extends AbstractBasicCleanseFunction {
    /**
    * Executes a cleanse function in the given context.
    */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        //Создаем результат выполнения функции валидации
        CleanseFunctionResult output = new CleanseFunctionResult();

        //Достаем параметр из порта 1
        CleanseFunctionInputParam param1 = ctx.getInputParam(CleanseConstants.INPUT_PORT_1);

        //Тут, например, проверили, что есть такой параметр и туда пришел атрибут
        if (param1 == null || param1.isEmpty()) {
            //Такого порта нет или атрибут туда не пришел - отправили в результат false
            output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.FALSE));
        } else {

            boolean[] isValid;
            for (int i = 0; i < param1.getAttributes().size(); i++) {

                //Достаем атрибут из порта
                Attribute attribute = param1.getAttributes().get(i);

                //Проверка...

                //Если не прошла валидацию, то добавляем в отчет атрибут
                if (!isValid[i]) {
                    output.addSpot(new DataQualitySpot(param1, attribute.toLocalPath(), attribute));//порт, путь к атрибуту, само значение атрибута
                }
            }

            //Отправили в результат итог
            output.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, BooleanUtils.and(isValid)));
        }

        return output;
    }
}

Результат выполнения функции валидации

org.unidata.mdm.dq.core.dto.CleanseFunctionResult

Указаны только сигнатуры методов, отвечающих за указание атрибута с ошибкой

/**
  * Результат выполнения функции валидации
  */
  public class CleanseFunctionResult {
      /**
      * Возвращает весь список путей к атрибутам, в которых случилась ошибка валидации
      *
      * @return failed paths collection
      */
      public List<DataQualitySpot> getSpots();
      /**
      * Добавляет в список новое место с ошибкой валидации
      *
      * @param failure the failure to add
      */
      public void addSpot(DataQualitySpot failure);
      /**
      * Добавляет список мест с ошибкой валидации к списку результата
      *
      * @param failures the spots to add
      */
      public void addSpots(Collection<DataQualitySpot> failures);
      /**
      * Возвращает true, если результат содержит места с ошибкой валидации
      *
      * @return true, if this result contains failed validation paths.
      */
      public boolean hasSpots();
}

Отметка об атрибуте с ошибкой

org.unidata.mdm.dq.core.type.io.DataQualitySpot

Указаны только методы для создания объекта ошибки в функции валидации

/**
  * Место с проблемой/ошибкой - входящий атрибут не найден/отсутствует/некорректный/невалидный
  */
  public class DataQualitySpot {
      /**
      * Входящий параметр функции валидации
      */
      private final CleanseFunctionInputParam input;
      /**
      * Локальный путь к атрибуту в записи, в котором возникла ошибка функции валидации
      */
      private final String path;
      /**
      * Значение атрибута, может быть null в случае отсутствия проверяемого атрибута
      */
      private final Attribute attribute;
      /**
      * Для отображения неполных путей. Используется, когда нет атрибута
      * Например, сюда можно комплексный атрибут, внутри которого произошла ошибка.
      * Или саму запись, если нет более конкретного атрибута
      */
      private final DataRecord container;
      /**
      * Constructor.
      * @param path the path
      */
      public DataQualitySpot(String path);
      /**
      * Constructor.
      * @param input the input param
      * @param path the path
      * @param attribute the attribute
      */
      public DataQualitySpot(CleanseFunctionInputParam input, String path, Attribute attribute);
      /**
      * Constructor.
      * @param input the input param
      * @param path the path
      * @param container the container
      */
      public DataQualitySpot(CleanseFunctionInputParam input, String path, DataRecord container);
      /**
      * {@inheritDoc}
      */
      @Override
      public String toString() {
          return new StringBuilder()
                  .append(input.getPortName())
                  .append(" > ")
                  .append(path)
                  .append(" = ")
                  .append(attribute)
                  .toString();
      }
}