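This section introduces the components that a Spring Integration flow can use.
Channels: the pipes that carry messages from one element of the flow to another.
Filters: screen the messages and keep only the ones we want to let through.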
@Filter(inputChannel="stringChannel",
        outputChannel="containSChannel")
public boolean stringContainSFilter(String str) {
    return str.contains("s");
}
Or, using the DSL:
@Bean
public IntegrationFlow stringContainSFlow() {
    return IntegrationFlows
        ...
        .<String>filter((s) -> s.contains("s"))
        ...
        .get();
}
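The filter's test is an ordinary predicate, so it can be tried out without any flow at all. The following is a minimal plain-JDK sketch (the class name `ContainsSFilterSketch` and the sample payloads are made up for illustration); it only mimics which payloads would pass, not how Spring Integration delivers them.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ContainsSFilterSketch {
    // Same test as stringContainSFilter: keep strings containing a lowercase "s".
    static final Predicate<String> CONTAINS_S = str -> str.contains("s");

    // Returns the payloads that would get past the filter, in their original order.
    static List<String> keepMatching(List<String> payloads) {
        return payloads.stream()
                .filter(CONTAINS_S)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "SOS" is dropped because String.contains is case-sensitive.
        System.out.println(keepMatching(List.of("spring", "flow", "messages", "SOS")));
    }
}
```

Note that the check is case-sensitive, so a payload containing only an uppercase "S" is filtered out.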
Transformers: like map(Function<T,U> function) on a stream, they convert an incoming value of type T into type U.
@Bean
@Transformer(inputChannel="numberChannel",
             outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
    return RomanNumbers::toRoman;
}
DSL
@Bean
public IntegrationFlow transformerFlow() {
    return IntegrationFlows
        ...
        .transform(RomanNumbers::toRoman)
        ...
        .get();
}
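`RomanNumbers` is used above but never defined in these notes. Below is a minimal sketch of what such a helper might look like, assuming a static `toRoman` method; the implementation, the supported range of 1 to 3999, and the exception for out-of-range input are all assumptions, not the original helper.

```java
import java.util.List;
import java.util.stream.Collectors;

class RomanNumbers {
    // Value/symbol pairs in descending order, including the subtractive forms.
    private static final int[] VALUES =
            {1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1};
    private static final String[] SYMBOLS =
            {"M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"};

    // Assumed contract: only 1..3999 can be written with these symbols.
    static String toRoman(int number) {
        if (number < 1 || number > 3999) {
            throw new IllegalArgumentException("out of range: " + number);
        }
        StringBuilder roman = new StringBuilder();
        int remaining = number;
        for (int i = 0; i < VALUES.length; i++) {
            // Greedily append the largest symbol that still fits.
            while (remaining >= VALUES[i]) {
                roman.append(SYMBOLS[i]);
                remaining -= VALUES[i];
            }
        }
        return roman.toString();
    }

    public static void main(String[] args) {
        // The same method reference works with Stream.map, as the analogy suggests.
        List<String> converted = List.of(4, 9, 14, 2024).stream()
                .map(RomanNumbers::toRoman)
                .collect(Collectors.toList());
        System.out.println(converted); // [IV, IX, XIV, MMXXIV]
    }
}
```

With this sketch, a source that starts counting at 0 would hit the range check, so a real helper would have to decide how to treat zero.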
Routers: dispatch messages to different channels according to some condition.
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel>
                determineTargetChannels(Message<?> message) {
            Integer number = (Integer) message.getPayload();
            if (number % 2 == 0) {
                return Collections.singleton(evenChannel());
            }
            return Collections.singleton(oddChannel());
        }
    };
}
@Bean
public MessageChannel evenChannel() {
    return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
    return new DirectChannel();
}
DSL
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
    return IntegrationFlows
        ...
        .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
            .subFlowMapping("EVEN", sf -> sf
                .<Integer, Integer>transform(n -> n * 10)
                .handle((i,h) -> { ... })
            )
            .subFlowMapping("ODD", sf -> sf
                .transform(RomanNumbers::toRoman)
                .handle((i,h) -> { ... })
            )
        )
        .get();
}
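The DSL router boils down to two steps: compute a routing key, then hand the payload to whatever is mapped to that key. A rough plain-JDK sketch of that idea follows; the `SUB_FLOWS` map and the `"odd:"` placeholder branch are hypothetical stand-ins for the sub-flows, not Spring Integration API.

```java
import java.util.Map;
import java.util.function.Function;

class EvenOddRoutingSketch {
    // Same routing key as the lambda passed to route(...).
    static String routeKey(int n) {
        return n % 2 == 0 ? "EVEN" : "ODD";
    }

    // Stand-ins for the two sub-flows: the even branch multiplies by 10 as in
    // the text; the odd branch is a made-up placeholder.
    static final Map<String, Function<Integer, String>> SUB_FLOWS = Map.of(
            "EVEN", n -> String.valueOf(n * 10),
            "ODD", n -> "odd:" + n);

    // Looks up the branch for the payload's key and applies it.
    static String dispatch(int n) {
        return SUB_FLOWS.get(routeKey(n)).apply(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            System.out.println(n + " -> " + routeKey(n) + " -> " + dispatch(n));
        }
    }
}
```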
Splitters: split one message into several messages, which can then be sent on to different channels.
public class OrderSplitter {
    public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
        ArrayList<Object> parts = new ArrayList<>();
        parts.add(po.getBillingInfo());
        parts.add(po.getLineItems());
        return parts;
    }
}
// The bean is declared in a configuration class, not inside OrderSplitter itself.
@Bean
@Splitter(inputChannel="poChannel",
          outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
    return new OrderSplitter();
}
The code above splits an order and sends the parts to splitOrderChannel. The Router below can then route the messages in splitOrderChannel to other channels:
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(
        BillingInfo.class.getName(), "billingInfoChannel");
    router.setChannelMapping(
        List.class.getName(), "lineItemsChannel");
    return router;
}
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
    return lineItems;
}
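The notes never show `PurchaseOrder`, `BillingInfo`, or `LineItem`. The sketch below fills them in with hypothetical minimal versions (the fields `cardNumber` and `sku` are invented) so that the split-then-route-by-type logic can be run with the plain JDK. `channelFor` only imitates the decision a payload-type router makes; it is not Spring Integration code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class OrderSplitSketch {
    // Hypothetical domain types; the real ones are not shown in the text.
    static class BillingInfo {
        final String cardNumber;
        BillingInfo(String cardNumber) { this.cardNumber = cardNumber; }
    }
    static class LineItem {
        final String sku;
        LineItem(String sku) { this.sku = sku; }
    }
    static class PurchaseOrder {
        private final BillingInfo billingInfo;
        private final List<LineItem> lineItems;
        PurchaseOrder(BillingInfo billingInfo, List<LineItem> lineItems) {
            this.billingInfo = billingInfo;
            this.lineItems = lineItems;
        }
        BillingInfo getBillingInfo() { return billingInfo; }
        List<LineItem> getLineItems() { return lineItems; }
    }

    // First split: one order becomes two parts, the billing info and the item list.
    static Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
        List<Object> parts = new ArrayList<>();
        parts.add(po.getBillingInfo());
        parts.add(po.getLineItems());
        return parts;
    }

    // Type-based routing decision, using the channel names from the text.
    static String channelFor(Object part) {
        return part instanceof BillingInfo ? "billingInfoChannel" : "lineItemsChannel";
    }

    public static void main(String[] args) {
        PurchaseOrder po = new PurchaseOrder(
                new BillingInfo("4111"),
                List.of(new LineItem("A-1"), new LineItem("B-2")));
        for (Object part : splitOrderIntoParts(po)) {
            System.out.println(channelFor(part));
        }
    }
}
```

The item list arrives on its channel as a single message, which is why the text applies a second splitter to turn it into one message per `LineItem`.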
DSL
return IntegrationFlows
    ...
    .split(orderSplitter())
    .<Object, String> route(
        p -> {
            if (p instanceof BillingInfo) {
                return "BILLING_INFO";
            } else {
                return "LINE_ITEMS";
            }
        }, mapping -> mapping
            .subFlowMapping("BILLING_INFO", sf -> sf
                .<BillingInfo> handle((billingInfo, h) -> {
                    ...
                }))
            .subFlowMapping("LINE_ITEMS", sf -> sf
                .split()
                .<LineItem> handle((lineItem, h) -> {
                    ...
                }))
    )
    .get();
If the flow above feels too cramped, it can also be broken up into separate methods that the flow then references:
private String route(Object p) {
    return p instanceof BillingInfo
        ? "BILLING_INFO"
        : "LINE_ITEMS";
}
private BillingInfo handleBillingInfo(
        BillingInfo billingInfo, MessageHeaders h) {
    // ...
}
private LineItem handleLineItems(
        LineItem lineItem, MessageHeaders h) {
    // ...
}
return IntegrationFlows
    ...
    .split()
    .route(
        this::route,
        mapping -> mapping
            .subFlowMapping("BILLING_INFO", sf -> sf
                .<BillingInfo> handle(this::handleBillingInfo))
            .subFlowMapping("LINE_ITEMS", sf -> sf
                .split()
                .<LineItem> handle(this::handleLineItems)))
    .get();
Service activators: when a message arrives on a channel, they hand it to a handler, which activates some other processing.
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
    return message -> {
        System.out.println("Message payload: " + message.getPayload());
    };
}
@Bean
@ServiceActivator(inputChannel="orderChannel",
                  outputChannel="completeChannel")
public GenericHandler<EmailOrder> orderHandler(OrderRepository orderRepo) {
    return (payload, headers) -> {
        return orderRepo.save(payload);
    };
}
DSL
@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows
        ...
        .handle(msg -> {
            System.out.println("Message payload: " + msg.getPayload());
        })
        .get();
}
Gateways: the interface between the application and the integration flow:
package sia6;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
                  defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
    String uppercase(String in);
}
DSL
@Bean
public IntegrationFlow uppercaseFlow() {
    return IntegrationFlows
        .from("inChannel")
        .<String, String> transform(s -> s.toUpperCase())
        .channel("outChannel")
        .get();
}
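Because the gateway is just an interface from the caller's point of view, application code can depend on it without knowing about channels. The sketch below repeats the interface without its Spring annotations and swaps in a hand-written stand-in, as one might do in a unit test; the `greet` method is hypothetical application code, and nothing here goes through an actual flow.

```java
class UpperCaseGatewayStubSketch {
    // Same shape as the gateway interface in the text, minus the annotations.
    interface UpperCaseGateway {
        String uppercase(String in);
    }

    // Hypothetical caller: it only sees the interface, never the flow behind it.
    static String greet(UpperCaseGateway gateway, String name) {
        return "HELLO, " + gateway.uppercase(name) + "!";
    }

    public static void main(String[] args) {
        // A stand-in that does what uppercaseFlow is meant to do.
        UpperCaseGateway stub = String::toUpperCase;
        System.out.println(greet(stub, "spring")); // HELLO, SPRING!
    }
}
```

In the real application the implementation of the interface is generated at runtime, and a call to `uppercase` sends its argument to inChannel and returns whatever arrives on outChannel.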
Channel adapters: the points where messages enter and leave an integration flow.
@Bean
@InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
    return () -> {
        return new GenericMessage<>(source.getAndIncrement());
    };
}
DSL
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
    return IntegrationFlows
        .from(integerSource, "getAndIncrement",
            c -> c.poller(Pollers.fixedRate(1000)))
        ...
        .get();
}
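Each poll of this adapter amounts to one call to `getAndIncrement`, so the flow receives a running sequence of integers. A small plain-JDK sketch of that behaviour follows, assuming the counter starts at 0 (the notes do not say how the `AtomicInteger` bean is created); the `poll` helper is a made-up stand-in for the poller and ignores timing entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class NumberSourceSketch {
    // Calls the source a fixed number of times and collects what it produced,
    // standing in for a poller that fires on a schedule.
    static List<Integer> poll(Supplier<Integer> source, int times) {
        List<Integer> payloads = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            payloads.add(source.get());
        }
        return payloads;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(); // assumed to start at 0
        System.out.println(poll(counter::getAndIncrement, 3)); // [0, 1, 2]
    }
}
```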
Spring provides many ready-made endpoint modules that can be used directly as channel adapters:
@Bean
@InboundChannelAdapter(channel="file-channel",poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}
DSL
@Bean
public IntegrationFlow fileReaderFlow() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File(INPUT_DIR))
            .patternFilter(FILE_PATTERN))
        .get();
}
The endpoints that Spring Integration supports are listed in the following document:
https://docs.spring.io/spring-integration/reference/html/endpoint-summary.html