今天開始會帶著大家一起處理 Data Pipeline 的部分,首先我們要想辦法處理 ithome-crawler 在 Mongo 儲存的爬蟲資料。
下面的範例 Python script,讓我們可以在 command line 操作與 Mongo 資料相關的行為。
# mongo_client.py
# Characters replaced before export: tab, CR, LF, ideographic space (U+3000),
# ASCII comma and no-break space (U+00A0).
_SPECIAL_CHARS = re.compile('[\t\r\n\u3000,\xa0]')


def _clean_data(text: str) -> str:
    """Replace every special character in *text* with the full stop '。'.

    Each tab, carriage return, line feed, ideographic space, ASCII comma and
    no-break space is replaced one-for-one.

    Values that are not ``str`` are returned unchanged instead of raising
    ``TypeError``: object-dtype DataFrame columns may also hold ``None``,
    ``NaN`` or other non-text objects.
    """
    if not isinstance(text, str):
        return text
    return _SPECIAL_CHARS.sub('。', text)
def _clean_df(data: pd.DataFrame) -> pd.DataFrame:
    """Apply ``_clean_data`` to every value of each object-dtype column.

    The frame is modified in place and the same object is returned.
    """
    object_columns = data.select_dtypes(include=['object']).columns
    for name in object_columns:
        cleaned = data[name].map(_clean_data)
        data[name] = cleaned
    return data
class IthomeMongoClient:
    """Wrapper around a single MongoDB collection.

    The connection target comes from the environment: ``MONGO_HOST`` is handed
    to ``pymongo.MongoClient`` and ``MONGO_DB`` selects the database.
    """

    def __init__(self, collection: str):
        """Connect and keep a handle on the collection named *collection*."""
        # os.getenv returns None when a variable is unset.
        # NOTE(review): confirm how pymongo reacts to a None host / db name.
        mongo_client = pymongo.MongoClient(os.getenv("MONGO_HOST"))
        mongo_db = mongo_client[os.getenv("MONGO_DB")]
        self.collection = mongo_db[collection]

    def get_mongo_data(self, skip_column: set) -> pd.DataFrame:
        """Load every document of the collection into a DataFrame.

        Args:
            skip_column: column names to leave out of the result; names that
                do not exist in the data are simply ignored.

        Returns:
            A DataFrame with the remaining columns in their original order.
        """
        mongo_df = pd.DataFrame(list(self.collection.find()))
        kept_columns = [name for name in mongo_df.columns if name not in skip_column]
        return mongo_df[kept_columns]

    def dump_to_file(self, skip_column: set, file_path: str) -> None:
        """Write the collection to a CSV file at *file_path*.

        Note: for demo usage, the data is passed through ``_clean_df`` first,
        which replaces special characters (tabs, line breaks, commas, ...)
        with the full stop '。'.
        """
        cleaned = _clean_df(self.get_mongo_data(skip_column=skip_column))
        # index=False: do not write the DataFrame's row index as a column.
        cleaned.to_csv(file_path, index=False)

    def check_data_count(self, contain_header: bool = True) -> int:
        """Return the number of documents in the collection.

        When *contain_header* is true the result is one higher, so it can be
        compared with the line count of a CSV file that has a header row.
        """
        header_lines = 1 if contain_header else 0
        return self.collection.count_documents({}) + header_lines

    def truncate_mongo_data(self) -> None:
        """Drop the whole collection."""
        self.collection.drop()
@click.group()
@click.option("--collection", "-c", required=True, type=str)
@click.pass_context
def cli(ctx, collection: str):
    """entrypoint"""
    # Root command group: builds one Mongo client for the requested collection
    # and stores it on the click context so every subcommand can reach it via
    # ctx.obj['mongo_client'].
    ctx.ensure_object(dict)
    client = IthomeMongoClient(collection=collection)
    ctx.obj['mongo_client'] = client
@cli.command()
@click.pass_context
@click.option('--contain-header', is_flag=True)
def count_data(ctx, contain_header: bool = True):
    """count crawl data in mongo, verify usage."""
    # Prints the document count of the selected collection.
    # click supplies contain_header on every invocation (False unless
    # --contain-header is given), so the `= True` default in the signature
    # is not what the command line uses.
    mongo_client = ctx.obj['mongo_client']
    total = mongo_client.check_data_count(contain_header)
    click.echo(total)
@cli.command()
@click.pass_context
@click.option('--skip-column', '-s', multiple=True)
@click.option('--csv-file-path', '-file', required=True)
def to_csv(ctx, skip_column: tuple, csv_file_path: str):
    """get crawl data"""
    # Exports the selected collection to the CSV file at csv_file_path.
    # --csv-file-path is required: without it None would reach
    # DataFrame.to_csv, which then returns the CSV text instead of writing a
    # file, so the command would appear to succeed while producing nothing.
    # click delivers a `multiple=True` option as a tuple; convert it to the
    # set that dump_to_file is annotated to take.
    ctx.obj['mongo_client'].dump_to_file(
        skip_column=set(skip_column),
        file_path=csv_file_path,
    )
@cli.command()
@click.pass_context
def housekeeping(ctx):
    """clean crawl data"""
    # Drops the selected collection through the client stored on the context.
    mongo_client = ctx.obj['mongo_client']
    mongo_client.truncate_mongo_data()
