There are currently 2 AggregatingConsumeService implementations : aggregating-fs-consume-service and aggregating-jms-consume-service which work with the filesystem and a JMS Queue respectively. Both will perform aggregation based on a custom destination implementation which allows you to dynamically control where the messages are aggregated from.
Aggregator implementations
Aggregator Type | Description |
---|---|
mime-aggregator | Creates a new MIME part for each message that is aggregated; the original message is the first MIME Part |
ignore-original-mime-aggregator | As per mime-aggregator but ignores the original message, so the aggregated message only contains the results of processing |
replace-with-first-message-aggregator | Replaces the message payload with the first processed message. |
xml-document-aggregator | Merges each processed message into the original document. |
ignore-original-xml-document-aggregator | Merges each processed message into the configured template, ignoring the original document. |
zip-aggregator | Adds each processed message into a zip file; the filename associated with each message determined by metadata |
Example
We receive a message via JMS on SampleQ1
; the JMSMessageID for this message is used in some other messages (waiting on SampleQ2
) as the JMSCorrelationID. When we receive this message, we need to receive only those messages on SampleQ2 that have the corresponding JMSCorrelationID. Each of the messages received should be inserted into the original document and written back to JMSReplyTo
.
We can use a combination of aggregating-jms-consume-service and replace-metadata-value to handle this:
<standard-workflow>
<consumer class="jms-queue-consumer">
<destination class="configured-produce-destination">
<destination>SampleQ1</destination>
</destination>
<message-translator class="text-message-translator">
<move-jms-headers>true</move-jms-headers>
</message-translator>
</consumer>
<service-collection class="service-list">
<services>
<copy-metadata-service>
<metadata-keys>
<key-value-pair>
<key>JMSMessageID</key>
<value>filterSelectorKey</value>
</key-value-pair>
</metadata-keys>
</copy-metadata-service>
<replace-metadata-value>
<metadata-key-regexp>filterSelectorKey</metadata-key-regexp>
<search-value>(.*)</search-value>
<replacement-value>JMSCorrelationID = '$1'</replacement-value>
</replace-metadata-value>
<aggregating-jms-consume-service>
<connection class="shared-connection">
<lookup-name>sharedJMS</lookup-name>
</connection>
<jms-consumer class="aggregating-queue-consumer">
<destination class="consume-destination-from-metadata">
<default-destination>SampleQ2</default-destination>
<filter-metadata-key>filterSelectorKey</filter-metadata-key>
</destination>
<aggregator class="xml-document-aggregator">
<merge-implementation class="xml-insert-node">
<xpath-to-parent-node>/envelope/aggregated</xpath-to-parent-node>
</merge-implementation>
</aggregator>
</jms-consumer>
</aggregating-jms-consume-service>
</services>
</service-collection>
<producer class="jms-queue-producer">
<destination class="jms-reply-to-destination"/>
</producer>
</standard-workflow>
- We set
move-jms-headers=true
to capture JMSMessageID. - We create a valid filter expression and store it against the metadata key
filterSelectorKey
.
- Because
SampleQ2
is fixed; we just use adefault-destination=SampleQ2
. - We have not explicitly configured a timeout on aggregating-queue-consumer; the default is 30 seconds.
- If no message was received then an exception will be thrown in 30 seconds.
- In a worst case scenario we might wait for 60 seconds for this service to complete; we wait 29 seconds for the first message, and then another 30 seconds before the timeout is exceeded; and continuing.
- Each message that has the corresponding JMSCorrelationID is inserted as a new node under
/envelope/aggregated
.