Прием и обработка данных (ETL)

Назначение модуля

Модуль представляет собой гибкий и настраиваемый ETL-конвейер (Extract, Transform, Load — Извлечение, Преобразование, Загрузка). Его основная задача — принимать пакеты данных из внешних систем через брокер сообщений (Kafka), обрабатывать их и применять изменения в платформе.

Модуль работает по принципу управления через метаданные (meta-data-driven). Он не заточен под конкретную бизнес-логику, а выполняет операции, основываясь на типе входящего сообщения и его содержимом. Это изолированный и самостоятельный компонент, работа которого не влияет на другие сервисы.

Модуль охватывает все ключевые операции с записями:

  • Upsert: Создание или обновление записи.

  • Delete: Удаление записи. Поддерживается как мягкое (логическое), так и полное (физическое) удаление.

  • Restore: Восстановление ранее удаленной записи.

Внешние системы не просто передают данные в платформу, а полностью управляют состоянием записей.

Как это работает?

Используется набор специализированных процессоров и маршрутизатор, который автоматически направляет каждое сообщение в нужный обработчик в зависимости от типа операции.

Для конвертера используется класс InputMessageConverter, в котором сообщения типизируются и приводится к виду, соответствующему каждой операции.

InputMessageAggregator - агрегатор, который собирает несколько сообщений в одну пачку. InputMessageDynamicRouter - маршрутизатор, который, в зависимости от переданного типа операции, перенаправляет поток на нужный процессор.

Необходимо самостоятельно сконфигурировать маршрут в параметре org.unidata.mdm.system.messaging.domains.smart-etl-data-recipient-messaging "Настройка маршрутизации импорта данных".

Исходный маршрут с использованием новых процессоров для поддержки нескольких видов операций:

<routes
xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
 <route id="smart_etl_mdm_recipient_kafka_1">
    <from uri="kafka:mdm.data.inbound?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
    <process ref="inputMessageConverter"/>
    <aggregate aggregationStrategy="inputMessageAggregator"
           completionSize="10"
           completionTimeout="2000">
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <threads maxPoolSize="16"/>
        <dynamicRouter>
            <method ref="inputMessageDynamicRouter" method="route"/>
        </dynamicRouter>
    </aggregate>
 </route>
 <route id="processUpsert">
    <from uri="direct:upsertProcess"/>
    <process ref="dataRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="processRestore">
    <from uri="direct:restoreProcess"/>
    <process ref="restoreRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="processDelete">
    <from uri="direct:deleteProcess"/>
    <process ref="deleteRecordMessageProcessor"/>
    <to uri="direct:sendResult"/>
 </route>
 <route id="sendResult">
    <from uri="direct:sendResult"/>
    <simple>${body.records}</simple>
    <process ref="toJsonMessageConverter"/>
    <to uri="kafka:mdm.data.result-5?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
    <to uri="direct:aggregationComplete"/>
 </route>
 </routes>

В настройке агрегатора параметр completitionTimeout отвечает за время ожидания сообщений для отправки в миллисекундах.

Описание маршрутов:

  • smart_etl_mdm_recipient_kafka_1 - основной маршрут, в которым указаны url получения сообщений, конвертер, агрегатор и динамический маршрутизатор. Поля "ref" соответствуют именам классов.

  • Маршруты processUpsert, processRestore и processDelete - маршруты отдельных операций, во "from url" указаны строки, которые приходят из маршрутизатора. Далее поток направляется в маршрут отправки результатов.

  • sendResult - маршрут отправки результатов. В нем сообщения с ответами конвертируются в json и отправляются в топик "to url", и в конце перенаправляются на конец потока.

Примечание

В одной группе сообщений могут находиться сообщения только одного типа

Формат сообщений

Для всех сообщений было добавлено поле "operationType", принимаются 3 значения: "upsert", "delete", "restore". Формат написания не важен, регистр не учитывается. При некорректном значении возникнет ошибка, без указания типа и будет выполняться обработка по типу upsert.

Основа сообщения для upsert не изменилась. Пример:

