Прием и обработка данных (ETL)
Назначение модуля
Модуль представляет собой гибкий и настраиваемый ETL-конвейер (Extract, Transform, Load — Извлечение, Преобразование, Загрузка). Его основная задача — принимать пакеты данных из внешних систем через брокер сообщений (Kafka), обрабатывать их и применять изменения в платформе.
Модуль работает по принципу управления через метаданные (meta-data-driven). Он не заточен под конкретную бизнес-логику, а выполняет операции, основываясь на типе входящего сообщения и его содержимом. Это изолированный и самостоятельный компонент, работа которого не влияет на другие сервисы.
Модуль охватывает все ключевые операции с записями:
Upsert: Создание или обновление записи.
Delete: Удаление записи. Поддерживается как мягкое (логическое), так и полное (физическое) удаление.
Restore: Восстановление ранее удаленной записи.
Внешние системы не просто передают данные в платформу, а полностью управляют состоянием записей.
Как это работает?
Используется набор специализированных процессоров и маршрутизатор, который автоматически направляет каждое сообщение в нужный обработчик в зависимости от типа операции.
Для конвертера используется класс InputMessageConverter, в котором сообщения типизируются и приводится к виду, соответствующему каждой операции.
InputMessageAggregator - агрегатор, который собирает несколько сообщений в одну пачку. InputMessageDynamicRouter - маршрутизатор, который, в зависимости от переданного типа операции, перенаправляет поток на нужный процессор.
Необходимо самостоятельно сконфигурировать маршрут в параметре org.unidata.mdm.system.messaging.domains.smart-etl-data-recipient-messaging "Настройка маршрутизации импорта данных".
Исходный маршрут с использованием новых процессоров для поддержки нескольких видов операций:
<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<route id="smart_etl_mdm_recipient_kafka_1">
<from uri="kafka:mdm.data.inbound?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<process ref="inputMessageConverter"/>
<aggregate aggregationStrategy="inputMessageAggregator"
completionSize="10"
completionTimeout="2000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<threads maxPoolSize="16"/>
<dynamicRouter>
<method ref="inputMessageDynamicRouter" method="route"/>
</dynamicRouter>
</aggregate>
</route>
<route id="processUpsert">
<from uri="direct:upsertProcess"/>
<process ref="dataRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processRestore">
<from uri="direct:restoreProcess"/>
<process ref="restoreRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processDelete">
<from uri="direct:deleteProcess"/>
<process ref="deleteRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="sendResult">
<from uri="direct:sendResult"/>
<simple>${body.records}</simple>
<process ref="toJsonMessageConverter"/>
<to uri="kafka:mdm.data.result-5?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<to uri="direct:aggregationComplete"/>
</route>
</routes>
В настройке агрегатора параметр completitionTimeout отвечает за время ожидания сообщений для отправки в миллисекундах.
Описание маршрутов:
smart_etl_mdm_recipient_kafka_1 - основной маршрут, в которым указаны url получения сообщений, конвертер, агрегатор и динамический маршрутизатор. Поля "ref" соответствуют именам классов.
Маршруты processUpsert, processRestore и processDelete - маршруты отдельных операций, во "from url" указаны строки, которые приходят из маршрутизатора. Далее поток направляется в маршрут отправки результатов.
sendResult - маршрут отправки результатов. В нем сообщения с ответами конвертируются в json и отправляются в топик "to url", и в конце перенаправляются на конец потока.
Примечание
В одной группе сообщений могут находиться сообщения только одного типа
Формат сообщений
Для всех сообщений было добавлено поле "operationType", принимаются 3 значения: "upsert", "delete", "restore". Формат написания не важен, регистр не учитывается. При некорректном значении возникнет ошибка, без указания типа и будет выполняться обработка по типу upsert.
Основа сообщения для upsert не изменилась. Пример:
{
"record" : {
"externalId" : "e0f42564-afc7-4452-8030-bf322360c277", ##externail записи
"attributes" : { ##массив перечисления атрибутов записи
"code_ss" : "00-365860", ##строковый атрибут
"counterparty_type" : "1", ##числовой атрибут
"is_member_of_board_SD" : false, ##логический атрибут
"full_name" : "ООО \"ЛАРДИС\"", ##пример с использованием кавычек в строке
"mainContactPerson" : { ##тут уже идёт комплексный атрибут
"id" : "e08defbc-48ba-11ee-810f-005056b80223",
"code" : "00-041026",
"description" : "Иванов Иван Иванович"
},
"sourceSystem" : "bit", ##указание источника данных
},
"relations" : { ##описание связей и их атрибутов
"head_counterparty_relation" : [ {
"attributes" : {
"is_main_account" : "true"
},
"record" : {
"externalId" : "e0f42564-afc7-4452-8030-bf322360c299",
"sourceSystem" : "universe"
}
},{
"attributes" : {
"is_main_account" : "false"
},
"record" : {
"externalId" : "e0f42564-afc7-4452-8030-bf322360c288",
"sourceSystem" : "universe"
}
} ]
},
"patch" : "false", ##Существует возможность обновить запись частично, передав только те атрибуты, которые требуется изменить. Для этого необходимо в запросе использовать атрибут "patch" со значением "true". В таком случае обязательные атрибуты (согласно модели данных) можно не передавать.
"entityName" : "Counterparty", ##наименование реестра/справочника
"sourceSystem" : "universe" ##источник данных
}
}
Параметр patch позволяет выполнять частичное обновление записи. При значении true можно передавать только изменяемые атрибуты, без обязательных полей модели данных.
Для сообщений с типом restore и delete для проведения операции необходимы только ключи записи, поэтому поле "record" заменяется на "keys" и принимает в себе поля externalId, sourceSystem, entityName и etalonId. Для delete добавляется поле "wipe", которое отвечает за физическое удаление записи (true - физическое удаление, false - логическое).
Пример для delete со связкой externalId+sourceSystem+entityName:
{
"operationType": "delete",
"wipe": false,
"keys": {
"externalId": "test1",
"sourceSystem": "universe",
"entityName": "res"
}
}
Пример delete по etalonId:
{
"operationType": "delete",
"wipe": false,
"keys": {
"etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
"entityName": "res"
}
}
Пример для restore по связке externalId+sourceSystem+entityName:
{
"operationType": "restore",
"keys": {
"externalId": "test1",
"sourceSystem": "universe",
"entityName": "res"
}
}
Пример для restore по etalonId:
{
"operationType": "restore",
"keys": {
"etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
"entityName": "res"
}
}
Альтернативный вариант маршрутизации, без dynamicRouter при наличии ошибок:
<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<route id="smart_etl_mdm_recipient_kafka_1">
<from uri="kafka:mdm.data.inbound?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<process ref="inputMessageConverter"/>
<aggregate aggregationStrategy="inputMessageAggregator"
completionSize="10"
completionTimeout="2000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<threads maxPoolSize="16"/>
<choice>
<when>
<simple>${header.event} == 'DELETE'</simple>
<to uri="direct:deleteProcess"/>
</when>
<when>
<simple>${header.event} == 'RESTORE'</simple>
<to uri="direct:restoreProcess"/>
</when>
<otherwise>
<to uri="direct:upsertProcess"/>
</otherwise>
</choice>
</aggregate>
</route>
<route id="processUpsert">
<from uri="direct:upsertProcess"/>
<process ref="dataRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processRestore">
<from uri="direct:restoreProcess"/>
<process ref="restoreRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processDelete">
<from uri="direct:deleteProcess"/>
<process ref="deleteRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="sendResult">
<from uri="direct:sendResult"/>
<simple>${body.records}</simple>
<process ref="toJsonMessageConverter"/>
<to uri="kafka:mdm.data.result-5?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<stop/>
</route>
</routes>
Параметр dataQuality
Поле dataQuality содержит информацию о применённых правилах качества данных для записи.
Пример:
"dataQuality": [
{
"qualityRule": {
"name": "test_na_rule1",
"displayName": "test_na_rule1",
"severity": "GREEN"
},
"inputs": [
{
"attribute": "attr2",
"value": "null"
}
],
"message": "Поле пустое"
}
]
qualityRule — описание правила качества
severity — уровень критичности (например: GREEN, YELLOW, RED)
inputs — входные данные, на которых сработало правило
message — текст сообщения пользователю
Параметр details
Поле details используется для передачи информации об ошибках.
Обычно содержит текст ошибки, но в некоторых случаях может содержать описание структуры данных.
Пример:
"details": {
"type": "array",
"items": {
"type": "string"
}
}