Прием и обработка данных (ETL)
Назначение модуля
Модуль представляет собой гибкий и настраиваемый ETL-конвейер (Extract, Transform, Load — Извлечение, Преобразование, Загрузка). Его основная задача — принимать пакеты данных из внешних систем через брокер сообщений (Kafka), обрабатывать их и применять изменения в платформе.
Модуль работает по принципу управления через метаданные (meta-data-driven). Он не заточен под конкретную бизнес-логику, а выполняет операции, основываясь на типе входящего сообщения и его содержимом. Это изолированный и самостоятельный компонент, работа которого не влияет на другие сервисы.
Модуль охватывает все ключевые операции с записями:
Upsert: Создание или обновление записи.
Delete: Удаление записи. Поддерживается как мягкое (логическое), так и полное (физическое) удаление.
Restore: Восстановление ранее удаленной записи.
Внешние системы не просто передают данные в платформу, а полностью управляют состоянием записей.
Как это работает?
Используется набор специализированных процессоров и маршрутизатор, который автоматически направляет каждое сообщение в нужный обработчик в зависимости от типа операции.
Для конвертера используется класс InputMessageConverter, в котором сообщения типизируются и приводится к виду, соответствующему каждой операции.
InputMessageAggregator - агрегатор, который собирает несколько сообщений в одну пачку. InputMessageDynamicRouter - маршрутизатор, который, в зависимости от переданного типа операции, перенаправляет поток на нужный процессор.
Необходимо самостоятельно сконфигурировать маршрут в параметре org.unidata.mdm.system.messaging.domains.smart-etl-data-recipient-messaging "Настройка маршрутизации импорта данных".
Исходный маршрут с использованием новых процессоров для поддержки нескольких видов операций:
<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<route id="smart_etl_mdm_recipient_kafka_1">
<from uri="kafka:mdm.data.inbound?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<process ref="inputMessageConverter"/>
<aggregate aggregationStrategy="inputMessageAggregator"
completionSize="10"
completionTimeout="2000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<threads maxPoolSize="16"/>
<dynamicRouter>
<method ref="inputMessageDynamicRouter" method="route"/>
</dynamicRouter>
</aggregate>
</route>
<route id="processUpsert">
<from uri="direct:upsertProcess"/>
<process ref="dataRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processRestore">
<from uri="direct:restoreProcess"/>
<process ref="restoreRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processDelete">
<from uri="direct:deleteProcess"/>
<process ref="deleteRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="sendResult">
<from uri="direct:sendResult"/>
<simple>${body.records}</simple>
<process ref="toJsonMessageConverter"/>
<to uri="kafka:mdm.data.result-5?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<to uri="direct:aggregationComplete"/>
</route>
</routes>
В настройке агрегатора параметр completitionTimeout отвечает за время ожидания сообщений для отправки в миллисекундах.
Описание маршрутов:
smart_etl_mdm_recipient_kafka_1 - основной маршрут, в которым указаны url получения сообщений, конвертер, агрегатор и динамический маршрутизатор. Поля "ref" соответствуют именам классов.
Маршруты processUpsert, processRestore и processDelete - маршруты отдельных операций, во "from url" указаны строки, которые приходят из маршрутизатора. Далее поток направляется в маршрут отправки результатов.
sendResult - маршрут отправки результатов. В нем сообщения с ответами конвертируются в json и отправляются в топик "to url", и в конце перенаправляются на конец потока.
Примечание
В одной группе сообщений могут находиться сообщения только одного типа
Формат сообщений
Для всех сообщений было добавлено поле "operationType", принимаются 3 значения: "upsert", "delete", "restore". Формат написания не важен, регистр не учитывается. При некорректном значении возникнет ошибка, без указания типа и будет выполняться обработка по типу upsert.
Основа сообщения для upsert не изменилась. Пример:
{
"operationType": "upsert",
"record": {
"externalId": "test1",
"sourceSystem": "universe",
"attributes": {
"id": "test"
},
"entityName": "res"
}
}
Для сообщений с типом restore и delete для проведения операции необходимы только ключи записи, поэтому поле "record" заменяется на "keys" и принимает в себе поля externalId, sourceSystem, entityName и etalonId. Для delete добавляется поле "wipe", которое отвечает за физическое удаление записи (true - физическое удаление, false - логическое).
Пример для delete со связкой externalId+sourceSystem+entityName:
{
"operationType": "delete",
"wipe": "false",
"keys": {
"externalId": "test1",
"sourceSystem": "universe",
"entityName": "res"
}
}
Пример delete по etalonId:
{
"operationType": "delete",
"wipe": false,
"keys": {
"etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
"entityName": "res"
}
}
Пример для restore по связке externalId+sourceSystem+entityName:
{
"operationType": "restore",
"keys": {
"externalId": "test1",
"sourceSystem": "universe",
"entityName": "res"
}
}
Пример для restore по etalonId:
{
"operationType": "restore",
"keys": {
"etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
"entityName": "res"
}
}
Альтернативный вариант маршрутизации, без dynamicRouter при наличии ошибок:
<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<route id="smart_etl_mdm_recipient_kafka_1">
<from uri="kafka:mdm.data.inbound?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<process ref="inputMessageConverter"/>
<aggregate aggregationStrategy="inputMessageAggregator"
completionSize="10"
completionTimeout="2000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<threads maxPoolSize="16"/>
<choice>
<when>
<simple>${header.event} == 'DELETE'</simple>
<to uri="direct:deleteProcess"/>
</when>
<when>
<simple>${header.event} == 'RESTORE'</simple>
<to uri="direct:restoreProcess"/>
</when>
<otherwise>
<to uri="direct:upsertProcess"/>
</otherwise>
</choice>
</aggregate>
</route>
<route id="processUpsert">
<from uri="direct:upsertProcess"/>
<process ref="dataRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processRestore">
<from uri="direct:restoreProcess"/>
<process ref="restoreRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="processDelete">
<from uri="direct:deleteProcess"/>
<process ref="deleteRecordMessageProcessor"/>
<to uri="direct:sendResult"/>
</route>
<route id="sendResult">
<from uri="direct:sendResult"/>
<simple>${body.records}</simple>
<process ref="toJsonMessageConverter"/>
<to uri="kafka:mdm.data.result-5?brokers=kafka:9092&groupId=setl-consumer-group&max-request-size=20971520&fetch-max-bytes=20971520&max-partition-fetch-bytes=20971520"/>
<stop/>
</route>
</routes>