Добавление пользовательского потребителя

Чтобы добавить пользовательские системы-потребители в начале необходимо сконфигурировать и поднять нужную очередь, а так же зарегистрировать в системе домен аналогично стандартному:

/**
 * Default notification message.
 */
public static final MessageType DEFAULT_TYPE = new MessageType("record-no-change")
        .withSubsystem(CoreSubsystems.DATA_SUBSYSTEM)
        .withDisplayName(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.name"))
        .withDescription(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.description"));

/**
 * Default record notification messaging extension.
 */
public static final MessagingExtension DEFAULT_NOTIFICATIONS_MESSAGING =
        new MessagingExtension(RECORD_MESSAGING_NAME, Objects.requireNonNull(ResourceUtils.asString("classpath:/routes/jms-notification.xml")))
                .withDisplayName(() -> TextUtils.getText(MODULE_ID + ".module.name"))
                .withDescription(() -> TextUtils.getText(MODULE_ID + ".module.description"))
                .withMessageTypes(DEFAULT_TYPE);

MessageType должен быть сконфигурирован с subsystem "DATA_SUBSYSTEM":

   <routes xmlns="http://camel.apache.org/schema/spring"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

   <!-- Route to send JSON to JMS -->
   <route id="sendToJms">
       <from uri="direct:jms_notification"/> <!-- Or any other entry point you want to use -->
       <setHeader name="CamelHttpMethod">
           <constant>POST</constant>
       </setHeader>
       <setHeader name="Content-Type">
           <constant>application/json</constant>
       </setHeader>
       <to uri="jms:queue:defaultNotificationQueue"/>
   </route>

</routes>

Пример конфигурации:

  @Bean
 public CamelContext camelContext() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addComponent("jms", JmsComponent.jmsComponent(activeMQConnectionFactory()));
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:jms_notification") // Or any other entry point you want to use
                    .setHeader("CamelHttpMethod", constant("POST"))
                    .setHeader("Content-Type", constant("application/json"))
                    .to("jms:queue:defaultNotificationQueue");
        }
    });

    return context;
}

 @Bean
 public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://activemq:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");
    return factory;
 }
Docker-compose example

activemq: image: rmohr/activemq:latest restart: always ports:

  • "61616:61616"

  • "8161:8161"

networks:
  • mdm_network

environment:

ACTIVEMQ_ADMIN_LOGIN: ${ACTIVEMQ_ADMIN_LOGIN} ACTIVEMQ_ADMIN_PASSWORD: ${ACTIVEMQ_ADMIN_PASSWORD}

При необходимости расширить сообщение нужно использовать интерфейс NotificationPayloadAwareContext и внедриться в стандартный пайплайн отправки уведомления:

NotificationPayloadAwareContext
   public interface NotificationPayloadAwareContext extends StorageCapableContext {

/**
* Access rights instance.
*/
StorageId SID_ACCESS_RIGHT = new StorageId("NOTIFICATION_PAYLOAD");

/**
* Gets payload.
*
* @return payload
*/
@Nullable
default <R extends Serializable> R notificationPayload() {
   return getFromStorage(SID_ACCESS_RIGHT);
}

/**
* Sets payload for this context.
*
* @param payload the payload
*/
default void notificationPayload(Serializable payload) {
   putToStorage(SID_ACCESS_RIGHT, payload);
}

}
Pipeline
{
"startId":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]",
"subjectId":"",
"description":"org.unidata.mdm.data.record.send.notification.get.start.description",
"segments":[
{
 "segmentType":"START",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]"
},
{
 "segmentType":"POINT",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_CONTINUE]"
},
{
 "segmentType":"FINISH",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_FINISH]"
}
]

}

Пример полученного сообщения:

{
"etalonKey": {
   "id": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
   "lsn": 1,
   "status": "ACTIVE",
   "operationType": "DIRECT",
   "parents": [],
   "children": [],
   "parent": null,
   "active": true
},
"originKey": {
   "id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
   "initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
   "sourceSystem": "universe",
   "enrichment": false,
   "revision": 1,
   "status": "ACTIVE",
   "createDate": "2024-06-03T11:14:49.909+00:00",
   "updateDate": null,
   "createdBy": "admin",
   "updatedBy": null,
   "externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
   "entityName": "test"
},
"supplementaryKeys": [
   {
       "id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
       "initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
       "sourceSystem": "universe",
       "enrichment": false,
       "revision": 1,
       "status": "ACTIVE",
       "createDate": "2024-06-03T11:14:49.909+00:00",
       "updateDate": null,
       "createdBy": "admin",
       "updatedBy": null,
       "externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
       "entityName": "test"
   }
],
"shard": 0,
"node": 0,
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": "2024-06-03T11:14:49.910+00:00",
"createdBy": "admin",
"updatedBy": "admin",
"published": true,
"entityName": "test",
"typeName": "test",
"active": true,
"new": false,
"boxKeys": [
   "universe|7ae67ae0-219a-11ef-a359-4beff33798eb"
],
"lsnAsObject": {
   "shard": 0,
   "lsn": 1
},
"supplementaryKeysByBoxKey": {
   "universe|7ae67ae0-219a-11ef-a359-4beff33798eb": {
       "id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
       "initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
       "sourceSystem": "universe",
       "enrichment": false,
       "revision": 1,
       "status": "ACTIVE",
       "createDate": "2024-06-03T11:14:49.909+00:00",
       "updateDate": null,
       "createdBy": "admin",
       "updatedBy": null,
       "externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
       "entityName": "test"
   }
},
"supplementaryKeysWithoutEnrichments": [
   {
       "id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
       "initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
       "sourceSystem": "universe",
       "enrichment": false,
       "revision": 1,
       "status": "ACTIVE",
       "createDate": "2024-06-03T11:14:49.909+00:00",
       "updateDate": null,
       "createdBy": "admin",
       "updatedBy": null,
       "externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
       "entityName": "test"
   }
]

}

Система не производит дедупликацию, повторное сообщение будет доставлено в очередь.