本文聚焦「推送內容」本身:資料從哪來、怎麼整理、推到哪個 topic、payload 長什麼樣。
推送面向與主題一覽
市場摘要:/topic/market(last、24h 高/低/量、最佳買賣…)
訂單簿快照:/topic/orderbook(聚合深度)
逐筆成交:/topic/trades(撮合事件觸發即推)
最近成交(批次):/topic/trades/recent(補齊歷史 N 筆)
用 SimpMessagingTemplate 發送;資料來源:撮合事件 + Feign 請求撮合引擎回傳最新訂單簿資訊
@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataService {
private final SimpMessagingTemplate messagingTemplate;
private final MathedOrderRepository matchedOrderRepository; // 成交紀錄
private final EapMatchEngine eapMatchEngine; // Feign:撈訂單簿/統計
/**
* 推送市場數據統計
*/
public void pushMarketData() {
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
List<MatchOrderEntity> recentTrades = matchedOrderRepository.findAll()
.stream()
.filter(trade -> trade.getUpdateTime().isAfter(yesterday))
.sorted((a, b) -> b.getUpdateTime().compareTo(a.getUpdateTime()))
.toList();
if (recentTrades.isEmpty()) {
log.info("沒有最近的成交記錄,跳過市場數據推送");
return;
}
// 計算市場統計數據
Integer lastPrice = recentTrades.get(0).getPrice();
Integer highPrice =
recentTrades.stream().mapToInt(MatchOrderEntity::getPrice).max().orElse(0);
Integer lowPrice =
recentTrades.stream().mapToInt(MatchOrderEntity::getPrice).min().orElse(0);
Long volume =
recentTrades.stream().mapToLong(MatchOrderEntity::getAmount).sum();
// 計算價格變化 (假設第一筆為昨日收盤價)
Double priceChange = 0.0;
Double priceChangePercent = 0.0;
if (recentTrades.size() > 1) {
Integer firstPrice = recentTrades.get(recentTrades.size() -
1).getPrice();
priceChange = (double) (lastPrice - firstPrice);
priceChangePercent = (priceChange / firstPrice) * 100;
}
MarketDataDto marketData = MarketDataDto.builder()
.lastPrice(lastPrice)
.highPrice(highPrice)
.lowPrice(lowPrice)
.volume(volume)
.priceChange(priceChange)
.priceChangePercent(priceChangePercent)
.timestamp(LocalDateTime.now())
.marketStatus("OPEN")
.build();
// 推送到所有訂閱市場數據的客戶端
messagingTemplate.convertAndSend("/topic/market", marketData);
log.info("推送市場數據: 最新價格={}, 24h成交量={}", lastPrice, volume);
}
/** 訂單簿快照 → /topic/orderbook */
public void pushOrderBook() {
OrderBookResponseDto resp = eapMatchEngine.getOrderBook(10).getBody();
// 映射為前端需要的DTO(例:[{price, qty}] for bids/asks)
messagingTemplate.convertAndSend("/topic/orderbook", orderBook);
}
/** 逐筆成交(撮合事件觸發) → /topic/trades */
public void pushRealtimeTrade(MatchOrderEntity matchOrder) {
RealtimeTradeDto tradeDto = RealtimeTradeDto.builder()
.tradeId((long) matchOrder.getId())
.price(matchOrder.getPrice())
.amount(matchOrder.getAmount())
.tradeTime(matchOrder.getUpdateTime())
.tradeType(matchOrder.getOrderType()) // BUY/SELL
.build();
messagingTemplate.convertAndSend("/topic/trades", tradeDto);
}
/**
* 推送最近的成交記錄給新訂閱的客戶端
* 當客戶端首次訂閱 /topic/trades 時調用
*/
public void pushRecentTrades() {
try {
// 獲取最近10筆成交記錄
List<MatchOrderEntity> recentTrades = matchedOrderRepository.findAll()
.stream()
.sorted((a, b) ->b.getUpdateTime().compareTo(a.getUpdateTime()))
.limit(10)
.toList();
if (recentTrades.isEmpty()) {
log.info("沒有最近的成交記錄");
return;
}
// 轉換為 DTO
List<RealtimeTradeDto> tradeDtos = recentTrades.stream()
.map(trade -> RealtimeTradeDto.builder()
.tradeId((long) trade.getId())
.price(trade.getPrice())
.amount(trade.getAmount())
.tradeTime(trade.getUpdateTime())
.tradeType(trade.getOrderType())
.build())
.toList();
// 建立批量推送 DTO
RecentTradesDto recentTradesDto = RecentTradesDto.builder()
.trades(tradeDtos)
.total(tradeDtos.size())
.timestamp(System.currentTimeMillis())
.build();
// 推送最近的成交記錄列表
messagingTemplate.convertAndSend("/topic/trades/recent",
recentTradesDto);
log.info("推送最近 {} 筆成交記錄給新訂閱客戶端", tradeDtos.size());
} catch (Exception e) {
log.error("推送最近成交記錄失敗: {}", e.getMessage());
}
}
}
Payload 一覽 我回傳給前端的內容設計
/topic/market
{
"symbol": "EAP-BASE",
"lastPrice": 1024,
"change": 0.012, // 1.2%
"high24h": 1050,
"low24h": 990,
"volume24h": 187532,
"bestBid": 1023,
"bestAsk": 1025,
"ts": 1738288888123
}
/topic/orderbook
{
"bids": [[1023, 17], [1022, 22], [1021, 18]],
"asks": [[1025, 10], [1026, 18], [1027, 30]],
"depth": 10,
"ts": 1738288888123
}
/topic/trades(逐筆)
{
"tradeId": 8811223377,
"price": 1024,
"amount": 5,
"tradeType": "BUY",
"tradeTime": "2025-09-30T09:21:33.123+08:00"
}
/topic/trades/recent(批次)
{
"items": [
{"tradeId": 8811223370, "price": 1023, "amount": 2, "tradeType": "SELL", "tradeTime": "..."},
{"tradeId": 8811223371, "price": 1024, "amount": 3, "tradeType": "BUY", "tradeTime": "..."}
],
"ts": 1738288888123
}
監聽 order.matched,保存成交、立刻推 /topic/trades,並同步刷新市場摘要,邏輯上來說每當有訂單成交那市場就會有所變動,考慮到電力市場交易頻率並不頻繁,市場任一筆交易都會影響使用者對於市場情形評估所以我設計在每當有一筆新成交就會推送新市場資訊給使用者。
@Component
@RequiredArgsConstructor
public class MatchEventListener {
private final MathedOrderRepository matchOrderRepository;
private final MarketDataService marketDataService;
@RabbitListener(queues = ORDER_MATCHED_QUEUE)
public void onOrderMatched(OrderMatchedEvent event) {
// 轉為 Entity 並保存
MatchOrderEntity savedOrder = matchOrderRepository.save(/* map from event */);
// 即時推逐筆成交 + 刷新市場摘要(前端看得到最新 last/24h 指標)
marketDataService.pushRealtimeTrade(savedOrder);
marketDataService.pushMarketData();
}
}
撈訂單簿 / 市場統計,供 MarketDataService 使用
@FeignClient(name = "eap-matchEngine", url = "${eap.matchEngine.base-url}")
public interface EapMatchEngine {
@GetMapping("/v1/order/orderbook")
ResponseEntity<OrderBookResponseDto> getOrderBook(
@RequestParam(value = "depth", defaultValue = "10") int depth
);
@GetMapping("/v1/order/market/summary")
ResponseEntity<MarketSummaryDto> getMarketSummary();
}