iT邦幫忙

2024 iThome 鐵人賽

DAY 19
0
AI/ ML & Data

從點子構想到部署上線:機器學習專案的一生系列 第 19

[Day 19] Ray - Netflix、Spotify 和 Uber 都在用的開源分散式計算框架,加速你的計算 - Part 1. 處理資料

  • 分享至 

  • xImage
  •  

我們聊了快 20 天的理論,今天來寫個 code 吧!
在讀 Netflix、Spotify、Uber 和各大科技公司的文獻時,發現他們都會不約而同地提到一個 Python 套件——Ray,用來幫助他們處理資料或是訓練模型。

這究竟是什麼神奇套件,讓大家都在用呢?今天讓我們一起來認識他吧!


TL;DR
Ray 用超級神奇簡單的方式,讓我們可以輕鬆使用分散式計算
最近剛好需要處理上百萬筆的數據,原本可能需要十幾個小時才能夠前處理完
我在完全沒有動到原本 data processing 的 code 之下,只是加了幾行,改成用 ray 呼叫
結果吃個午餐回來,兩小時就跑完了,驚為天人,嚇死我了

Ray 的簡介

Ray 是一個開源的分散式計算框架,提供平行處理的計算層,特別適用於需要大量計算資源的情境。我們不需要很熟悉如何使用分散式系統,Ray 會直接幫我們執行分散式計算。適合用於各種常見的機器學習任務,例如資料處理、分散式訓練、hyperparameters 調整等等。

Uber、Netflix 和 Spotify 都有使用 Ray 來進行分散式的模型訓練,並利用 Ray Tune 進行超參數優化。由於 Ray 支持 PyTorch 和 TensorFlow 這兩種常用的機器學習框架,簡化從 local 開發轉換成分佈式計算環境的過程,提高訓練效率,也增快訓練速度。

我們主要會用到的稱為 Ray AI Libraries,這是提供給機器學習工程師、資料科學家和研究人員等人、可擴展、專為機器學習設計的函式庫。
這些函式庫建立在 Ray Core 之上,可以利用 Ray Core 提供的分散式運算能力。Ray Core 負責處理分散式系統,例如協調、排程、容錯和自動擴展。

Ray AI Libraries 包含五個 libraries,可以用於分散式處理各種的機器學習任務:

  • Ray Data:可以在訓練、調整和預測過程中存取和轉換資料,支援在多個節點和 GPU 上進行分散式處理大型資料集,並與 TensorFlow 和 PyTorch 整合。
  • Ray Train:支援分散式訓練,並與 PyTorch、TensorFlow 和 Hugging Face Transformers 整合。可以在多個節點和 GPU 上訓練大型模型,並提供用於監控訓練進度和管理檢查點的工具。
  • Ray Tune:這是一個可擴展的 hyperparameters 調整 librarary,可以與 Ray Train 和其他訓練的 libraries 整合,可以透過同時執行多個實驗來加速 hyperparameters 優化。
  • Ray Serve:用於部署模型,支援 batch 處理以提高效能,並且可以與 Ray Train 和 Ray Tune 整合。
  • Ray RLlib:可擴展的分散式強化學習(reinforcement learning)函式庫,為各種強化學習演算法提供支援,並且可以與各種模擬環境整合。

Ray 的概念大致介紹完畢,現在來直接使用看看吧!


Ray vs. Multithread

首先,請先安裝套件:pip install -U "ray[data]"

Ray 的主要功能是提供我們輕鬆的方式進行方散式計算,但你可能會先有一個疑惑,這不是開一個 multithread 就好了嗎?為什麼要用 Ray 呢?讓我們來看以下的程式碼比較。

import time
import threading
import ray

# 定義一個簡單的計算函數
def compute(x):
    time.sleep(5)  # 模擬耗時操作
    return x * x

# multithread 版本
def multithread_example():
    results = []
    threads = []
    for i in range(4):
        thread = threading.Thread(target=lambda: results.append(compute(i)))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return results

# Ray版本
@ray.remote   # 使用者自己定義的 remote function,可以在 ray 節點上執行
def ray_compute(x):
    return compute(x)

def ray_example():
    ray.init()
    results = ray.get([ray_compute.remote(i) for i in range(4)])
    ray.shutdown()
    return results

# 執行方式
multithread_results = multithread_example()
ray_results = ray_example()

Ray 從 initialize 到結束,總共只花了三行,而 multithread 卻花了八行,顯示出 ray 的簡潔和易於使用。不過,程式碼也不是越少行一定就越好,ray 還提供很多其他方便的功能,讓我們來一一介紹使用方式吧!


Ray Data

Load Data

Ray 支援從各種來源載入資料,包含常見的檔案格式,如 parquet、圖片、txt、csv 等,也可以將 Python objects 轉成 ray 的 datasets 格式。用法如下:

以下的檔案位置都是 ray 官方的範例,是真實存在可以執行的哦!

讀常見的檔案格式

import ray

ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")
ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

把 Python Objects 轉成 ray 的 datasets

import ray

# 以下兩種形式都可以直接轉換
list_of_objects = [
    {"food": "spam", "price": 9.34},
    {"food": "ham", "price": 5.37},
]
list_of_objects = [1, 2, 3, 4, 5]

ds = ray.data.from_items(list_of_objects)

處理 numpy array

import numpy as np

array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)

處理 pandas DataFrame

import pandas as pd
import ray

df = pd.DataFrame({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)

Process Data

在讀取完資料之後,接下來可以對資料集做一些操作。

印出 dataset 的大小和 schema

print(f"Dataset size: {ds.count()} rows")
print(f"Dataset schema: {ds.schema()}")
print(ds)  # 也可以直接印出所有資訊

取某部分資料
Ray Data 會將每個 row 轉成 dictionary 的形式

rows = ds.take(1)
rows = ds.take_all() # 拿全部的 rows

將資料轉成 batch,並 iterate over batches

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

# numpy
for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
    print(batch)

# pandas
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)

直接處理資料集

import os
from typing import Any, Dict
import ray

# 這個前處理的 function 的輸入跟輸出都要是 dictionary 的格式哦!
def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)

Inference on data

from typing import Dict

import numpy as np
from transformers import pipeline

import ray

# 定義一個文字分類器
class TextClassifier:
    def __init__(self):

        self.model = pipeline("text-classification")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["text"]))
        batch["label"] = [prediction["label"] for prediction in predictions]
        return batch

# 讀文字檔案,並送入模型
ds = (
    ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
    .map_batches(TextClassifier, concurrency=2)
)

以上就是 Ray 跟 Ray Data 的基本介紹,我自己試用之後覺得這個框架非常容易上手,也很顯著地加速我在處理資料的時間,推推!
他們的官網寫得非常清楚,有各式各樣的教學和用法,非常推薦大家都去玩玩看!

明天我們來看看 Ray 的另外一個 library——Ray Train 的用法吧!


謝謝讀到最後的你,如果喜歡這系列,別忘了按下喜歡和訂閱,才不會錯過最新更新。
如果有任何問題想跟我聊聊,或是想看我分享的其他內容,也歡迎到我的 Instagram(@data.scientist.min) 逛逛!
我們明天見!


Reference


上一篇
[Day 18] Netflix 影片處理三部曲—— Part 3. 標記音訊中的音樂和語音
下一篇
[Day 20] Ray - Netflix、Spotify 和 Uber 都在用的開源分散式計算框架,加速你的計算 - Part 2. 訓練模型
系列文
從點子構想到部署上線:機器學習專案的一生30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言