最後一天想結合我實習公司的金融回測工具,將回測結果透過語言模型分析成簡單的報表寄 email 到我自己的信箱,自定義 tools 和 agent 的實作,然後串接 GCP 的 Gmail API,將我的鐵人賽劃下一個句點。
要讓 LLM 分析回測結果的圖表需要讓圖片有一個公開的網址,上傳到 Azure、GCP、GitHub 都可以,我這邊選擇 GitHub。
import os
from github import Github
from github.GithubException import GithubException
from dotenv import load_dotenv
load_dotenv()
g = Github(os.getenv('GITHUB_ACCOUNT'), os.getenv('GITHUB_TOKEN'))
def upload_file_to_github(file_path):
username = "Your username"
repo_name = "Your repo"
branch = "main"
repo = g.get_user().get_repo(repo_name)
with open(file_path, 'rb') as image:
i = image.read()
image_data = bytearray(i)
try:
repo.create_file(file_path, "commit image", bytes(image_data), branch=branch)
except GithubException as e:
if e.status == 422: # 422 表示檔案已存在
# 取得現有檔案的 sha 值,進行更新
contents = repo.get_contents(file_path, ref=branch)
repo.update_file(contents.path, "update image", bytes(image_data), contents.sha, branch=branch)
print(f"File {file_path} updated successfully.")
else:
raise e
url = f"https://raw.githubusercontent.com/{username}/{repo_name}/{branch}/{file_path}"
print(f"GitHub URL : {url}")
with open("backtest_img_url.txt", "a") as f:
f.write(url + '\n')
主要用到的回測工具是 TQuant,然後將回測數據的 DataFrame 儲存起來,並且 import 前面寫好的上傳圖片到 GitHub 的函數。那麼 TQuant 的回測程式我就不贅述了~
import os
import pandas as pd
import numpy as np
import subprocess
import matplotlib.pyplot as plt
from dotenv import load_dotenv
load_dotenv()
from langchain_core.tools import tool
from upload_img import upload_file_to_github
# import zipline
from zipline.data import bundles
from zipline.sources.TEJ_Api_Data import get_universe
from zipline.pipeline.filters import StaticAssets
from zipline.finance import slippage, commission
from zipline.api import *
from zipline import run_algorithm
from zipline.pipeline import Pipeline
from zipline.pipeline.factors import IchimokuKinkoHyo, TrueRange, CustomFactor
from zipline.pipeline.data import TWEquityPricing, EquityPricing
from zipline.utils.math_utils import nanmax
from numpy import dstack
import pyfolio as pf
@tool
def Ichimoku_Kinko_Hyo(start, end, idx_id):
"""
Select the stock universe and ingest the data into zipline and run backtesting with the strategy of 一目均衡表
"""
StockList = get_universe(start, end, idx_id=idx_id)
ticker = ','.join(StockList)
with open('backtest_stats.txt', 'w') as f:
f.write(f"Start date: {start}\n")
f.write(f"End date: {end}\n")
f.write(f"StockList: {ticker}\n")
f.write(f"Benchmark: IR0001\n")
f.write(f"Strategy: IchimokuKinkoHyo (一目均衡表)\n")
StockList.append('IR0001')
os.environ['ticker'] = ' '.join(StockList)
os.environ['mdate'] = start+' '+end
start_dt, end_dt = pd.Timestamp(start, tz='utc'), pd.Timestamp(end, tz='utc')
# !zipline ingest -b tquant
command = f"zipline ingest -b tquant"
try:
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(result.stdout.decode())
except subprocess.CalledProcessError as e:
print(f"Error occurred: {e.stderr.decode()}")
bundle = bundles.load('tquant')
benchmark_asset = bundle.asset_finder.lookup_symbol('IR0001', as_of_date = None)
class AverageTrueRange(CustomFactor):
inputs = (
EquityPricing.high,
EquityPricing.low,
EquityPricing.close,
)
window_length = 10
outputs = ["TR", "ATR"]
def compute(self, today, assets, out, highs, lows, closes):
high_to_low = highs[1:] - lows[1:]
high_to_prev_close = abs(highs[1:] - closes[:-1])
low_to_prev_close = abs(lows[1:] - closes[:-1])
tr_current = nanmax(
dstack(
(
high_to_low,
high_to_prev_close,
low_to_prev_close,
)
),
2,
)
sma_atr_values = np.mean(tr_current, axis=0)
out.TR = tr_current[-1]
out.ATR = sma_atr_values
def make_pipeline():
Ich = IchimokuKinkoHyo(
inputs = [TWEquityPricing.high, TWEquityPricing.low, TWEquityPricing.close],
window_length = 52,
)
atr = AverageTrueRange(inputs = [TWEquityPricing.high, TWEquityPricing.low, TWEquityPricing.close], window_length = 52,)
return Pipeline(
columns = {
'curr_price': TWEquityPricing.close.latest,
"tenkan_sen": Ich.tenkan_sen,
"kijun_sen": Ich.kijun_sen,
"senkou_span_a": Ich.senkou_span_a,
"senkou_span_b": Ich.senkou_span_b,
'cloud_red': Ich.senkou_span_a < Ich.senkou_span_b,
"chikou_span": Ich.chikou_span,
'stop_loss': atr.ATR,
},
# screen = ~StaticAssets([benchmark_asset])
screen = ~StaticAssets([benchmark_asset]) & (Ich.senkou_span_a > 0) & (Ich.senkou_span_b > 0)
)
def initialize(context):
set_slippage(slippage.VolumeShareSlippage())
set_commission(commission.Custom_TW_Commission(min_trade_cost = 20, discount = 1.0, tax = 0.003))
attach_pipeline(make_pipeline(), 'mystrats')
set_benchmark(symbol('IR0001'))
context.stop_loss = {}
context.trailing_stop = {}
context.last_buy_price = {}
context.trailing_count = {}
context.holding = {}
context.buy_count = {}
def handle_data(context, data):
out_dir = pipeline_output('mystrats')
for i in out_dir.index:
sym = i.symbol
curr_price = out_dir.loc[i, 'curr_price']
tenkan_sen = out_dir.loc[i, 'tenkan_sen']
kijun_sen = out_dir.loc[i, 'kijun_sen']
senkou_span_a = out_dir.loc[i, 'senkou_span_a']
senkou_span_b = out_dir.loc[i, 'senkou_span_b']
cloud_red = out_dir.loc[i, 'cloud_red']
chikou_span = out_dir.loc[i, 'chikou_span']
stop_loss = out_dir.loc[i, 'stop_loss']
cash_position = context.portfolio.cash # record cash position
stock_position = context.portfolio.positions[i].amount # record stock holding
if context.stop_loss.get(f'{i}') is None:
context.stop_loss[f'{i}'] = 0
if context.trailing_stop.get(f'{i}') is None:
context.trailing_stop[f'{i}'] = False
if context.last_buy_price.get(f'{i}') is None:
context.last_buy_price[f'{i}'] = 0
if context.holding.get(f'{i}') is None:
context.holding[f'{i}'] = False
if context.trailing_count.get(f'{i}') is None:
context.trailing_count[f'{i}'] = 1
if context.buy_count.get(f'{i}') is None:
context.buy_count[f'{i}'] = 0
buy, sell = False, False
record(
**{
f'price_{sym}':curr_price,
f'buy_{sym}':buy,
f'sell_{sym}':sell,
f'tenkan_sen_{sym}': tenkan_sen,
f'kijun_sen_{sym}': kijun_sen,
f'cloud_red_{sym}': cloud_red,
f'senkou_span_a_{sym}': senkou_span_a,
f'senkou_span_b_{sym}': senkou_span_b,
f'chikou_span_{sym}': chikou_span,
}
)
# 三役好轉 (tenkan_sen > kijun_sen*1.015 : avoid the Darvas Box Theory)
if (curr_price > senkou_span_b) and (cloud_red == True) and (tenkan_sen > kijun_sen*1.01) and (context.buy_count[f'{i}'] <= 5):
order_percent(i, 0.01)
buy = True
context.stop_loss[f'{i}'] = curr_price - (1.25 * stop_loss)
context.last_buy_price[f'{i}'] = curr_price
record(
**{
f'buy_{sym}':buy
}
)
context.holding[f'{i}'] = True
context.buy_count[f'{i}'] += 1
# reset stop loss point
if (curr_price >= (1.3**context.trailing_count[f'{i}'])*context.last_buy_price[f'{i}']) and (context.holding[f'{i}'] == True) and (context.trailing_stop[f'{i}'] == False):
context.stop_loss[f'{i}'] = 1.3*context.stop_loss[f'{i}']
context.trailing_stop[f'{i}'] = True
context.trailing_count[f'{i}'] += 1
elif (curr_price >= (1.3**context.trailing_count[f'{i}'])*context.last_buy_price[f'{i}']) and (context.holding[f'{i}'] == True) and (context.trailing_stop[f'{i}'] == True):
context.stop_loss[f'{i}'] = 1.3*context.stop_loss[f'{i}']
context.trailing_count[f'{i}'] += 1
if (curr_price <= context.stop_loss[f'{i}']) and (context.holding[f'{i}'] == True):
order_target(i, 0)
sell = True
context.stop_loss[f'{i}'] = None
context.trailing_stop[f'{i}'] = None
context.trailing_count[f'{i}'] = None
record(
**{
f'sell_{sym}':sell
}
)
context.holding[f'{i}'] = None
context.buy_count[f'{i}'] = None
results = run_algorithm(
start = start_dt,
end = end_dt,
initialize = initialize,
bundle = 'tquant',
capital_base = 1e7,
handle_data = handle_data
)
bt_returns, bt_positions, bt_transactions = pf.utils.extract_rets_pos_txn_from_zipline(results)
benchmark_rets = results.benchmark_return
perf_stats = pf.plotting.show_perf_stats(
bt_returns,
benchmark_rets,
bt_positions,
bt_transactions,
turnover_denom='portfolio_value',
)
# 打開一個txt文件,並將資料寫入
with open('backtest_stats.txt', 'a') as f:
for index, row in perf_stats.iterrows():
# 將每個指標和對應的值寫入文本文件
f.write(f"{index}: {row[0]}\n")
with open("backtest_img_url.txt", "r+") as file: # 'r+' 模式表示可讀寫
file.truncate(0)
# Cumulative Returns
pf.plotting.plot_rolling_returns(bt_returns, benchmark_rets)
plt.savefig('image/cumulative_returns.png')
plt.close()
upload_file_to_github('image/cumulative_returns.png')
# Rolling Volatility
pf.plotting.plot_rolling_volatility(bt_returns, benchmark_rets)
plt.savefig('image/rolling_volatility.png')
plt.close()
upload_file_to_github('image/rolling_volatility.png')
# Rolling Sharpe
pf.plotting.plot_rolling_sharpe(bt_returns, benchmark_rets)
plt.savefig('image/rolling_sharpe.png')
plt.close()
upload_file_to_github('image/rolling_sharpe.png')
# drawdown
pf.plotting.plot_drawdown_underwater(bt_returns)
plt.savefig('image/drawdown.png')
plt.close()
upload_file_to_github('image/drawdown.png')
# monthly returns heatmap
pf.plotting.plot_monthly_returns_heatmap(bt_returns)
plt.savefig('image/monthly_returns_heatmap.png')
plt.close()
upload_file_to_github('image/monthly_returns_heatmap.png')
# annual returns
pf.plotting.plot_annual_returns(bt_returns)
plt.savefig('image/annual_returns.png')
plt.close()
upload_file_to_github('image/annual_returns.png')
# exposures
pf.plotting.plot_exposures(bt_returns, bt_positions)
plt.savefig('image/exposures.png')
plt.close()
upload_file_to_github('image/exposures.png')
# Long & Short Holdings
pf.plotting.plot_long_short_holdings(bt_returns, bt_positions)
plt.savefig('image/long_short_holdings.png')
plt.close()
upload_file_to_github('image/long_short_holdings.png')
# turnover
pf.plotting.plot_turnover(bt_returns, bt_transactions, bt_positions)
plt.savefig('image/turnover.png')
plt.close()
upload_file_to_github('image/turnover.png')
# daily volume
pf.plotting.plot_daily_volume(bt_returns, bt_transactions)
plt.savefig('image/daily_volume.png')
plt.close()
upload_file_to_github('image/daily_volume.png')
with open("backtest_stats.txt", "r") as file:
content = file.read()
return content
請語言模型根據圖片和回測的數據撰寫 email 的內文和標題。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from dotenv import load_dotenv
load_dotenv()
model = ChatOpenAI(model="gpt-4o", temperature=0)
def custom_parser(message) -> str:
html_code = message.content.split("```")[1].replace("html", "", 1)
return html_code
def email_chain(results):
template = """
作為Email文案撰寫者,您的任務是根據提供的回測結果,創建清晰且具規範性的電子郵件。您的角色至關重要,確保我們的溝通內容專業、有結構且視覺上吸引人。
操作指示:
- 初始標題:標題設計為一目均衡表策略的回測結果,電子郵件開頭包括使用者給定的回測條件,包括:回測區間、選擇的股票(全數列出)、比較標的。
- 研究摘要:撰寫詳細的研究結果,包含圖片的詳細描述(圖片標題使用英文即可,不需要翻成中文),相關回測指標的部分需做成表格,以上兩個一定都要!!!
- HTML 格式化:使用適當的 HTML 元素(如標題、段落、項目符號)來增強可讀性,並且有圖片網址的需嵌入圖片在對應的圖片描述中。
- 研究結果:需將研究的結果,也就是所有內容的大結論寫在最後。
- 設計美感:需美化整個 html 設計,像是圖片與描述文字需要至於中央、背景設置為淺藍色、顯而易見的結論,不要只是單純的黑白色系。
以下是回測結果的所有內容:{input}
"""
prompt = PromptTemplate.from_template(template)
chain = prompt | model | RunnableLambda(custom_parser)
with open("backtest_stats.txt", "r") as f:
content = f.read()
final = content + "\n總結:" + results['output']
html_content = chain.invoke({"input": final})
subject_template = "請根據{input}回測結果想一個 Email 的寄件標題,回傳結果僅包含寄件標題即可,標題中需包含策略的名稱。"
print("撰寫 Email 標題")
subject_prompt = PromptTemplate.from_template(subject_template)
subject_chain = subject_prompt | model
subject = subject_chain.invoke({"input": final})
mail_results = {"subject" : subject.content, "html_content" : html_content}
return mail_results
跟 Day28 使用 Gmail API 相似,在建立 OAuth 用戶端 ID 的時候要選擇「電腦版應用程式」,建立完成後有一個 token.json 檔案可以下載,然後將其檔案放在與以下程式同一路徑,程式會將其轉換成可以用的憑證,讓我們可以透過這個憑證用 GCP 的 email 來寄 email。
import os
import json
import base64
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv
load_dotenv()
SCOPES = [
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/gmail.send",
]
def get_credentials():
creds = None
token_path = "token.json"
if os.path.exists(token_path):
with open(token_path, "r") as token:
try:
creds_info = json.load(token)
creds = Credentials.from_authorized_user_info(creds_info, SCOPES)
except json.JSONDecodeError:
creds = None
if creds and creds.valid and creds.has_scopes(SCOPES):
return creds
else:
if os.path.exists(token_path):
os.remove(token_path)
flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
creds = flow.run_local_server(port=0)
with open(token_path, "w") as token:
token.write(creds.to_json())
return creds
creds = get_credentials()
service = build("gmail", "v1", credentials=creds)
def send_email(mail_results):
subject = mail_results['subject']
html_content = mail_results['html_content']
message = MIMEMultipart()
message["to"] = os.getenv('SEND_TO')
message["subject"] = subject
html_part = MIMEText(html_content, 'html')
message.attach(html_part)
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
message_body = {"raw": raw}
service.users().messages().send(userId="me", body=message_body).execute()
print(f"Email sent success!")
執行程式後就可以到 Email 中查看結果!
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda
from backtest_tools import Ichimoku_Kinko_Hyo
from read_img import read_img
from email_content import email_chain
from send_mail import send_email
from dotenv import load_dotenv
load_dotenv()
chat_model = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [Ichimoku_Kinko_Hyo]
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that you can run the backtesting."),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
]
)
agent = create_openai_functions_agent(
llm=chat_model,
prompt=prompt,
tools=tools,
)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True
)
agent_chain = agent_executor | RunnableLambda(read_img) | RunnableLambda(email_chain) | RunnableLambda(send_email)
response = agent_chain.invoke({"input": "先幫我進行選股,回測區間為 2019-04-01 至 2024-04-01,選股的id為 'IX0002',最後幫我將回測結果的指標做一個詳細結論。"})
可以看到以下的結果皆是由 AI 分析的結果,除了數據之外圖片也有呈現出來,雖然說可能專業度略顯不足,但將策略擴充,可以更自由的達到金融回測自動化。
一開始參加鐵人賽其實是因為跟朋友無聊聊到,才知道原來在我初學程式的階段一直拯救我的 IT 邦幫忙有這個特別的鐵人賽。原本只是想記錄我自己實習期間學了什麼,但在這個過程為了求證一個技術的核心架構或者是為了讓整篇文章更完整,我花了比我預想的更多時間來寫每一篇文章,加上我都是每天下班才開始思考今天文章的內容,無形之中我覺得帶給我相當大的進步。舉例來說我原本真的是不太會用 LangChain,但是透過這個競賽讓我現在幾乎摸熟了 LangChain。
如果今天有人問我要不要參加這個鐵人賽,其實我真的是大力推薦,就算覺得自己能力有限,還是要為自己嘗試看看。只要有撐下去的話,真的可以靠自學學到很多東西,也可以慢慢抓到如何在短時間內摸熟一個自己不熟的技術或工具。
最後來分享一下為什麼我不喜歡唸書好了哈哈哈!其實我原本是五專商科的學生,在遇到補習班老師之前,我是一個根本就愛去不去學校的人。結果在偶然的情況下被朋友拉去旁聽一次課程,就覺得這個補習班很有料 (台北的北一陳偉真的是商科第一,今年 2024 台政好像只有一個名額不是北陳的學生),特別是經濟學的徐喬老師和微積分的吳限老師,讓我會開始期待每次的補習班上課。然後就這樣我轉學考唸到在補習班保底前五名,第一次在人生中感受到唸書很有趣,結果疫情就來了,筆試也沒了,台政成的希望也全部破滅,最後好在東吳資科備取收留了我,才讓我接觸到寫程式,讓我重新對學習這件事有動力繼續下去。但經歷過準備轉學考漫長的近兩年時間,考研究所在經歷一次的話我是完全不想,所以如果有跟我一樣不喜歡唸書的人,就去找實習吧~
謝謝有看我文章的大家,不管是給我指教的還是默默有看完的都很感謝,從來沒想過我會因為寫程式而對於寫文章這件事這麼的樂在其中。我每篇文章都是很認真的在寫,即使是日更也沒有一篇是隨便帶過,我連陪女朋友的時間都拿來寫文章。所以如果有要參加鐵人賽的人建議不要跟我一樣每天日更,先準備寫好的幾篇,不然真的會很累🤣