企业集成模式:消息路由与聚合器/分割器
深入分析企业集成模式中的消息路由(Message Router)、聚合器(Aggregator)和分割器(Splitter)模式,以及在Apache Camel和Spring Integration中的具体实现。
回答
小字辈
1. 消息路由(Message Router)
- 功能:根据规则将消息路由到不同的目标
- 实现方式:
Camel实现:
from("direct:orders")
.choice()
.when(header("type").isEqualTo("urgent"))
.to("jms:urgentQueue")
.when(header("amount").isGreaterThan(1000)
.to("jms:highValueQueue")
.otherwise()
.to("jms:normalQueue");
Spring Integration实现:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("inputChannel")
.route("headers['type']", mapping -> mapping
.subFlowMapping("A", sf -> sf.handle(...))
.subFlowMapping("B", sf -> sf.handle(...))
.defaultFlowTo("defaultChannel"))
.get();
}
// 自定义路由器
public class OrderRouter extends AbstractMessageRouter {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Order order = (Order) message.getPayload();
if (order.getAmount() > 10000) {
return Collections.singletonList(vipChannel());
}
return Collections.singletonList(normalChannel());
}
}
2. 分割器(Splitter)
- 功能:将一条消息拆分为多条子消息
// Spring Integration
@Splitter(inputChannel = "orderChannel", outputChannel = "lineItemChannel")
public List<LineItem> splitOrder(Order order) {
return order.getLineItems();
}
// Camel
from("direct:bulk")
.split(body().tokenize("\n"))
.to("log:eachLine");
3. 聚合器(Aggregator)
- 功能:将多条相关消息合并为一条
// Spring Integration
@Aggregator(inputChannel = "aggregatorInput",
outputChannel = "aggregatorOutput",
releaseStrategy = messageCountReleaseStrategy(3))
public OrderBatch aggregate(List<Order> orders) {
return new OrderBatch(orders);
}
// 释放策略实现
public class TimeoutReleaseStrategy implements ReleaseStrategy {
@Override
public boolean canRelease(MessageGroup group) {
return group.size() >= 10 ||
System.currentTimeMillis() - group.getTimestamp() > 5000;
}
}
4. 组合使用示例
原始订单 → [Splitter] → 子订单 → [Router] → 各渠道处理
→ [Aggregator] → 合并响应
5. 关键概念
- Correlation Strategy:确定哪些消息属于同一组
- Release Strategy:决定何时释放聚合组
- Message Store:持久化中间状态(JdbcMessageStore)