iT邦幫忙

2022 iThome 鐵人賽

DAY 28
2
DevOps

從零開始的 Jenkins 之旅系列 第 28

第二十八天 Jenkins 之旅: 我的 BI 報表 Pipeline (8)

  • 分享至 

  • xImage
  •  

Date Parameter in Jenkins

由於此次的 Data Pipeline 有基於時間(execute_date)為條件批次執行的特性,在來源資料不確定的情況下,因此需要考慮有補檔(rerun)的可能性。所以我們必須在 Jenkins pipeline 中將時間作為一個可被注入的參數。

以下有兩種方式可以做到將時間放入 pipeline 中:

  1. Date Parameter Plugin
    Date Parameter Plugin 可以在 Jenkins UI 中新增一個 Date Parameter
    https://ithelp.ithome.com.tw/upload/images/20220927/20151613PtdvODfmyt.png
  • 優勢:設定簡易,直觀
  • 缺點:無法在 Jenkinsfile 中宣告 Date 的格式的 parameter 只能從 UI 設定。
    https://ithelp.ithome.com.tw/upload/images/20220927/20151613I5CVTEhPef.png
  1. as class DateParameterDefinition

也需要安裝 Date Parameter Plugin

有位大大在 StackOverflow 給出了第一種方法的優化

properties([parameters([
  [$class: 'DateParameterDefinition',
   name: 'EXECUTE_DATE',
   dateFormat: 'yyyy-MM-dd',
   defaultValue: 'LocalDate.now()']
])])

pipeline {
...
}

寫入 History Table

在每日爬蟲資料被寫進 GCS 後,在確認資料的正確性後我們即可將單日(execute_date)的批次資料寫入 History Table

*_info_hist

// update_content_info_hist.sql

DELETE FROM `ithome-jenkins-2022.ithome.content_info_hist_test` 
WHERE DATE(crawl_datetime) = @execute_date;

INSERT INTO `ithome-jenkins-2022.ithome.content_info_hist`
SELECT
  `_id`,
  `crawl_datetime`,
  `text`,
  `user_id`,
  `ironman_id`,
  `title`,
  `like`,
  `comment`,
  `view`,
  `article_id`,
  `article_url`,
  `create_datetime`
FROM `ithome-jenkins-2022.ithome.content_info_tmp` 
WHERE DATE(crawl_datetime) = @execute_date
;

https://ithelp.ithome.com.tw/upload/images/20220928/2015161333MsjJOZJF.png

理論上應該要用 drop partition,但是 BigQuery 好像沒有直接在 SQL 下 DROP PARTITION 的寫法,所以先用了一個很爛的寫法,去 full scan table 然後刪除資料 QQ 有機會再好好研究 BigQuery

BigQuery command

cat update_content_info_hist.sql | bq query \
    --nouse_legacy_sql \
    --parameter execute_date:DATE:"${EXECUTE_DATE}"

Jenkinsfile

stage("Append to history table"){
    steps{
        sh '''
            cat sql/update_content_info_hist.sql | bq query \
                --nouse_legacy_sql \
                --parameter execute_date:DATE:"${params.EXECUTE_DATE}"
        '''
    }
}

https://ithelp.ithome.com.tw/upload/images/20220928/20151613xv3sGQq533.png

在 History table 中驗證資料筆數

  • tmp table 的筆數
    https://ithelp.ithome.com.tw/upload/images/20220928/20151613VhdurBMTVX.png
  • history table 的筆數
    https://ithelp.ithome.com.tw/upload/images/20220928/20151613we16RuXtdV.png

此確認邏輯亦可寫成一個 Jenkins 的 stage。(建議)

輸出 GDS Table

*_info_latest

// overwrite_content_info_latest.sql
TRUNCATE TABLE `ithome-jenkins-2022.ithome.content_info_latest`;
INSERT INTO `ithome-jenkins-2022.ithome.content_info_latest`
SELECT
  `_id`,
  `crawl_datetime`,
  `text`,
  `user_id`,
  `ironman_id`,
  `title`,
  `like`,
  `comment`,
  `view`,
  `article_id`,
  `article_url`,
  `create_datetime`
