Прием и обработка данных (ETL)

Назначение модуля

Модуль представляет собой гибкий и настраиваемый ETL-конвейер (Extract, Transform, Load — Извлечение, Преобразование, Загрузка). Его основная задача — принимать пакеты данных из внешних систем через брокер сообщений (Kafka), обрабатывать их и применять изменения в платформе.

Модуль работает по принципу управления через метаданные (meta-data-driven). Он не заточен под конкретную бизнес-логику, а выполняет операции, основываясь на типе входящего сообщения и его содержимом. Это изолированный и самостоятельный компонент, работа которого не влияет на другие сервисы.

Модуль охватывает все ключевые операции с записями:

  • Upsert: Создание или обновление записи.

  • Delete: Удаление записи. Поддерживается как мягкое (логическое), так и полное (физическое) удаление.

  • Restore: Восстановление ранее удаленной записи.

Внешние системы не просто передают данные в платформу, а полностью управляют состоянием записей.

Как это работает?

Используется набор специализированных процессоров и маршрутизатор, который автоматически направляет каждое сообщение в нужный обработчик в зависимости от типа операции.

Для конвертера используется класс InputMessageConverter, в котором сообщения типизируются и приводится к виду, соответствующему каждой операции.

InputMessageAggregator - агрегатор, который собирает несколько сообщений в одну пачку. InputMessageDynamicRouter - маршрутизатор, который, в зависимости от переданного типа операции, перенаправляет поток на нужный процессор.

Необходимо самостоятельно сконфигурировать маршрут в параметре org.unidata.mdm.system.messaging.domains.smart-etl-data-recipient-messaging "Настройка маршрутизации импорта данных".

Исходный маршрут с использованием новых процессоров для поддержки нескольких видов операций:

<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
 <route id="smart_etl_mdm_recipient_kafka_1">
    <from uri="kafka:mdm.data.inbound?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
    <process ref="inputMessageConverter"/>
    <aggregate aggregationStrategy="inputMessageAggregator"
           completionSize="10"
           completionTimeout="2000">
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <threads maxPoolSize="16"/>
        <dynamicRouter>
            <method ref="inputMessageDynamicRouter" method="route"/>
        </dynamicRouter>
    </aggregate>
 </route>
 <route id="processUpsert">
    <from uri="direct:upsertProcess"/>
    <process ref="dataRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="processRestore">
    <from uri="direct:restoreProcess"/>
    <process ref="restoreRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="processDelete">
    <from uri="direct:deleteProcess"/>
    <process ref="deleteRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="sendResult">
    <from uri="direct:sendResult"/>
    <simple>${body.records}</simple>
    <process ref="toJsonMessageConverter"/>
    <to uri="kafka:mdm.data.result-5?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
    <to uri="direct:aggregationComplete"/>
 </route>
 </routes>

В настройке агрегатора параметр completitionTimeout отвечает за время ожидания сообщений для отправки в миллисекундах.

Описание маршрутов:

  • smart_etl_mdm_recipient_kafka_1 - основной маршрут, в которым указаны url получения сообщений, конвертер, агрегатор и динамический маршрутизатор. Поля "ref" соответствуют именам классов.

  • Маршруты processUpsert, processRestore и processDelete - маршруты отдельных операций, во "from url" указаны строки, которые приходят из маршрутизатора. Далее поток направляется в маршрут отправки результатов.

  • sendResult - маршрут отправки результатов. В нем сообщения с ответами конвертируются в json и отправляются в топик "to url", и в конце перенаправляются на конец потока.

Примечание

В одной группе сообщений могут находиться сообщения только одного типа

Формат сообщений

Для всех сообщений было добавлено поле "operationType", принимаются 3 значения: "upsert", "delete", "restore". Формат написания не важен, регистр не учитывается. При некорректном значении возникнет ошибка, без указания типа и будет выполняться обработка по типу upsert.

Основа сообщения для upsert не изменилась. Пример:

   {
 "operationType": "upsert",
 "record": {
   "externalId": "test1",
   "sourceSystem": "universe",
   "attributes": {
      "id": "test"
   },
   "entityName": "res"
 }
}

Для сообщений с типом restore и delete для проведения операции необходимы только ключи записи, поэтому поле "record" заменяется на "keys" и принимает в себе поля externalId, sourceSystem, entityName и etalonId. Для delete добавляется поле "wipe", которое отвечает за физическое удаление записи (true - физическое удаление, false - логическое).

Пример для delete со связкой externalId+sourceSystem+entityName:

   {
 "operationType": "delete",
 "wipe": "false",
 "keys": {
   "externalId": "test1",
   "sourceSystem": "universe",
   "entityName": "res"
 }
}

Пример delete по etalonId:

   {
 "operationType": "delete",
 "wipe": false,
 "keys": {
   "etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
   "entityName": "res"
 }
}

Пример для restore по связке externalId+sourceSystem+entityName:

   {
 "operationType": "restore",
 "keys": {
   "externalId": "test1",
   "sourceSystem": "universe",
   "entityName": "res"
 }
}

Пример для restore по etalonId:

   {
 "operationType": "restore",
 "keys": {
   "etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
   "entityName": "res"
 }
}

Альтернативный вариант маршрутизации, без dynamicRouter при наличии ошибок:

   <routes
   xmlns="http://camel.apache.org/schema/spring"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

   <route id="smart_etl_mdm_recipient_kafka_1">
       <from uri="kafka:mdm.data.inbound?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
       <process ref="inputMessageConverter"/>

       <aggregate aggregationStrategy="inputMessageAggregator"
                 completionSize="10"
                 completionTimeout="2000">
           <correlationExpression>
               <constant>true</constant>
           </correlationExpression>
           <threads maxPoolSize="16"/>

           <choice>
               <when>
                   <simple>${header.event} == 'DELETE'</simple>
                   <to uri="direct:deleteProcess"/>
               </when>
               <when>
                   <simple>${header.event} == 'RESTORE'</simple>
                   <to uri="direct:restoreProcess"/>
               </when>
               <otherwise>
                   <to uri="direct:upsertProcess"/>
               </otherwise>
           </choice>
       </aggregate>
   </route>

   <route id="processUpsert">
       <from uri="direct:upsertProcess"/>
       <process ref="dataRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="processRestore">
       <from uri="direct:restoreProcess"/>
       <process ref="restoreRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="processDelete">
       <from uri="direct:deleteProcess"/>
       <process ref="deleteRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="sendResult">
       <from uri="direct:sendResult"/>
       <simple>${body.records}</simple>
       <process ref="toJsonMessageConverter"/>
       <to uri="kafka:mdm.data.result-5?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
       <stop/>
   </route>
</routes>