Building Your First ML Pipeline with Kubeflow Pipelines


Building Your First ML Pipeline with Kubeflow Pipelines


Kubeflow Pipelines를 사용하여 첫 번째 머신러닝(ML) 파이프라인을 생성하고 실행하는 것은 ML 워크플로우를 크게 간소화할 수 있습니다. Kubeflow Pipelines는 복잡한 ML 워크플로우의 오케스트레이션 및 관리를 단순화하도록 설계되어, 모델 개발 및 최적화에 집중할 수 있게 합니다. 이 튜토리얼은 Kubeflow Pipelines를 사용하여 첫 번째 ML 파이프라인을 구축하는 과정을 안내합니다.


Importance of ML Pipelines with Kubeflow


머신러닝 파이프라인은 ML 모델을 개발, 훈련 및 배포하는 엔드 투 엔드 프로세스를 자동화합니다. Kubeflow Pipelines를 활용하면 반복적인 작업을 자동화하고, 재현성을 보장하며, 리소스를 효율적으로 관리할 수 있습니다. 이는 개발 주기를 가속화할 뿐만 아니라 ML 워크플로우의 신뢰성과 확장성을 향상시킵니다.


Setting Up Your Environment


첫 번째 ML 파이프라인을 생성하기 전에 다음과 같은 전제 조건을 확인하십시오:


  • Kubernetes 클러스터 (버전 1.14 이상)
  • kubectl 명령줄 도구가 설치 및 구성됨
  • Kubernetes 클러스터에 Kubeflow 설치됨
  • 로컬 머신에 Python 3.7 이상 설치됨

Kubeflow를 아직 설치하지 않았다면, 공식 Kubeflow 문서를 참고하여 Kubernetes 클러스터에 설치하십시오.


Creating a Simple ML Pipeline


데이터 전처리, 모델 훈련, 모델 평가를 포함하는 간단한 파이프라인을 생성해 보겠습니다. Kubeflow Pipelines DSL(Domain Specific Language)을 사용하여 파이프라인을 정의합니다.


Step 1: Import Required Libraries


먼저 필요한 라이브러리를 임포트합니다:


import kfp
from kfp import dsl
from kfp.components import create_component_from_func

Step 2: Define Pipeline Components


다음으로, 파이프라인의 개별 구성 요소를 정의합니다. 각 구성 요소는 데이터 전처리, 모델 훈련 또는 평가와 같은 특정 작업을 수행하는 함수입니다. create_component_from_func 데코레이터를 사용하여 각 함수를 Kubeflow 파이프라인 구성 요소로 변환합니다.


def preprocess_data():
    import pandas as pd
    from sklearn.model_selection import train_test_split
    # Load dataset
    data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
    # Split dataset into training and testing sets
    train, test = train_test_split(data, test_size=0.2)
    # Save the processed data
    train.to_csv('/tmp/train_data.csv', index=False)
    test.to_csv('/tmp/test_data.csv', index=False)

preprocess_op = create_component_from_func(
    preprocess_data, 
    base_image='python:3.7', 
    output_component_file='preprocess_component.yaml'
)

def train_model():
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    # Load training data
    train = pd.read_csv('/tmp/train_data.csv')
    X_train = train.drop(columns=['species'])
    y_train = train['species']
    # Train model
    model = LogisticRegression(max_iter=200)
    model.fit(X_train, y_train)
    # Save the model
    import joblib
    joblib.dump(model, '/tmp/model.joblib')

train_op = create_component_from_func(
    train_model, 
    base_image='python:3.7-slim', 
    output_component_file='train_component.yaml'
)

def evaluate_model():
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    import joblib
    # Load testing data
    test = pd.read_csv('/tmp/test_data.csv')
    X_test = test.drop(columns=['species'])
    y_test = test['species']
    # Load the trained model
    model = joblib.load('/tmp/model.joblib')
    # Evaluate the model
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f'Model accuracy: {accuracy}')

evaluate_op = create_component_from_func(
    evaluate_model, 
    base_image='python:3.7-slim', 
    output_component_file='evaluate_component.yaml'
)

