iT邦幫忙

2023 iThome 鐵人賽

DAY 29
0
AI & Data

MLOps/LLMOps - 從零開始系列 第 29

Day29 - Ray on Databricks

  • 分享至 

  • xImage
  •  

從 Ray 2.3.0 之後,開始支援 Apache Spark Cluster,也讓原本使用 Spark 進行分散式機器學習訓練的工作,可以透過 Ray 來取代 Spark。

Requirements

  • Databricks Runtime 12.0 ML and above.
  • Databricks Runtime cluster access mode must be either “assigned” mode or “no isolation shared” mode.

建議的 Ray worker node 設定

  • Minimum 4 CPU cores per Ray worker node.
  • Minimum 10GB heap memory for each Ray worker node.

在 Databricks 上使用 Ray

%pip install ray[default]>=2.3.0

Create a Ray cluster

ray.util.spark.setup_ray_cluster 這個 API 將會在 Spark 上建立 Ray cluster。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

Run a Ray application

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Load data from Spark DataFrame

透過 Spark DataFrame 來讀取資料,並將資料轉換成 Ray dataset。

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Shutdown Ray cluster

  • ray.utils.spark.shutdown_ray_cluster API.
  • detach notebook from Databricks cluster
  • Databricks job completes
  • Databricks cluster restarted or terminated
  • 超過 cluster 的 idle time (如果有指定的話)

Reference:


上一篇
Day28 - MLOps deployment patterns
下一篇
Day30 - Summary: 寫在 "MLOps/LLMOps - 從零開始" 完賽後的心得
系列文
MLOps/LLMOps - 從零開始30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言