今天我們要深入學習期現套利策略的實際應用,這就像爸爸發現同樣的農產品在不同市場有價格差異時的套利行為。早上在產地收購便宜的蔬菜,下午到城裡的高檔超市賣出,賺取價差!期現套利也是類似的概念,利用現貨和期貨間的價格差異獲利。
期貨理論價格公式:
F = S × e^(r-q)T
其中:
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
class BasisMonitor:
"""基差監控系統"""
def __init__(self, symbols=['BTCUSDT'], entry_threshold=0.005, exit_threshold=0.002):
self.symbols = symbols
self.entry_threshold = entry_threshold # 0.5% 進場閾值
self.exit_threshold = exit_threshold # 0.2% 出場閾值
self.basis_history = {symbol: [] for symbol in symbols}
self.current_positions = {}
async def calculate_basis(self, spot_price, futures_price, funding_rate=0):
"""計算基差和年化收益率"""
# 基差(絕對值)
basis = futures_price - spot_price
# 基差率(相對值)
basis_rate = basis / spot_price
# 考慮資金費率的年化收益
# 假設8小時收取一次資金費率
annual_funding_cost = funding_rate * 3 * 365 # 每日3次 × 365天
# 調整後的套利收益
adjusted_basis_rate = basis_rate - annual_funding_cost
return {
'basis': basis,
'basis_rate': basis_rate,
'adjusted_basis_rate': adjusted_basis_rate,
'funding_rate': funding_rate,
'annual_funding_cost': annual_funding_cost
}
async def identify_arbitrage_opportunity(self, symbol, spot_data, futures_data):
"""識別套利機會"""
spot_price = spot_data['price']
futures_price = futures_data['price']
funding_rate = futures_data.get('funding_rate', 0)
basis_info = await self.calculate_basis(spot_price, futures_price, funding_rate)
# 記錄歷史基差
basis_record = {
'timestamp': datetime.now(),
'spot_price': spot_price,
'futures_price': futures_price,
'basis': basis_info['basis'],
'basis_rate': basis_info['basis_rate'],
'adjusted_basis_rate': basis_info['adjusted_basis_rate'],
'funding_rate': funding_rate
}
self.basis_history[symbol].append(basis_record)
# 保持最近1000筆記錄
if len(self.basis_history[symbol]) > 1000:
self.basis_history[symbol] = self.basis_history[symbol][-1000:]
# 判斷套利機會
opportunity = None
adjusted_rate = basis_info['adjusted_basis_rate']
if adjusted_rate > self.entry_threshold:
# 正向套利:期貨溢價過高
opportunity = {
'type': 'positive_arbitrage',
'action': 'sell_futures_buy_spot',
'expected_return': adjusted_rate,
'spot_action': 'buy',
'futures_action': 'sell',
'confidence': min(adjusted_rate / self.entry_threshold, 3.0)
}
elif adjusted_rate < -self.entry_threshold:
# 反向套利:期貨貼水過深
opportunity = {
'type': 'negative_arbitrage',
'action': 'buy_futures_sell_spot',
'expected_return': abs(adjusted_rate),
'spot_action': 'sell',
'futures_action': 'buy',
'confidence': min(abs(adjusted_rate) / self.entry_threshold, 3.0)
}
elif symbol in self.current_positions and abs(adjusted_rate) < self.exit_threshold:
# 平倉機會:基差收斂
opportunity = {
'type': 'close_arbitrage',
'action': 'close_position',
'expected_return': self.current_positions[symbol]['unrealized_pnl'],
'confidence': 1.0
}
return opportunity, basis_info
def get_basis_statistics(self, symbol, lookback_days=30):
"""獲取基差統計資訊"""
if symbol not in self.basis_history or not self.basis_history[symbol]:
return None
# 獲取最近N天的數據
cutoff_time = datetime.now() - timedelta(days=lookback_days)
recent_data = [
record for record in self.basis_history[symbol]
if record['timestamp'] > cutoff_time
]
if not recent_data:
return None
basis_rates = [record['basis_rate'] for record in recent_data]
adjusted_rates = [record['adjusted_basis_rate'] for record in recent_data]
stats = {
'mean_basis_rate': np.mean(basis_rates),
'std_basis_rate': np.std(basis_rates),
'mean_adjusted_rate': np.mean(adjusted_rates),
'std_adjusted_rate': np.std(adjusted_rates),
'min_basis_rate': np.min(basis_rates),
'max_basis_rate': np.max(basis_rates),
'percentile_25': np.percentile(basis_rates, 25),
'percentile_75': np.percentile(basis_rates, 75),
'samples_count': len(recent_data)
}
return stats
class ArbitrageStrategy:
"""期現套利策略"""
def __init__(self, initial_capital=100000):
self.initial_capital = initial_capital
self.available_capital = initial_capital
self.positions = {}
self.trade_history = []
self.basis_monitor = BasisMonitor()
async def execute_arbitrage(self, symbol, opportunity, spot_price, futures_price):
"""執行套利策略"""
if opportunity['type'] in ['positive_arbitrage', 'negative_arbitrage']:
return await self._open_arbitrage_position(
symbol, opportunity, spot_price, futures_price
)
elif opportunity['type'] == 'close_arbitrage':
return await self._close_arbitrage_position(symbol, spot_price, futures_price)
async def _open_arbitrage_position(self, symbol, opportunity, spot_price, futures_price):
"""開啟套利倉位"""
# 計算倉位大小(使用20%的可用資金)
position_capital = self.available_capital * 0.2
# 基於期望收益調整倉位大小
confidence_multiplier = min(opportunity['confidence'], 2.0)
adjusted_capital = position_capital * confidence_multiplier
spot_quantity = adjusted_capital / spot_price
futures_quantity = spot_quantity # 等量對沖
# 檢查資金是否足夠
required_capital = spot_quantity * spot_price
if required_capital > self.available_capital:
return False, "資金不足"
# 模擬交易執行
spot_commission = spot_quantity * spot_price * 0.001 # 0.1% 手續費
futures_margin = futures_quantity * futures_price * 0.1 # 10倍槓桿,10%保證金
futures_commission = futures_quantity * futures_price * 0.0004 # 0.04% 手續費
total_cost = required_capital + spot_commission + futures_margin + futures_commission
if total_cost > self.available_capital:
return False, "總成本超過可用資金"
# 更新倉位
position = {
'symbol': symbol,
'type': opportunity['type'],
'entry_time': datetime.now(),
'spot_quantity': spot_quantity,
'futures_quantity': futures_quantity,
'spot_entry_price': spot_price,
'futures_entry_price': futures_price,
'spot_action': opportunity['spot_action'],
'futures_action': opportunity['futures_action'],
'expected_return': opportunity['expected_return'],
'total_cost': total_cost,
'spot_commission': spot_commission,
'futures_commission': futures_commission,
'futures_margin': futures_margin
}
self.positions[symbol] = position
self.available_capital -= total_cost
# 記錄交易
trade_record = {
'timestamp': datetime.now(),
'action': 'open_arbitrage',
'symbol': symbol,
'type': opportunity['type'],
'spot_price': spot_price,
'futures_price': futures_price,
'quantity': spot_quantity,
'expected_return': opportunity['expected_return'],
'total_cost': total_cost
}
self.trade_history.append(trade_record)
return True, f"成功開啟 {symbol} 套利倉位"
async def _close_arbitrage_position(self, symbol, spot_price, futures_price):
"""平倉套利倉位"""
if symbol not in self.positions:
return False, "無對應套利倉位"
position = self.positions[symbol]
# 計算損益
spot_pnl = 0
futures_pnl = 0
if position['spot_action'] == 'buy':
# 現貨做多,現在賣出
spot_pnl = position['spot_quantity'] * (spot_price - position['spot_entry_price'])
else:
# 現貨做空,現在買入
spot_pnl = position['spot_quantity'] * (position['spot_entry_price'] - spot_price)
if position['futures_action'] == 'sell':
# 期貨做空,現在買入平倉
futures_pnl = position['futures_quantity'] * (position['futures_entry_price'] - futures_price)
else:
# 期貨做多,現在賣出平倉
futures_pnl = position['futures_quantity'] * (futures_price - position['futures_entry_price'])
# 計算平倉手續費
spot_close_commission = position['spot_quantity'] * spot_price * 0.001
futures_close_commission = position['futures_quantity'] * futures_price * 0.0004
total_pnl = spot_pnl + futures_pnl - spot_close_commission - futures_close_commission
# 釋放資金
released_capital = position['total_cost'] + total_pnl
self.available_capital += released_capital
# 記錄平倉交易
trade_record = {
'timestamp': datetime.now(),
'action': 'close_arbitrage',
'symbol': symbol,
'type': position['type'],
'spot_price': spot_price,
'futures_price': futures_price,
'quantity': position['spot_quantity'],
'spot_pnl': spot_pnl,
'futures_pnl': futures_pnl,
'total_pnl': total_pnl,
'return_rate': total_pnl / position['total_cost'],
'holding_period': (datetime.now() - position['entry_time']).total_seconds() / 3600 # 小時
}
self.trade_history.append(trade_record)
# 移除倉位
del self.positions[symbol]
return True, f"成功平倉 {symbol} 套利倉位,損益: ${total_pnl:.2f}"
def calculate_unrealized_pnl(self, symbol, current_spot_price, current_futures_price):
"""計算未實現損益"""
if symbol not in self.positions:
return 0
position = self.positions[symbol]
# 計算當前損益
spot_pnl = 0
futures_pnl = 0
if position['spot_action'] == 'buy':
spot_pnl = position['spot_quantity'] * (current_spot_price - position['spot_entry_price'])
else:
spot_pnl = position['spot_quantity'] * (position['spot_entry_price'] - current_spot_price)
if position['futures_action'] == 'sell':
futures_pnl = position['futures_quantity'] * (position['futures_entry_price'] - current_futures_price)
else:
futures_pnl = position['futures_quantity'] * (current_futures_price - position['futures_entry_price'])
total_unrealized_pnl = spot_pnl + futures_pnl
return total_unrealized_pnl
def get_performance_summary(self):
"""獲取績效總結"""
if not self.trade_history:
return {}
closed_trades = [trade for trade in self.trade_history if trade['action'] == 'close_arbitrage']
if not closed_trades:
return {'message': '尚無完成的套利交易'}
total_pnl = sum(trade['total_pnl'] for trade in closed_trades)
total_return = total_pnl / self.initial_capital
winning_trades = [trade for trade in closed_trades if trade['total_pnl'] > 0]
win_rate = len(winning_trades) / len(closed_trades)
avg_return = np.mean([trade['return_rate'] for trade in closed_trades])
avg_holding_period = np.mean([trade['holding_period'] for trade in closed_trades])
return {
'total_trades': len(closed_trades),
'winning_trades': len(winning_trades),
'win_rate': win_rate,
'total_pnl': total_pnl,
'total_return': total_return,
'avg_return_per_trade': avg_return,
'avg_holding_period_hours': avg_holding_period,
'available_capital': self.available_capital,
'active_positions': len(self.positions)
}
# 套利策略範例執行
async def run_arbitrage_example():
"""運行套利策略範例"""
strategy = ArbitrageStrategy(initial_capital=100000)
# 模擬市場數據
market_scenarios = [
# 場景1: 期貨溢價過高,正向套利機會
{
'spot_price': 50000,
'futures_price': 50300, # 0.6% 溢價
'funding_rate': 0.0001
},
# 場景2: 價差收斂,平倉機會
{
'spot_price': 50100,
'futures_price': 50150, # 0.1% 溢價
'funding_rate': 0.0001
},
# 場景3: 期貨貼水,反向套利機會
{
'spot_price': 49000,
'futures_price': 48700, # -0.61% 貼水
'funding_rate': -0.0002
}
]
print("期現套利策略模擬:")
print("=" * 50)
for i, scenario in enumerate(market_scenarios, 1):
print(f"\n場景 {i}:")
print(f"現貨價格: ${scenario['spot_price']:,}")
print(f"期貨價格: ${scenario['futures_price']:,}")
print(f"資金費率: {scenario['funding_rate']:.4%}")
# 識別套利機會
opportunity, basis_info = await strategy.basis_monitor.identify_arbitrage_opportunity(
'BTCUSDT',
{'price': scenario['spot_price']},
{'price': scenario['futures_price'], 'funding_rate': scenario['funding_rate']}
)
print(f"基差率: {basis_info['basis_rate']:.4%}")
print(f"調整後基差率: {basis_info['adjusted_basis_rate']:.4%}")
if opportunity:
print(f"套利機會: {opportunity['type']}")
print(f"預期收益率: {opportunity['expected_return']:.4%}")
# 執行套利
success, message = await strategy.execute_arbitrage(
'BTCUSDT', opportunity, scenario['spot_price'], scenario['futures_price']
)
print(f"執行結果: {message}")
# 顯示當前倉位狀態
if 'BTCUSDT' in strategy.positions:
position = strategy.positions['BTCUSDT']
unrealized_pnl = strategy.calculate_unrealized_pnl(
'BTCUSDT', scenario['spot_price'], scenario['futures_price']
)
print(f"未實現損益: ${unrealized_pnl:.2f}")
else:
print("無套利機會")
print(f"可用資金: ${strategy.available_capital:,.2f}")
# 顯示最終績效
print("\n" + "=" * 50)
print("績效總結:")
performance = strategy.get_performance_summary()
if 'message' in performance:
print(performance['message'])
else:
print(f"總交易次數: {performance['total_trades']}")
print(f"勝率: {performance['win_rate']:.2%}")
print(f"總損益: ${performance['total_pnl']:.2f}")
print(f"總收益率: {performance['total_return']:.2%}")
print(f"平均持倉時間: {performance['avg_holding_period_hours']:.1f} 小時")
# 執行範例
await run_arbitrage_example()
class DynamicHedgeRatioArbitrage:
"""動態對沖比例套利"""
def __init__(self):
self.hedge_ratios = {}
def calculate_optimal_hedge_ratio(self, spot_returns, futures_returns, lookback_period=30):
"""計算最優對沖比例"""
# 使用最小方差對沖比例
# h* = Cov(ΔS, ΔF) / Var(ΔF)
if len(spot_returns) < lookback_period or len(futures_returns) < lookback_period:
return 1.0 # 預設1:1對沖
recent_spot = spot_returns[-lookback_period:]
recent_futures = futures_returns[-lookback_period:]
covariance = np.cov(recent_spot, recent_futures)[0, 1]
futures_variance = np.var(recent_futures)
if futures_variance == 0:
return 1.0
optimal_ratio = covariance / futures_variance
# 限制對沖比例在合理範圍內
optimal_ratio = max(min(optimal_ratio, 1.5), 0.5)
return optimal_ratio
def adjust_position_sizes(self, spot_quantity, optimal_hedge_ratio):
"""調整倉位大小"""
futures_quantity = spot_quantity * optimal_hedge_ratio
return {
'spot_quantity': spot_quantity,
'futures_quantity': futures_quantity,
'hedge_ratio': optimal_hedge_ratio,
'hedge_effectiveness': min(optimal_hedge_ratio, 1.0)
}
class MultiLegArbitrage:
"""多腿套利策略"""
def __init__(self):
self.active_strategies = {}
async def calendar_spread_arbitrage(self, near_month_price, far_month_price,
near_expiry_days, far_expiry_days):
"""日曆價差套利"""
# 計算年化收益率差異
near_annual_rate = np.log(near_month_price / far_month_price) * (365 / (far_expiry_days - near_expiry_days))
# 如果遠月合約相對便宜,做多遠月做空近月
if near_annual_rate > 0.1: # 10% 年化差異
return {
'strategy': 'calendar_spread',
'action': 'long_far_short_near',
'near_action': 'sell',
'far_action': 'buy',
'expected_convergence': near_annual_rate
}
elif near_annual_rate < -0.1:
return {
'strategy': 'calendar_spread',
'action': 'long_near_short_far',
'near_action': 'buy',
'far_action': 'sell',
'expected_convergence': abs(near_annual_rate)
}
return None
async def cross_exchange_arbitrage(self, exchange_prices):
"""跨交易所套利"""
opportunities = []
exchanges = list(exchange_prices.keys())
for i in range(len(exchanges)):
for j in range(i + 1, len(exchanges)):
exchange_a = exchanges[i]
exchange_b = exchanges[j]
price_a = exchange_prices[exchange_a]['price']
price_b = exchange_prices[exchange_b]['price']
# 考慮交易成本
cost_a = exchange_prices[exchange_a].get('trading_cost', 0.002)
cost_b = exchange_prices[exchange_b].get('trading_cost', 0.002)
# 計算淨價差
gross_spread = abs(price_a - price_b) / min(price_a, price_b)
net_spread = gross_spread - cost_a - cost_b
if net_spread > 0.003: # 0.3% 最小利潤要求
if price_a > price_b:
opportunities.append({
'type': 'cross_exchange_arbitrage',
'buy_exchange': exchange_b,
'sell_exchange': exchange_a,
'buy_price': price_b,
'sell_price': price_a,
'gross_spread': gross_spread,
'net_spread': net_spread,
'profit_potential': net_spread
})
else:
opportunities.append({
'type': 'cross_exchange_arbitrage',
'buy_exchange': exchange_a,
'sell_exchange': exchange_b,
'buy_price': price_a,
'sell_price': price_b,
'gross_spread': gross_spread,
'net_spread': net_spread,
'profit_potential': net_spread
})
return opportunities
class ArbitrageRiskManager:
"""套利風險管理器"""
def __init__(self, max_position_count=5, max_capital_allocation=0.5):
self.max_position_count = max_position_count
self.max_capital_allocation = max_capital_allocation
self.risk_limits = {
'max_basis_deviation': 0.02, # 2% 最大基差偏離
'max_holding_period': 24, # 24 小時最大持倉時間
'max_individual_loss': 0.05, # 5% 單筆最大損失
'max_daily_loss': 0.10 # 10% 日最大損失
}
def validate_arbitrage_opportunity(self, opportunity, current_positions,
portfolio_value, daily_pnl):
"""驗證套利機會的風險"""
validation_results = {
'approved': True,
'warnings': [],
'rejections': []
}
# 檢查倉位數量限制
if len(current_positions) >= self.max_position_count:
validation_results['approved'] = False
validation_results['rejections'].append('超過最大倉位數量限制')
# 檢查資金分配限制
total_allocated = sum(pos.get('total_cost', 0) for pos in current_positions.values())
allocation_ratio = total_allocated / portfolio_value
if allocation_ratio > self.max_capital_allocation:
validation_results['approved'] = False
validation_results['rejections'].append('超過最大資金分配比例')
# 檢查基差偏離度
if abs(opportunity.get('expected_return', 0)) > self.risk_limits['max_basis_deviation']:
validation_results['warnings'].append('基差偏離度過大,需要謹慎')
# 檢查日損失限制
if daily_pnl < -self.risk_limits['max_daily_loss'] * portfolio_value:
validation_results['approved'] = False
validation_results['rejections'].append('超過日損失限制')
return validation_results
def calculate_position_size(self, opportunity, available_capital, volatility):
"""基於風險計算倉位大小"""
# 基礎倉位大小
base_allocation = 0.2 # 20%
# 根據預期收益調整
return_adjustment = min(opportunity.get('expected_return', 0) * 10, 1.0)
# 根據波動性調整
volatility_adjustment = max(0.5, 1 - volatility * 5)
# 最終倉位大小
final_allocation = base_allocation * return_adjustment * volatility_adjustment
position_size = available_capital * final_allocation
return {
'position_size': position_size,
'allocation_ratio': final_allocation,
'return_adjustment': return_adjustment,
'volatility_adjustment': volatility_adjustment
}
def monitor_position_risks(self, positions, current_prices):
"""監控倉位風險"""
risk_alerts = []
for symbol, position in positions.items():
# 檢查持倉時間
holding_hours = (datetime.now() - position['entry_time']).total_seconds() / 3600
if holding_hours > self.risk_limits['max_holding_period']:
risk_alerts.append({
'type': 'holding_period_exceeded',
'symbol': symbol,
'holding_hours': holding_hours,
'severity': 'medium'
})
# 檢查單筆損失
if symbol in current_prices:
spot_price = current_prices[symbol]['spot']
futures_price = current_prices[symbol]['futures']
unrealized_pnl = self._calculate_unrealized_pnl(position, spot_price, futures_price)
loss_ratio = abs(unrealized_pnl) / position['total_cost']
if unrealized_pnl < 0 and loss_ratio > self.risk_limits['max_individual_loss']:
risk_alerts.append({
'type': 'individual_loss_exceeded',
'symbol': symbol,
'loss_ratio': loss_ratio,
'unrealized_pnl': unrealized_pnl,
'severity': 'high'
})
return risk_alerts
def _calculate_unrealized_pnl(self, position, current_spot, current_futures):
"""計算未實現損益"""
# 實作與之前相同的邏輯
pass
class ArbitrageExecutionOptimizer:
"""套利執行優化器"""
def __init__(self):
self.execution_algorithms = ['market', 'limit', 'iceberg', 'twap']
def optimize_execution_strategy(self, order_size, market_depth, volatility):
"""優化執行策略"""
strategy = {
'algorithm': 'market',
'split_orders': False,
'execution_timeframe': 'immediate'
}
# 根據訂單大小調整
if order_size > market_depth * 0.1: # 訂單超過市場深度10%
strategy['algorithm'] = 'iceberg'
strategy['split_orders'] = True
strategy['chunk_size'] = market_depth * 0.05
strategy['execution_timeframe'] = 'extended'
# 根據波動性調整
if volatility > 0.02: # 高波動性
strategy['algorithm'] = 'limit'
strategy['price_improvement_attempts'] = 3
strategy['timeout'] = 30 # 30秒超時
return strategy
async def execute_arbitrage_orders(self, spot_order, futures_order, execution_strategy):
"""執行套利訂單"""
if execution_strategy['algorithm'] == 'market':
return await self._execute_market_orders(spot_order, futures_order)
elif execution_strategy['algorithm'] == 'limit':
return await self._execute_limit_orders(spot_order, futures_order, execution_strategy)
elif execution_strategy['algorithm'] == 'iceberg':
return await self._execute_iceberg_orders(spot_order, futures_order, execution_strategy)
async def _execute_market_orders(self, spot_order, futures_order):
"""執行市價單"""
# 同時執行現貨和期貨訂單以最小化基差風險
results = await asyncio.gather(
self._submit_spot_order(spot_order),
self._submit_futures_order(futures_order),
return_exceptions=True
)
return self._process_execution_results(results)
async def _execute_limit_orders(self, spot_order, futures_order, strategy):
"""執行限價單"""
# 根據當前買賣價差設定限價
spot_limit_price = spot_order['price'] * (1 + 0.0005) # 小幅改善價格
futures_limit_price = futures_order['price'] * (1 - 0.0005)
# 提交限價單
spot_result = await self._submit_limit_order(spot_order, spot_limit_price)
futures_result = await self._submit_limit_order(futures_order, futures_limit_price)
return self._process_limit_results(spot_result, futures_result, strategy)
今天我們深入學習了期現套利策略的實際應用,就像掌握了在不同市場間買賣農產品的精髓。重要概念包括:
套利策略核心:
實戰要點:
成功關鍵:
常見挑戰:
記住爸爸做生意的原則:「穩賺不賠的生意不多,但只要細心觀察,總能找到機會」。期現套利雖然看似穩健,但也需要專業的技術和嚴格的風險管理!
明天我們將學習回測系統的建立,為策略驗證打下堅實的基礎。
下一篇:Day 24 - Backtesting