Добавление пользовательского потребителя

Предупреждение

Это демо-функциональность, для получения запрашивайте специальный дистрибутив

Чтобы добавить пользовательские системы-потребители в начале необходимо сконфигурировать и поднять нужную очередь, а так же зарегистрировать в системе домен аналогично стандартному:

/**
 * Default notification message.
 */
public static final MessageType DEFAULT_TYPE = new MessageType("record-no-change")
        .withSubsystem(CoreSubsystems.DATA_SUBSYSTEM)
        .withDisplayName(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.name"))
        .withDescription(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.description"));

/**
 * Default record notification messaging extension.
 */
public static final MessagingExtension DEFAULT_NOTIFICATIONS_MESSAGING =
        new MessagingExtension(RECORD_MESSAGING_NAME, Objects.requireNonNull(ResourceUtils.asString("classpath:/routes/jms-notification.xml")))
                .withDisplayName(() -> TextUtils.getText(MODULE_ID + ".module.name"))
                .withDescription(() -> TextUtils.getText(MODULE_ID + ".module.description"))
                .withMessageTypes(DEFAULT_TYPE);

MessageType должен быть сконфигурирован с subsystem "DATA_SUBSYSTEM":

   <routes xmlns="http://camel.apache.org/schema/spring"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

   <!-- Route to send JSON to JMS -->
   <route id="sendToJms">
       <from uri="direct:jms_notification"/> <!-- Or any other entry point you want to use -->
       <setHeader name="CamelHttpMethod">
           <constant>POST</constant>
       </setHeader>
       <setHeader name="Content-Type">
           <constant>application/json</constant>
       </setHeader>
       <to uri="jms:queue:defaultNotificationQueue"/>
   </route>

</routes>

Пример конфигурации:

  @Bean
 public CamelContext camelContext() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addComponent("jms", JmsComponent.jmsComponent(activeMQConnectionFactory()));
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:jms_notification") // Or any other entry point you want to use
                    .setHeader("CamelHttpMethod", constant("POST"))
                    .setHeader("Content-Type", constant("application/json"))
                    .to("jms:queue:defaultNotificationQueue");
        }
    });

    return context;
}

 @Bean
 public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://activemq:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");
    return factory;
 }
Docker-compose example

activemq: image: rmohr/activemq:latest restart: always ports:

  • "61616:61616"

  • "8161:8161"

networks:
  • mdm_network

environment:

ACTIVEMQ_ADMIN_LOGIN: ${ACTIVEMQ_ADMIN_LOGIN} ACTIVEMQ_ADMIN_PASSWORD: ${ACTIVEMQ_ADMIN_PASSWORD}

При необходимости расширить сообщение нужно использовать интерфейс NotificationPayloadAwareContext и внедриться в стандартный пайплайн отправки уведомления:

NotificationPayloadAwareContext
   public interface NotificationPayloadAwareContext extends StorageCapableContext {

/**
* Access rights instance.
*/
StorageId SID_ACCESS_RIGHT = new StorageId("NOTIFICATION_PAYLOAD");

/**
* Gets payload.
*
* @return payload
*/
@Nullable
default <R extends Serializable> R notificationPayload() {
   return getFromStorage(SID_ACCESS_RIGHT);
}

/**
* Sets payload for this context.
*
* @param payload the payload
*/
default void notificationPayload(Serializable payload) {
   putToStorage(SID_ACCESS_RIGHT, payload);
}

}

Пайплайн отправки уведомления сторонним системам-потребителям

В enterprise-pipelines.json добавлен пайплайн отправки нотификаций всем кастомным системам-потребителям.

Pipeline
{
"startId":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]",
"subjectId":"",
"description":"org.unidata.mdm.data.record.send.notification.get.start.description",
"segments":[
{
 "segmentType":"START",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]"
},
{
 "segmentType":"POINT",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_CONTINUE]"
},
{
 "segmentType":"FINISH",
 "id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_FINISH]"
}
]

}

При необходимости расширить сообщение необходимо использовать интерфейс NotificationPayloadAwareContext и внедриться в дефолтный пайплайн отправки нотификации:

   public interface NotificationPayloadAwareContext extends StorageCapableContext {

   /**
    * Access rights instance.
    */
   StorageId SID_ACCESS_RIGHT = new StorageId("NOTIFICATION_PAYLOAD");

   /**
    * Gets payload.
    *
    * @return payload
    */
   @Nullable
   default <R extends Serializable> R notificationPayload() {
       return getFromStorage(SID_ACCESS_RIGHT);
   }

   /**
    * Sets payload for this context.
    *
    * @param payload the payload
    */
   default void notificationPayload(Serializable payload) {
       putToStorage(SID_ACCESS_RIGHT, payload);
   }
}

Тестовый ActiveMQ

В проект добавлен опциональный docker-compose: unidata-deploy/options/docker-compose.activemq.yml. Его необходимо подключать в Run configurations - Compose file после основного docker-compose файла.

activemq:
image: rmohr/activemq:latest
restart: always
ports:
  - "61616:61616"
  - "8161:8161"
networks:
  - mdm_network
environment:
  ACTIVEMQ_ADMIN_LOGIN: ${ACTIVEMQ_ADMIN_LOGIN}
  ACTIVEMQ_ADMIN_PASSWORD: ${ACTIVEMQ_ADMIN_PASSWORD}