{
    "record" : {
    "externalId" : "e0f42564-afc7-4452-8030-bf322360c277", ##externail записи
    "attributes" : { ##массив перечисления атрибутов записи
        "code_ss" : "00-365860", ##строковый атрибут
        "counterparty_type" : "1", ##числовой атрибут
        "is_member_of_board_SD" : false, ##логический атрибут
        "full_name" : "ООО \"ЛАРДИС\"", ##пример с использованием кавычек в строке
        "mainContactPerson" : { ##тут уже идёт комплексный атрибут
        "id" : "e08defbc-48ba-11ee-810f-005056b80223",
        "code" : "00-041026",
        "description" : "Иванов Иван Иванович"
        },
        "sourceSystem" : "bit", ##указание источника данных
    },
    "relations" : { ##описание связей и их атрибутов
        "head_counterparty_relation" : [ {
        "attributes" : {
            "is_main_account" : "true"
        },
        "record" : {
            "externalId" : "e0f42564-afc7-4452-8030-bf322360c299",
            "sourceSystem" : "universe"
        }
        },{
            "attributes" : {
            "is_main_account" : "false"
            },
            "record" : {
            "externalId" : "e0f42564-afc7-4452-8030-bf322360c288",
            "sourceSystem" : "universe"
            }
        } ]
    },
    "patch" : "false",  ##Существует возможность обновить запись частично, передав только те атрибуты, которые требуется изменить. Для этого необходимо в запросе использовать атрибут "patch" со значением "true". В таком случае обязательные атрибуты (согласно модели данных) можно не передавать.
    "entityName" : "Counterparty", ##наименование реестра/справочника
    "sourceSystem" : "universe" ##источник данных
    }
}

Параметр patch позволяет выполнять частичное обновление записи. При значении true можно передавать только изменяемые атрибуты, без обязательных полей модели данных.

Для сообщений с типом restore и delete для проведения операции необходимы только ключи записи, поэтому поле "record" заменяется на "keys" и принимает в себе поля externalId, sourceSystem, entityName и etalonId. Для delete добавляется поле "wipe", которое отвечает за физическое удаление записи (true - физическое удаление, false - логическое).

Пример для delete со связкой externalId+sourceSystem+entityName:

   {
 "operationType": "delete",
 "wipe": false,
 "keys": {
   "externalId": "test1",
   "sourceSystem": "universe",
   "entityName": "res"
 }
}

Пример delete по etalonId:

   {
 "operationType": "delete",
 "wipe": false,
 "keys": {
   "etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
   "entityName": "res"
 }
}

Пример для restore по связке externalId+sourceSystem+entityName:

   {
 "operationType": "restore",
 "keys": {
   "externalId": "test1",
   "sourceSystem": "universe",
   "entityName": "res"
 }
}

Пример для restore по etalonId:

   {
 "operationType": "restore",
 "keys": {
   "etalonId": "b3b23964-694f-11f0-adf3-09e7afd64753",
   "entityName": "res"
 }
}

Альтернативный вариант маршрутизации, без dynamicRouter при наличии ошибок:

   <routes
   xmlns="http://camel.apache.org/schema/spring"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

   <route id="smart_etl_mdm_recipient_kafka_1">
       <from uri="kafka:mdm.data.inbound?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
       <process ref="inputMessageConverter"/>

       <aggregate aggregationStrategy="inputMessageAggregator"
                 completionSize="10"
                 completionTimeout="2000">
           <correlationExpression>
               <constant>true</constant>
           </correlationExpression>
           <threads maxPoolSize="16"/>

           <choice>
               <when>
                   <simple>${header.event} == 'DELETE'</simple>
                   <to uri="direct:deleteProcess"/>
               </when>
               <when>
                   <simple>${header.event} == 'RESTORE'</simple>
                   <to uri="direct:restoreProcess"/>
               </when>
               <otherwise>
                   <to uri="direct:upsertProcess"/>
               </otherwise>
           </choice>
       </aggregate>
   </route>

   <route id="processUpsert">
       <from uri="direct:upsertProcess"/>
       <process ref="dataRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="processRestore">
       <from uri="direct:restoreProcess"/>
       <process ref="restoreRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="processDelete">
       <from uri="direct:deleteProcess"/>
       <process ref="deleteRecordMessageProcessor"/>
       <to uri="direct:sendResult"/>
   </route>

   <route id="sendResult">
       <from uri="direct:sendResult"/>
       <simple>${body.records}</simple>
       <process ref="toJsonMessageConverter"/>
       <to uri="kafka:mdm.data.result-5?brokers=kafka:9092&amp;groupId=setl-consumer-group&amp;max-request-size=20971520&amp;fetch-max-bytes=20971520&amp;max-partition-fetch-bytes=20971520"/>
       <stop/>
   </route>
</routes>

Параметр dataQuality

Поле dataQuality содержит информацию о применённых правилах качества данных для записи.

Пример:

"dataQuality": [
    {
        "qualityRule": {
            "name": "test_na_rule1",
            "displayName": "test_na_rule1",
            "severity": "GREEN"
        },
        "inputs": [
            {
                "attribute": "attr2",
                "value": "null"
            }
        ],
        "message": "Поле пустое"
    }
]
  • qualityRule — описание правила качества

  • severity — уровень критичности (например: GREEN, YELLOW, RED)

  • inputs — входные данные, на которых сработало правило

  • message — текст сообщения пользователю

Параметр details

Поле details используется для передачи информации об ошибках.

Обычно содержит текст ошибки, но в некоторых случаях может содержать описание структуры данных.

Пример:

"details": {
    "type": "array",
    "items": {
        "type": "string"
    }
}