Building Your First ML Pipeline with Kubeflow Pipelines
Kubeflow Pipelines를 사용하여 첫 번째 머신러닝(ML) 파이프라인을 생성하고 실행하는 것은 ML 워크플로우를 크게 간소화할 수 있습니다. Kubeflow Pipelines는 복잡한 ML 워크플로우의 오케스트레이션 및 관리를 단순화하도록 설계되어, 모델 개발 및 최적화에 집중할 수 있게 합니다. 이 튜토리얼은 Kubeflow Pipelines를 사용하여 첫 번째 ML 파이프라인을 구축하는 과정을 안내합니다.
Importance of ML Pipelines with Kubeflow
머신러닝 파이프라인은 ML 모델을 개발, 훈련 및 배포하는 엔드 투 엔드 프로세스를 자동화합니다. Kubeflow Pipelines를 활용하면 반복적인 작업을 자동화하고, 재현성을 보장하며, 리소스를 효율적으로 관리할 수 있습니다. 이는 개발 주기를 가속화할 뿐만 아니라 ML 워크플로우의 신뢰성과 확장성을 향상시킵니다.
Setting Up Your Environment
첫 번째 ML 파이프라인을 생성하기 전에 다음과 같은 전제 조건을 확인하십시오:
- Kubernetes 클러스터 (버전 1.14 이상)
kubectl
명령줄 도구가 설치 및 구성됨- Kubernetes 클러스터에 Kubeflow 설치됨
- 로컬 머신에 Python 3.7 이상 설치됨
Kubeflow를 아직 설치하지 않았다면, 공식 Kubeflow 문서를 참고하여 Kubernetes 클러스터에 설치하십시오.
Creating a Simple ML Pipeline
데이터 전처리, 모델 훈련, 모델 평가를 포함하는 간단한 파이프라인을 생성해 보겠습니다. Kubeflow Pipelines DSL(Domain Specific Language)을 사용하여 파이프라인을 정의합니다.
Step 1: Import Required Libraries
먼저 필요한 라이브러리를 임포트합니다:
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
Step 2: Define Pipeline Components
다음으로, 파이프라인의 개별 구성 요소를 정의합니다. 각 구성 요소는 데이터 전처리, 모델 훈련 또는 평가와 같은 특정 작업을 수행하는 함수입니다. create_component_from_func
데코레이터를 사용하여 각 함수를 Kubeflow 파이프라인 구성 요소로 변환합니다.
def preprocess_data():
import pandas as pd
from sklearn.model_selection import train_test_split
# Load dataset
data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
# Split dataset into training and testing sets
train, test = train_test_split(data, test_size=0.2)
# Save the processed data
train.to_csv('/tmp/train_data.csv', index=False)
test.to_csv('/tmp/test_data.csv', index=False)
preprocess_op = create_component_from_func(
preprocess_data,
base_image='python:3.7',
output_component_file='preprocess_component.yaml'
)
def train_model():
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
# Load training data
train = pd.read_csv('/tmp/train_data.csv')
X_train = train.drop(columns=['species'])
y_train = train['species']
# Train model
model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)
# Save the model
import joblib
joblib.dump(model, '/tmp/model.joblib')
train_op = create_component_from_func(
train_model,
base_image='python:3.7-slim',
output_component_file='train_component.yaml'
)
def evaluate_model():
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
# Load testing data
test = pd.read_csv('/tmp/test_data.csv')
X_test = test.drop(columns=['species'])
y_test = test['species']
# Load the trained model
model = joblib.load('/tmp/model.joblib')
# Evaluate the model
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f'Model accuracy: {accuracy}')
evaluate_op = create_component_from_func(
evaluate_model,
base_image='python:3.7-slim',
output_component_file='evaluate_component.yaml'
)
Step 3: Define the Pipeline
@dsl.pipeline
데코레이터를 사용하여 파이프라인을 정의합니다. 이 파이프라인에서 구성 요소를 연결하여 올바른 실행 순서를 보장합니다.
@dsl.pipeline(
name='Simple ML Pipeline',
description='An example pipeline that performs data preprocessing, model training, and model evaluation.'
)
def simple_ml_pipeline():
preprocess_task = preprocess_op()
train_task = train_op().after(preprocess_task)
evaluate_task = evaluate_op().after(train_task)
Step 4: Compile and Run the Pipeline
파이프라인을 컴파일하고 Kubeflow Pipelines에 업로드하여 실행합니다. 파이프라인 정의를 YAML 파일로 저장한 후, Kubeflow Pipelines 클라이언트를 사용하여 파이프라인을 생성하고 실행합니다.
if __name__ == '__main__':
# Compile the pipeline
import kfp.compiler as compiler
compiler.Compiler().compile(simple_ml_pipeline, 'simple_ml_pipeline.yaml')
# Upload and run the pipeline
client = kfp.Client()
experiment_name = 'First ML Pipeline Experiment'
run_name = 'First ML Pipeline Run'
client.create_run_from_pipeline_func(simple_ml_pipeline, arguments={}, experiment_name=experiment_name, run_name=run_name)
Monitoring and Managing Your Pipeline
파이프라인 실행 후 Kubeflow Pipelines UI를 통해 진행 상황을 모니터링합니다. UI는 파이프라인 단계, 상태, 로그 및 생성된 아티팩트에 대한 자세한 보기를 제공합니다. 이 시각화를 통해 ML 워크플로우의 흐름을 이해하고 발생하는 문제를 디버그할 수 있습니다.
Advanced Pipeline Features
Kubeflow Pipelines는 ML 워크플로우를 향상시키기 위한 고급 기능을 제공합니다:
- Parameterization: 파이프라인에 매개변수를 전달하여 더 큰 유연성과 재사용성을 제공합니다.
- Parallel Execution: 실행 시간을 최적화하기 위해 여러 단계를 병렬로 실행합니다.
- Conditional Execution: 특정 조건에 따라 특정 단계를 실행하도록 정의합니다.
Example: Adding Parameters
파이프라인에 매개변수를 추가하는 방법의 예는 다음과 같습니다:
@dsl.pipeline(
name='Parameterized ML Pipeline',
description='A pipeline with parameters for flexibility.'
)
def parameterized_ml_pipeline(max_iter: int = 200, test_size: float = 0.2):
preprocess_task = preprocess_op()
train_task = train_op(max_iter=max_iter).after(preprocess_task)
evaluate_task = evaluate_op(test_size=test_size).after(train_task)
if __name__ == '__main__':
compiler.Compiler().compile(parameterized_ml_pipeline, 'parameterized_ml_pipeline.yaml')
client = kfp.Client()
experiment_name = 'Parameterized ML Pipeline Experiment'
run_name = 'Parameterized ML Pipeline Run'
arguments = {'max_iter': 300, 'test_size': 0.25}
client.create_run_from_pipeline_func(parameterized_ml_pipeline, arguments=arguments, experiment_name=experiment_name, run_name=run_name)
Summary
Kubeflow Pipelines를 사용하여 첫 번째 ML 파이프라인을 구축하면 효율적이고 확장 가능한 ML 워크플로우의 기초를 마련할 수 있습니다. 재사용 가능한 구성 요소를 정의하고 체인으로 연결하여 복잡한 ML 작업을 자동화하고 관리할 수 있습니다. Kubeflow Pipelines의 유연성과 고급 기능은 워크플로우를 최적화하고 모델 개발 및 개선에 집중할 수 있게 합니다.
추가 자료 및 실용적인 예제를 위해 다음 리소스를 방문할 수 있습니다:
- Kubeflow Documentation
- Running Distributed TensorFlow Jobs on Kubeflow
- Automating Hyperparameter Tuning with Katib
이 가이드를 따라 예제를 탐색하면 Kubeflow Pipelines를 마스터하고 머신러닝 워크플로우를 향상시킬 수 있습니다.