Step 3: Define the Pipeline


@dsl.pipeline 데코레이터를 사용하여 파이프라인을 정의합니다. 이 파이프라인에서 구성 요소를 연결하여 올바른 실행 순서를 보장합니다.


@dsl.pipeline(
    name='Simple ML Pipeline',
    description='An example pipeline that performs data preprocessing, model training, and model evaluation.'
)
def simple_ml_pipeline():
    preprocess_task = preprocess_op()
    train_task = train_op().after(preprocess_task)
    evaluate_task = evaluate_op().after(train_task)

Step 4: Compile and Run the Pipeline


파이프라인을 컴파일하고 Kubeflow Pipelines에 업로드하여 실행합니다. 파이프라인 정의를 YAML 파일로 저장한 후, Kubeflow Pipelines 클라이언트를 사용하여 파이프라인을 생성하고 실행합니다.


if __name__ == '__main__':
    # Compile the pipeline
    import kfp.compiler as compiler
    compiler.Compiler().compile(simple_ml_pipeline, 'simple_ml_pipeline.yaml')
    
    # Upload and run the pipeline
    client = kfp.Client()
    experiment_name = 'First ML Pipeline Experiment'
    run_name = 'First ML Pipeline Run'
    client.create_run_from_pipeline_func(simple_ml_pipeline, arguments={}, experiment_name=experiment_name, run_name=run_name)

Monitoring and Managing Your Pipeline


파이프라인 실행 후 Kubeflow Pipelines UI를 통해 진행 상황을 모니터링합니다. UI는 파이프라인 단계, 상태, 로그 및 생성된 아티팩트에 대한 자세한 보기를 제공합니다. 이 시각화를 통해 ML 워크플로우의 흐름을 이해하고 발생하는 문제를 디버그할 수 있습니다.


Advanced Pipeline Features


Kubeflow Pipelines는 ML 워크플로우를 향상시키기 위한 고급 기능을 제공합니다:


  • Parameterization: 파이프라인에 매개변수를 전달하여 더 큰 유연성과 재사용성을 제공합니다.
  • Parallel Execution: 실행 시간을 최적화하기 위해 여러 단계를 병렬로 실행합니다.
  • Conditional Execution: 특정 조건에 따라 특정 단계를 실행하도록 정의합니다.

Example: Adding Parameters


파이프라인에 매개변수를 추가하는 방법의 예는 다음과 같습니다:


@dsl.pipeline(
    name='Parameterized ML Pipeline',
    description='A pipeline with parameters for flexibility.'
)
def parameterized_ml_pipeline(max_iter: int = 200, test_size: float = 0.2):
    preprocess_task = preprocess_op()
    train_task = train_op(max_iter=max_iter).after(preprocess_task)
    evaluate_task = evaluate_op(test_size=test_size).after(train_task)

if __name__ == '__main__':
    compiler.Compiler().compile(parameterized_ml_pipeline, 'parameterized_ml_pipeline.yaml')
    client = kfp.Client()
    experiment_name = 'Parameterized ML Pipeline Experiment'
    run_name = 'Parameterized ML Pipeline Run'
    arguments = {'max_iter': 300, 'test_size': 0.25}
    client.create_run_from_pipeline_func(parameterized_ml_pipeline, arguments=arguments, experiment_name=experiment_name, run_name=run_name)

Summary


Kubeflow Pipelines를 사용하여 첫 번째 ML 파이프라인을 구축하면 효율적이고 확장 가능한 ML 워크플로우의 기초를 마련할 수 있습니다. 재사용 가능한 구성 요소를 정의하고 체인으로 연결하여 복잡한 ML 작업을 자동화하고 관리할 수 있습니다. Kubeflow Pipelines의 유연성과 고급 기능은 워크플로우를 최적화하고 모델 개발 및 개선에 집중할 수 있게 합니다.


추가 자료 및 실용적인 예제를 위해 다음 리소스를 방문할 수 있습니다:


이 가이드를 따라 예제를 탐색하면 Kubeflow Pipelines를 마스터하고 머신러닝 워크플로우를 향상시킬 수 있습니다.

다음 이전