Добавление пользовательского потребителя¶
Чтобы добавить пользовательские системы-потребители в начале необходимо сконфигурировать и поднять нужную очередь, а так же зарегистрировать в системе домен аналогично стандартному:
/**
* Default notification message.
*/
public static final MessageType DEFAULT_TYPE = new MessageType("record-no-change")
.withSubsystem(CoreSubsystems.DATA_SUBSYSTEM)
.withDisplayName(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.name"))
.withDescription(() -> TextUtils.getText(MODULE_ID + ".messaging.type.record-no-change.description"));
/**
* Default record notification messaging extension.
*/
public static final MessagingExtension DEFAULT_NOTIFICATIONS_MESSAGING =
new MessagingExtension(RECORD_MESSAGING_NAME, Objects.requireNonNull(ResourceUtils.asString("classpath:/routes/jms-notification.xml")))
.withDisplayName(() -> TextUtils.getText(MODULE_ID + ".module.name"))
.withDescription(() -> TextUtils.getText(MODULE_ID + ".module.description"))
.withMessageTypes(DEFAULT_TYPE);
MessageType должен быть сконфигурирован с subsystem "DATA_SUBSYSTEM":
<routes xmlns="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!-- Route to send JSON to JMS -->
<route id="sendToJms">
<from uri="direct:jms_notification"/> <!-- Or any other entry point you want to use -->
<setHeader name="CamelHttpMethod">
<constant>POST</constant>
</setHeader>
<setHeader name="Content-Type">
<constant>application/json</constant>
</setHeader>
<to uri="jms:queue:defaultNotificationQueue"/>
</route>
</routes>
Пример конфигурации:
@Bean public CamelContext camelContext() throws Exception { CamelContext context = new DefaultCamelContext(); context.addComponent("jms", JmsComponent.jmsComponent(activeMQConnectionFactory())); context.addRoutes(new RouteBuilder() { @Override public void configure() { from("direct:jms_notification") // Or any other entry point you want to use .setHeader("CamelHttpMethod", constant("POST")) .setHeader("Content-Type", constant("application/json")) .to("jms:queue:defaultNotificationQueue"); } }); return context; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL("tcp://activemq:61616"); factory.setUserName("admin"); factory.setPassword("admin"); return factory; }
Docker-compose example
activemq: image: rmohr/activemq:latest restart: always ports:
"61616:61616"
"8161:8161"
- networks:
mdm_network
- environment:
ACTIVEMQ_ADMIN_LOGIN: ${ACTIVEMQ_ADMIN_LOGIN} ACTIVEMQ_ADMIN_PASSWORD: ${ACTIVEMQ_ADMIN_PASSWORD}
При необходимости расширить сообщение нужно использовать интерфейс NotificationPayloadAwareContext и внедриться в стандартный пайплайн отправки уведомления:
NotificationPayloadAwareContext
public interface NotificationPayloadAwareContext extends StorageCapableContext {
/**
* Access rights instance.
*/
StorageId SID_ACCESS_RIGHT = new StorageId("NOTIFICATION_PAYLOAD");
/**
* Gets payload.
*
* @return payload
*/
@Nullable
default <R extends Serializable> R notificationPayload() {
return getFromStorage(SID_ACCESS_RIGHT);
}
/**
* Sets payload for this context.
*
* @param payload the payload
*/
default void notificationPayload(Serializable payload) {
putToStorage(SID_ACCESS_RIGHT, payload);
}
}
Pipeline
{
"startId":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]",
"subjectId":"",
"description":"org.unidata.mdm.data.record.send.notification.get.start.description",
"segments":[
{
"segmentType":"START",
"id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_START]"
},
{
"segmentType":"POINT",
"id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_CONTINUE]"
},
{
"segmentType":"FINISH",
"id":"org.unidata.mdm.data[RECORD_SEND_NOTIFICATION_FINISH]"
}
]
}
Пример полученного сообщения:
{
"etalonKey": {
"id": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
"lsn": 1,
"status": "ACTIVE",
"operationType": "DIRECT",
"parents": [],
"children": [],
"parent": null,
"active": true
},
"originKey": {
"id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
"initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
"sourceSystem": "universe",
"enrichment": false,
"revision": 1,
"status": "ACTIVE",
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": null,
"createdBy": "admin",
"updatedBy": null,
"externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
"entityName": "test"
},
"supplementaryKeys": [
{
"id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
"initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
"sourceSystem": "universe",
"enrichment": false,
"revision": 1,
"status": "ACTIVE",
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": null,
"createdBy": "admin",
"updatedBy": null,
"externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
"entityName": "test"
}
],
"shard": 0,
"node": 0,
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": "2024-06-03T11:14:49.910+00:00",
"createdBy": "admin",
"updatedBy": "admin",
"published": true,
"entityName": "test",
"typeName": "test",
"active": true,
"new": false,
"boxKeys": [
"universe|7ae67ae0-219a-11ef-a359-4beff33798eb"
],
"lsnAsObject": {
"shard": 0,
"lsn": 1
},
"supplementaryKeysByBoxKey": {
"universe|7ae67ae0-219a-11ef-a359-4beff33798eb": {
"id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
"initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
"sourceSystem": "universe",
"enrichment": false,
"revision": 1,
"status": "ACTIVE",
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": null,
"createdBy": "admin",
"updatedBy": null,
"externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
"entityName": "test"
}
},
"supplementaryKeysWithoutEnrichments": [
{
"id": "7e47ce74-219a-11ef-8023-c9e7e1c21cff",
"initialOwner": "7d6a66c1-219a-11ef-8023-c9e7e1c21cff",
"sourceSystem": "universe",
"enrichment": false,
"revision": 1,
"status": "ACTIVE",
"createDate": "2024-06-03T11:14:49.909+00:00",
"updateDate": null,
"createdBy": "admin",
"updatedBy": null,
"externalId": "7ae67ae0-219a-11ef-a359-4beff33798eb",
"entityName": "test"
}
]
}
Система не производит дедупликацию, повторное сообщение будет доставлено в очередь.