當我們想把 MongoDB 中 content_info 的資料儲存到本地時,可以執行以下指令:
python3 mongo_client.py -c content_info to-csv --csv-file-path ./content_info.csv
想要計算 MongoDB 之 content_info 的資料個數
python3 mongo_client.py -c content_info count-data
刪除 MongoDB 之 content_info 的所有資料
python3 mongo_client.py -c content_info housekeeping
這邊我們將 dump mongo data 在 Jenkinsfile 的對應行為宣告下來
// Export the content_info collection to output/content_info/content_info.csv
// by running the to-csv subcommand of mongo_client.py.
stage("Pull mongo data - content_info"){
steps{
// The trailing backslash continues the command onto the next line,
// so the two lines inside the string form a single invocation.
sh """
python3 mongo_client.py -c content_info \
to-csv --csv-file-path output/content_info/content_info.csv
"""
}
}
// Export the user_info collection to output/user_info/user_info.csv
// by running the to-csv subcommand of mongo_client.py.
stage("Pull mongo data - user_info"){
steps{
// The trailing backslash continues the command onto the next line,
// so the two lines inside the string form a single invocation.
sh """
python3 mongo_client.py -c user_info \
to-csv --csv-file-path output/user_info/user_info.csv
"""
}
}
當 Mongo 資料下載下來成 csv 後,我們需要再次確認資料是否有正確地被完整儲存,因此會再新增一個 stage 來簡單確認資料筆數一致。
// Verify the content_info export: the document count reported by Mongo
// (plus one for the CSV header row) must equal the line count of the CSV.
stage("Check mongo data - content_info"){
    steps{
        script{
            // `def` keeps the counters local to this script block; without it
            // they become pipeline-wide globals that other stages can overwrite.
            def MONGO_DATA_COUNT = sh (
                script: "python3 mongo_client.py -c content_info count-data --contain-header",
                returnStdout: true
            ).trim().toInteger()
            def CSV_DATA_COUNT = sh (
                script: "cat output/content_info/content_info.csv|wc -l",
                returnStdout: true
            ).trim().toInteger()
            if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                // Fail the build with an explanation instead of a bare `sh "false"`.
                error("content_info count mismatch: mongo (with header) = ${MONGO_DATA_COUNT}, csv lines = ${CSV_DATA_COUNT}")
            }
        }
    }
}
// Verify the user_info export: the document count reported by Mongo
// (plus one for the CSV header row) must equal the line count of the CSV.
stage("Check mongo data - user_info"){
    steps{
        script{
            // `def` keeps the counters local to this script block; without it
            // they become pipeline-wide globals that other stages can overwrite.
            def MONGO_DATA_COUNT = sh (
                script: "python3 mongo_client.py -c user_info count-data --contain-header",
                returnStdout: true
            ).trim().toInteger()
            def CSV_DATA_COUNT = sh (
                script: "cat output/user_info/user_info.csv|wc -l",
                returnStdout: true
            ).trim().toInteger()
            if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                // Fail the build with an explanation instead of a bare `sh "false"`.
                error("user_info count mismatch: mongo (with header) = ${MONGO_DATA_COUNT}, csv lines = ${CSV_DATA_COUNT}")
            }
        }
    }
}
上面的寫法有些冗餘,所以讓我們用與 parallel 類似的語法 matrix,來讓整個 Jenkinsfile 更加簡潔。
// Run the same pull + check stages once per collection listed in the DATA axis.
stage('Data pipeline') {
    matrix {
        axes {
            axis {
                name 'DATA'
                values 'user_info', 'content_info'
            }
        }
        stages {
            // Export the collection named by DATA to output/<DATA>/<DATA>.csv.
            stage("Pull mongo data"){
                steps{
                    sh """
                        python3 mongo_client.py -c ${DATA} \
                        to-csv --csv-file-path output/${DATA}/${DATA}.csv
                    """
                }
            }
            // The document count (plus one for the CSV header row) must equal
            // the line count of the exported CSV.
            stage("Check mongo data"){
                steps{
                    script{
                        // `def` is required here: matrix cells can run in
                        // parallel, and undeclared variables are shared by the
                        // whole pipeline, so one cell could overwrite the
                        // other's counters before they are compared.
                        def MONGO_DATA_COUNT = sh (
                            script: "python3 mongo_client.py -c ${DATA} count-data --contain-header",
                            returnStdout: true
                        ).trim().toInteger()
                        def CSV_DATA_COUNT = sh (
                            script: "cat output/${DATA}/${DATA}.csv|wc -l",
                            returnStdout: true
                        ).trim().toInteger()
                        if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                            // Fail the cell with an explanation instead of a bare `sh "false"`.
                            error("${DATA} count mismatch: mongo (with header) = ${MONGO_DATA_COUNT}, csv lines = ${CSV_DATA_COUNT}")
                        }
                    }
                }
            }
        }
    }
}
今天我們已經成功將 Mongo 資料下載到本地,明天會再介紹其他工具來輔佐我們以不同面向來確認資料的正確性。
ithome-data-transfer
ithome-crawler
https://www.jenkins.io/doc/book/pipeline/syntax/#declarative-matrix