FROM `ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) = @execute_date
;

ithome.content_info_latest 全表更新為 @execute_date 的資料

BigQuery command

cat overwrite_content_info_latest.sql | bq query \
    --nouse_legacy_sql \
    --parameter execute_date:DATE:"${EXECUTE_DATE}"

Jenkinsfile

stage("Update to latest table"){
    steps{
        sh '''
            cat sql/overwrite_content_info_latest.sql | bq query \
                --nouse_legacy_sql \
                --parameter execute_date:DATE:"${params.EXECUTE_DATE}"
        '''
    }
}

*_info_view_change

// overwrite_content_info_view_change.sql
TRUNCATE TABLE `ithome-jenkins-2022.ithome.content_info_view_change`;
INSERT INTO `ithome-jenkins-2022.ithome.content_info_view_change` (
    `ironman_id`,
    `article_id`,
    `view`,
    `crawl_datetime`,
    `latest_datetime`
)
SELECT
  `ironman_id`,
  `article_id`,
  `view`,
  `crawl_datetime`,
  CASE WHEN
      DATE(crawl_datetime) = @execute_date THEN true ELSE false
  END AS `latest_datetime`
FROM
  `ithome-jenkins-2022.ithome.content_info_hist`
WHERE DATE(crawl_datetime) <= @execute_date
;

BigQuery command

cat overwrite_content_info_view_change.sql | bq query \
    --nouse_legacy_sql \
    --parameter execute_date:DATE:"${EXECUTE_DATE}"

Jenkinsfile

stage("Update to latest table"){
    steps{
        sh '''
            cat sql/overwrite_content_info_view_change.sql | bq query \
                --nouse_legacy_sql \
                --parameter execute_date:DATE:"${params.EXECUTE_DATE}"
        '''
    }
}

最終成果

https://ithelp.ithome.com.tw/upload/images/20220928/20151613lsL2YgM3Ti.png

完整 Jenkinsfile

properties([parameters([
  [$class: 'DateParameterDefinition',
   name: 'EXECUTE_DATE',
   dateFormat: 'yyyy-MM-dd',
   defaultValue: 'LocalDate.now().plusHours(8)']
])])

pipeline {
    agent{
        label "gcp-agent-1"
    }
    environment {
        MONGO_HOST = "mongodb://localhost:27017"
        MONGO_DB = "ithome_ironman"
    } 
    stages {
        stage('Data pipeline(stage 1)') {
            matrix {
                axes {
                    axis {
                        name 'DATA'
                        values 'user_info', 'content_info'
                    }
                }
                stages {
                    stage("Pull mongo data"){
                        steps{
                            sh """
                                python3 mongo_client.py -c ${DATA} \
                                    to-csv --csv-file-path output/${DATA}/${DATA}.csv
                            """
                        }
                    }
                    stage("Check mongo data"){
                        steps{
                            sh  '''
                            MONGO_DATA_COUNT=$(python3 mongo_client.py -c ${DATA} count-data --contain-header)
                            CSV_DATA_COUNT=$(cat output/${DATA}/${DATA}.csv|wc -l)
                            echo "Mongo data count: ${MONGO_DATA_COUNT}"
                            echo "CSV data count: ${CSV_DATA_COUNT}"
                            if [ $CSV_DATA_COUNT != $CSV_DATA_COUNT ]; then exit 1; fi
                            '''
                        }
                    }
                    stage("Check data quality"){
                        steps{
                            sh """
                            docker run -v ${WORKSPACE}/output/${DATA}/:/usr/src/github/ piperider run
                            """
                            sh '''
                            sudo rm -rf ${WORKSPACE}/output/${DATA}/.piperider/outputs/latest
                            sudo ln -s ${WORKSPACE}/output/${DATA}/.piperider/outputs/$(ls ${WORKSPACE}/output/${DATA}/.piperider/outputs | grep ithome|tail -n1) ${WORKSPACE}/output/${DATA}/.piperider/outputs/latest
                            '''
                            sh "python3 get_piperider_result.py --data-source-name ${DATA} "
                        }
                    }
                    stage("Push to GCS"){
                        steps{
                            sh """
                            gcloud alpha storage cp output/${DATA}/${DATA}.csv gs://crawler_result/ithome/ironman2022
                            """
                        }
                    }
                    stage("Append to history table"){
                        steps{
                            sh """
                                cat sql/update_${DATA}_hist.sql | bq query \
                                    --nouse_legacy_sql \
                                    --parameter execute_date:DATE:"${params.EXECUTE_DATE}"
                            """
                        }
                    }
                }
            }
        } 
        stage("Data pipeline(stage 2)") {
            matrix {
                axes {
                    axis {
                        name 'OUTPUT_TABLE'
                        values 'user_info_latest', 'content_info_latest', 'content_info_view_change'
                    }
                }
                stages {
                    stage("Update GDS table"){
                        steps{
                            sh """
                                cat sql/overwrite_${OUTPUT_TABLE}.sql | bq query \
                                    --nouse_legacy_sql \
                                    --parameter execute_date:DATE:"${params.EXECUTE_DATE}"
                            """
                        }
                    }
                }
            }
        }
        stage("House keeping"){
            steps{
                sh "python3 mongo_client.py -c user_info housekeeping"
                sh "python3 mongo_client.py -c content_info housekeeping"
            }
        }
    }
    post{
        always{
            archiveArtifacts artifacts: 'output/**', followSymlinks: false
        }
    }
}

Jenkinsfile 有以下幾點需要說明:

  1. 為了 “方便”,我將 MonogoDB 與 Jenkins Agent (gcp-agent-1) 放在同台機器,所以 DB 的 connection 才會是 mongodb://localhost:27017
  2. Jenkins Agent (gcp-agent-1) 我已經先安裝 Cloud SDK,並且設定好 IAM Role,不然是無法執行像是 bq 或是 gcloud 的指令。
  3. Jenkins Agent (gcp-agent-1) 是有 sudo 權限的 (在某些情況下,這樣設定其實不太好)。
  4. stage - Check data quality 後來是用 Docker 去運行 piperider 的原因是 Jenkins Agent 預設只有 Python3.6 ,可是 PipeRider 最低需要 Python3.7 方能安裝,故最終使用官方的 Docker image 為基礎,去執行資料品質分析,但使用 Docker 同時衍生了一些路徑問題,可以在同一個 stage 的後半段看到,我重新做軟連接。
    至於為什麼不用 docker agent 執行 ...... 其實是因為當初沒想這麼多,等寫完了我才想到 QQ

小結

花了幾天的時間帶著大家一步一步完成 CI/CD Pipeline 與 Data Pipeline,明天會介紹最終的 Google Data Studio 的分析報表。

參考資料

https://towardsdatascience.com/15-essential-steps-to-build-reliable-data-pipelines-58847cb5d92f
https://learn.microsoft.com/zh-tw/power-bi/guidance/star-schema
https://stackoverflow.com/questions/53712774/jenkins-date-parameter-plugin-how-to-use-it-in-a-declarative-pipeline


上一篇
第二十七天 Jenkins 之旅: 我的 BI 報表 Pipeline (7)
下一篇
第二十九天 Jenkins 之旅: 我的 BI 報表 Pipeline (9)
系列文
從零開始的 Jenkins 之旅30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言