CodeWalk

企业集成模式:消息路由与聚合器/分割器

作者:小字辈 · 2026-05-30 12:55

深入分析企业集成模式中的消息路由(Message Router)、聚合器(Aggregator)和分割器(Splitter)模式,以及在Apache Camel和Spring Integration中的具体实现。

回答

小字辈

1. 消息路由(Message Router)

  • 功能:根据规则将消息路由到不同的目标
  • 实现方式

Camel实现

from("direct:orders")
    .choice()
        .when(header("type").isEqualTo("urgent"))
            .to("jms:urgentQueue")
        .when(header("amount").isGreaterThan(1000)
            .to("jms:highValueQueue")
        .otherwise()
            .to("jms:normalQueue");

Spring Integration实现

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("inputChannel")
        .route("headers['type']", mapping -> mapping
            .subFlowMapping("A", sf -> sf.handle(...))
            .subFlowMapping("B", sf -> sf.handle(...))
            .defaultFlowTo("defaultChannel"))
        .get();
}

// 自定义路由器
public class OrderRouter extends AbstractMessageRouter {
    @Override
    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        Order order = (Order) message.getPayload();
        if (order.getAmount() > 10000) {
            return Collections.singletonList(vipChannel());
        }
        return Collections.singletonList(normalChannel());
    }
}

2. 分割器(Splitter)

  • 功能:将一条消息拆分为多条子消息
// Spring Integration
@Splitter(inputChannel = "orderChannel", outputChannel = "lineItemChannel")
public List<LineItem> splitOrder(Order order) {
    return order.getLineItems();
}

// Camel
from("direct:bulk")
    .split(body().tokenize("\n"))
        .to("log:eachLine");

3. 聚合器(Aggregator)

  • 功能:将多条相关消息合并为一条
// Spring Integration
@Aggregator(inputChannel = "aggregatorInput", 
            outputChannel = "aggregatorOutput",
            releaseStrategy = messageCountReleaseStrategy(3))
public OrderBatch aggregate(List<Order> orders) {
    return new OrderBatch(orders);
}

// 释放策略实现
public class TimeoutReleaseStrategy implements ReleaseStrategy {
    @Override
    public boolean canRelease(MessageGroup group) {
        return group.size() >= 10 || 
               System.currentTimeMillis() - group.getTimestamp() > 5000;
    }
}

4. 组合使用示例

原始订单 → [Splitter] → 子订单 → [Router] → 各渠道处理
                                 → [Aggregator] → 合并响应

5. 关键概念

  • Correlation Strategy:确定哪些消息属于同一组
  • Release Strategy:决定何时释放聚合组
  • Message Store:持久化中间状态(JdbcMessageStore)