iT邦幫忙

2022 iThome 鐵人賽

DAY 25
1
DevOps

從零開始的 Jenkins 之旅系列 第 25

第二十五天 Jenkins 之旅: 我的 BI 報表 Pipeline (5)

  • 分享至 

  • xImage
  •  

今天開始會帶著大家一起處理 Data Pipeline 的部分,首先我們要想辦法處理 ithome-crawler 在 Monngo 儲存的爬蟲資料。
https://ithelp.ithome.com.tw/upload/images/20220925/201516130tn2eVOyyO.png

下面的範例 Python script,讓我們可以在 command line 操作與 Mongo 資料相關的行為。

# mongo_client.py

def _clean_data(text: str) -> str:
    return re.sub('\t|\r|\n|\u3000|,|\xa0', '。', text)

def _clean_df(data: pd.DataFrame) -> pd.DataFrame:
    for _col in data.select_dtypes(include=['object']).columns:
        data[_col] = data[_col].map(_clean_data)
    return data

class IthomeMongoClient:
    def __init__(self, collection: str):
        mongo_client = pymongo.MongoClient(os.getenv("MONGO_HOST"))
        mongo_db = mongo_client[os.getenv("MONGO_DB")]
        self.collection = mongo_db[collection]

    def get_mongo_data(self, skip_column: set) -> pd.DataFrame:
        """get mongo data to dataframe"""
        mongo_df = pd.DataFrame(list(self.collection.find()))
        return mongo_df[[_col for _col in mongo_df.columns if not _col in skip_column]]

    def dump_to_file(self, skip_column: set, file_path: str) -> None:
        """get mongo data and output to csv
        Note: for demo usage, data will replace special characters to space"""
        _clean_df(self.get_mongo_data(skip_column=skip_column)).to_csv(file_path, index=None)

    def check_data_count(self, contain_header: bool = True) -> int:
        """get mongo data count"""
        data_count = 1 if contain_header else 0
        return self.collection.count_documents({}) + data_count

    def truncate_mongo_data(self) -> None:
        """drop mongo collection"""
        self.collection.drop()

@click.group()
@click.option("--collection", "-c", required=True, type=str)
@click.pass_context
def cli(ctx, collection: str):
    """entrypoint"""
    ctx.ensure_object(dict)
    ctx.obj['mongo_client'] = IthomeMongoClient(collection=collection)

@cli.command()
@click.pass_context
@click.option('--contain-header', is_flag=True)
def count_data(ctx, contain_header: bool = True):
    """count crawl data in mongo, verify usage."""
    click.echo(ctx.obj['mongo_client'].check_data_count(contain_header))

@cli.command()
@click.pass_context
@click.option('--skip-column', '-s',  multiple=True)
@click.option('--csv-file-path', '-file')
def to_csv(ctx, skip_column: set, csv_file_path: str):
    """get crawl data"""
    ctx.obj['mongo_client'].dump_to_file(skip_column=skip_column, file_path=csv_file_path)

@cli.command()
@click.pass_context
def housekeeping(ctx):
    """clean crawl data"""
    ctx.obj['mongo_client'].truncate_mongo_data()

當我們想把在 MongoDB 之 content_info 的資料在本地儲存時可以下

python3 mongo_client.py -c content_info to-csv --csv-file-path ./content_info.csv

想要計算 MongoDB 之 content_info 的資料個數

python3 mongo_client.py -c content_info count-data

刪除 MongoDB 之 content_info 的所有資料

python3 mongo_client.py -c content_info housekeeping

這邊我們將 dump mongo data 在 Jenkinsfile 的對應行為宣告下來

stage("Pull mongo data - content_info"){
    steps{
        sh """
            python3 mongo_client.py -c content_info \
                to-csv --csv-file-path output/content_info/content_info.csv
           """
    }
}
stage("Pull mongo data - user_info"){
    steps{
        sh """
            python3 mongo_client.py -c user_info \
                to-csv --csv-file-path output/user_info/user_info.csv
           """
    }
}

當 Mongo 資料下載下來成 csv 後,我們需要再次確認資料是否有正確地被完整儲存,因此會再新增一個 stage 來簡單確認資料筆數一致。

stage("Check mongo data - content_info"){
    steps{
        script{
             MONGO_DATA_COUNT = sh (
                    script: "python3 mongo_client.py -c content_info count-data --contain-header",
                    returnStdout: true
                ).trim().toInteger()
             CSV_DATA_COUNT = sh (
                    script: "cat output/content_info/content_info.csv|wc -l",
                    returnStdout: true
                ).trim().toInteger()
             if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                 sh "false"
             }
        }
    }
}
stage("Check mongo data - user_info"){
    steps{
        script{
             MONGO_DATA_COUNT = sh (
                    script: "python3 mongo_client.py -c user_info count-data --contain-header",
                    returnStdout: true
                ).trim().toInteger()
             CSV_DATA_COUNT = sh (
                    script: "cat output/user_info/user_info.csv|wc -l",
                    returnStdout: true
                ).trim().toInteger()
             if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                 sh "false"
             }
        }
    }
}

會覺得上面的寫法有些冗余,所以讓我們用與 parallel 類似的語法 matrix,來讓整個 Jenkinsfile 更加簡潔。

stage('Data pipeline') {
    matrix {
        axes {
            axis {
                name 'DATA'
                values 'user_info', 'content_info'
            }
        }
        stages {
            stage("Pull mongo data"){
                steps{
                    sh """
                        python3 mongo_client.py -c ${DATA} \
                            to-csv --csv-file-path output/${DATA}/${DATA}.csv
                       """
                }
            }
            stage("Check mongo data"){
                steps{
                   script{
                         MONGO_DATA_COUNT = sh (
                                script: "python3 mongo_client.py -c ${DATA} count-data --contain-header",
                                returnStdout: true
                            ).trim().toInteger()
                         CSV_DATA_COUNT = sh (
                                script: "cat output/${DATA}/${DATA}.csv|wc -l",
                                returnStdout: true
                            ).trim().toInteger()
                         if (MONGO_DATA_COUNT != CSV_DATA_COUNT){
                             sh "false"
                         }
                    }
                }
            }
        }
    }
}

小結

今天我們已經成功將 Mongo 資料下載到本地,明天會再介紹其他工具來輔佐我們以不同面向來確認資料的正確性。

專案連結

ithome-data-transfer
ithome-crawler

參考資料

https://www.jenkins.io/doc/book/pipeline/syntax/#declarative-matrix


上一篇
第二十四天 Jenkins 之旅: 我的 BI 報表 Pipeline (4)
下一篇
第二十六天 Jenkins 之旅: 我的 BI 報表 Pipeline (6)
系列文
從零開始的 Jenkins 之旅30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言