Building and Deploying a Recommender System on Kubeflow with KServe
In this tutorial, we'll walk through the process of developing a basic recommender system and deploying it on a Kubernetes cluster using Kubeflow and KServe (formerly KFServing). Kubeflow provides an end-to-end platform for machine learning workflows, and KServe simplifies model serving with powerful features like autoscaling and multi-framework support.
What You'll Learn
- Understanding the core components: Kubeflow, KServe.
- Training a simple collaborative filtering recommender model.
- Containerizing your model training and serving code.
- Defining a Kubeflow Pipeline for MLOps (optional but recommended).
- Deploying your trained model using KServe's `InferenceService`.
- Sending inference requests to your deployed model.
Prerequisites
Before you begin, ensure you have the following:
- A running Kubernetes cluster (e.g., Minikube, Kind, GKE, EKS, AKS).
- Kubeflow installed on your Kubernetes cluster. If not, refer to the official Kubeflow installation guide. Ensure KServe is part of your installation.
- `kubectl` installed and configured to connect to your cluster.
- Docker installed for building and pushing container images.
- Python 3.8+ and `pip`.
1. Understanding the Core Components
Kubeflow
Kubeflow is a powerful open-source ML platform for Kubernetes. It provides various components to manage the entire ML lifecycle, from data preparation and model training to deployment and monitoring. Key components relevant here include:
- Kubeflow Pipelines (KFP): For defining and orchestrating end-to-end ML workflows.
- Kubeflow Notebooks: For interactive development environments (Jupyter notebooks).
KServe (formerly KFServing)
KServe is the standard model serving platform on Kubeflow. It's built on top of Knative Serving and Istio, offering:
- Serverless Inference: Scales models up and down based on traffic, even to zero.
- Multi-Framework Support: Native support for popular ML frameworks like TensorFlow, PyTorch, Scikit-learn, XGBoost, etc.
- Advanced Deployments: Canary rollouts, A/B testing, and autoscaling.
2. Building a Simple Recommender System
For this tutorial, we'll implement a basic collaborative filtering recommender using the `implicit` library in Python. This model will learn user and item factors from interaction data.
Step 2.1: Prepare Your Training Data
Create a CSV file named `your_interaction_data.csv` with at least three columns: `user_id`, `item_id`, and `rating`. For simplicity, here's a small example dataset:
user_id,item_id,rating
1,101,5
1,102,4
2,101,3
2,103,5
3,102,4
3,103,3
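If you'd rather generate this sample file programmatically, here's a minimal sketch with pandas (the values mirror the table above):

# make_sample_data.py -- optional helper to create the sample dataset above
import pandas as pd

interactions = pd.DataFrame({
    "user_id": [1, 1, 2, 2, 3, 3],
    "item_id": [101, 102, 101, 103, 102, 103],
    "rating": [5, 4, 3, 5, 4, 3],
})
interactions.to_csv("your_interaction_data.csv", index=False)
print(interactions)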
Step 2.2: Model Training Code
Create a Python file named `recommender_model.py`:
# recommender_model.py
import os
import pickle
import sys

import pandas as pd
from scipy.sparse import csr_matrix
from implicit.als import AlternatingLeastSquares


def train_recommender(data_path, output_model_dir):
    """
    Loads data, trains an ALS recommender model, and saves the model
    along with user/item mappings.
    """
    print(f"Loading data from {data_path}...")
    df = pd.read_csv(data_path)

    # Map external user/item IDs to contiguous internal indices
    user_ids = df['user_id'].astype("category").cat.codes
    item_ids = df['item_id'].astype("category").cat.codes
    ratings = df['rating'].values

    # Store mappings from external IDs to internal matrix indices
    user_to_id = {user: uid for user, uid in zip(df['user_id'], user_ids)}
    item_to_id = {item: iid for item, iid in zip(df['item_id'], item_ids)}

    # Create a sparse (users x items) matrix for training.
    # Recent versions of implicit (>= 0.5) expect a user-items matrix in fit().
    user_item_matrix = csr_matrix((ratings, (user_ids, item_ids)),
                                  shape=(len(user_to_id), len(item_to_id)))

    # Initialize and train the ALS model
    print("Training ALS model...")
    model = AlternatingLeastSquares(factors=50, regularization=0.01, iterations=20, random_state=42)
    model.fit(user_item_matrix)

    # Ensure output directory exists
    os.makedirs(output_model_dir, exist_ok=True)

    # Save the model and mappings
    with open(os.path.join(output_model_dir, 'recommender_model.pkl'), 'wb') as f:
        pickle.dump(model, f)
    with open(os.path.join(output_model_dir, 'user_to_id.pkl'), 'wb') as f:
        pickle.dump(user_to_id, f)
    with open(os.path.join(output_model_dir, 'item_to_id.pkl'), 'wb') as f:
        pickle.dump(item_to_id, f)
    print(f"Model and mappings saved to {output_model_dir}")


if __name__ == "__main__":
    # In a real scenario, the data path and output directory would come from
    # command-line arguments or environment variables, and the output would
    # likely be a cloud storage path (e.g., S3, GCS, MinIO).
    # Accept optional CLI arguments so the pipeline step in section 3 can call
    # `python recommender_model.py <data_path> <output_dir>`; fall back to
    # local defaults for quick testing.
    data_path = sys.argv[1] if len(sys.argv) > 1 else "your_interaction_data.csv"
    output_dir = sys.argv[2] if len(sys.argv) > 2 else "trained_model_artifacts"
    train_recommender(data_path, output_dir)
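After running the script locally, you can sanity-check the saved artifacts before containerizing anything. A quick sketch, assuming `implicit` >= 0.5 (whose `recommend()` returns parallel arrays of item indices and scores):

# smoke_test.py -- quick local check of the artifacts written by recommender_model.py
import os
import pickle

from scipy.sparse import csr_matrix

artifact_dir = "trained_model_artifacts"

with open(os.path.join(artifact_dir, "recommender_model.pkl"), "rb") as f:
    model = pickle.load(f)
with open(os.path.join(artifact_dir, "user_to_id.pkl"), "rb") as f:
    user_to_id = pickle.load(f)
with open(os.path.join(artifact_dir, "item_to_id.pkl"), "rb") as f:
    item_to_id = pickle.load(f)

id_to_item = {v: k for k, v in item_to_id.items()}

# Recommend for external user 1, pretending we know none of their past items.
internal_user = user_to_id[1]
empty_history = csr_matrix((1, len(item_to_id)))
ids, scores = model.recommend(internal_user, empty_history, N=3)
for iid, score in zip(ids, scores):
    print(id_to_item[int(iid)], float(score))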
Step 2.3: Containerize Your Training Code
Create a `Dockerfile` to build an image for training your model:
# Dockerfile for training
FROM python:3.9-slim-buster

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY recommender_model.py .
# Copy your data into the image for local testing/initial builds only;
# a real pipeline would fetch data from external storage instead.
COPY your_interaction_data.csv .

ENTRYPOINT ["python", "recommender_model.py"]
And your `requirements.txt` for training:
pandas
scipy
implicit
Build and push this Docker image to your Docker registry (e.g., Docker Hub, Google Container Registry, etc.). Replace `your-docker-registry` with your actual registry path.
docker build -t your-docker-registry/recommender-trainer:latest .
docker push your-docker-registry/recommender-trainer:latest
3. Orchestrating with Kubeflow Pipelines (Optional but Recommended)
Kubeflow Pipelines help you automate your ML workflow. We'll define a simple pipeline that prepares data and trains the model. The trained model artifacts will be stored in Kubeflow's artifact store (typically MinIO).
Step 3.1: Define Your Pipeline
Create a Python file, e.g., `recommender_pipeline.py`:
# recommender_pipeline.py
from kfp import dsl
from kfp.dsl import component, OutputPath, InputPath


# Define a component for data preparation
@component(base_image="python:3.9-slim-buster", packages_to_install=["pandas"])
def prepare_data(data_url: str, processed_data_path: OutputPath(str)):
    """
    Simulates data preparation and saves it to an output path.
    In a real scenario, this would involve complex ETL from a data source.
    """
    import pandas as pd

    print(f"Downloading and processing data from {data_url}...")
    # Using the local CSV for demonstration; see the note below about making
    # this file available to the component (or fetching it from data_url).
    df = pd.read_csv("your_interaction_data.csv")
    df.to_csv(processed_data_path, index=False)
    print(f"Data prepared and saved to {processed_data_path}")


# Define a component for model training using our custom Docker image
@component(base_image="your-docker-registry/recommender-trainer:latest")
def train_model(
    data_path: InputPath(str),
    model_dir_path: OutputPath(str),  # KFP provides a directory path for artifacts
):
    """
    Trains the recommender model using the prepared data.
    The model and mappings will be saved to model_dir_path.
    """
    import subprocess

    print(f"Training model with data from {data_path}")
    print(f"Output model directory: {model_dir_path}")
    # KFP mounts input/output paths as local files/directories inside the
    # container. recommender_model.py accepts the data path and output
    # directory as command-line arguments, so we can invoke it directly.
    subprocess.run(
        ["python", "/app/recommender_model.py", data_path, model_dir_path],
        check=True,
    )
    print("Model training complete.")


# Define your Kubeflow Pipeline
@dsl.pipeline(
    name='simple-recommender-pipeline',
    description='An end-to-end recommender system training pipeline.'
)
def recommender_pipeline(data_url: str = 'local_data.csv'):
    """
    Main pipeline definition.
    """
    prepare_data_task = prepare_data(data_url=data_url)
    # OutputPath parameters (model_dir_path) are supplied by KFP at run time,
    # so we only wire up the input here.
    train_model_task = train_model(
        data_path=prepare_data_task.outputs['processed_data_path'],
    )
    # The model_dir_path output of train_model_task will contain the serialized
    # model and mappings. KFP automatically uploads these artifacts to its
    # artifact store (e.g., MinIO); the run UI shows the resulting URI, which
    # you'll use for KServe.


# To compile and run this pipeline (typically done from a separate script or CLI):
# from kfp import compiler
# compiler.Compiler().compile(recommender_pipeline, 'recommender_pipeline.yaml')
# print("Pipeline compiled to recommender_pipeline.yaml")
#
# Then, use the KFP client to submit the run:
# from kfp import Client
# client = Client()
# run = client.create_run_from_pipeline_package(
#     pipeline_file='recommender_pipeline.yaml',
#     arguments={'data_url': 'local_data.csv'},  # Pass arguments if any
#     run_name='my-recommender-run-1'
# )
# print(f"Pipeline run created: {run.run_id}")
Step 3.2: Compile and Run the Pipeline
You compile the pipeline first and then submit it as a run, either from the command line or from the Kubeflow Pipelines UI. Save the above Python code as `recommender_pipeline.py`.
Option 1: Compile from the command line:
pip install kfp
python -c "from recommender_pipeline import recommender_pipeline; from kfp import compiler; compiler.Compiler().compile(recommender_pipeline, 'recommender_pipeline.yaml')"
Option 2: Run from the Kubeflow UI:
- Navigate to the Kubeflow Pipelines dashboard in your browser.
- Click "Upload Pipeline" and upload the compiled `recommender_pipeline.yaml` file.
- Click "Create Run", select your pipeline, and give it a name.
- Once the run completes, inspect the outputs of the `train-model` step. You'll see a link or path to the stored model artifacts (e.g., `minio://mlpipeline/artifacts/...`). Copy this URI; you'll need it for KServe.
Note: Make sure `your_interaction_data.csv` is accessible inside the container for the `prepare_data` step. In a real pipeline you would fetch the data from persistent storage; for this local example you could bake the CSV into the `prepare_data` component's image or mount it into the pod. For simplicity, the trainer image also copies the data, but in a real pipeline the data would flow from one step's output to the next step's input, as sketched below.
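For example, a `prepare_data` variant that actually pulls the CSV from the `data_url` parameter (rather than a file baked into the image) could look like the following sketch; it assumes `data_url` points to a readable CSV with the `user_id`, `item_id`, and `rating` columns:

# Alternative prepare_data component that fetches the CSV from data_url (sketch).
from kfp.dsl import component, OutputPath

@component(base_image="python:3.9-slim-buster", packages_to_install=["pandas"])
def prepare_data_from_url(data_url: str, processed_data_path: OutputPath(str)):
    import pandas as pd

    # pandas can read a CSV directly from an http(s) URL.
    df = pd.read_csv(data_url)

    # Keep only the columns the trainer expects and drop incomplete rows.
    df = df[["user_id", "item_id", "rating"]].dropna()
    df.to_csv(processed_data_path, index=False)
    print(f"Fetched {len(df)} interactions from {data_url}")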
4. Deploying with KServe
Now that our model is trained and its artifacts are stored, we can use KServe to deploy it as a scalable inference endpoint.
Step 4.1: Create Your KServe Predictor
KServe allows you to use pre-built servers for common frameworks or provide a custom predictor. Since we used the `implicit` library, we'll create a custom predictor. Create a file named `predictor.py`:
# predictor.py
import os
import pickle

import kserve
from scipy.sparse import csr_matrix


class RecommenderModel(kserve.Model):
    """
    KServe custom model class for our implicit recommender.
    """
    def __init__(self, name: str):
        super().__init__(name)
        self.ready = False
        self.model = None
        self.user_to_id = None
        self.item_to_id = None
        self.id_to_user = None
        self.id_to_item = None

    def load(self):
        """
        Loads the trained model and mappings.
        When STORAGE_URI is set on the InferenceService container, KServe's
        storage initializer downloads the artifacts to /mnt/models.
        """
        model_path = os.environ.get("MODEL_URI", "/mnt/models")
        print(f"Loading model artifacts from: {model_path}")

        with open(os.path.join(model_path, 'recommender_model.pkl'), 'rb') as f:
            self.model = pickle.load(f)
        with open(os.path.join(model_path, 'user_to_id.pkl'), 'rb') as f:
            self.user_to_id = pickle.load(f)
        with open(os.path.join(model_path, 'item_to_id.pkl'), 'rb') as f:
            self.item_to_id = pickle.load(f)

        # Create inverse mappings for recommendations
        self.id_to_user = {v: k for k, v in self.user_to_id.items()}
        self.id_to_item = {v: k for k, v in self.item_to_id.items()}

        self.ready = True
        print("Model loaded successfully!")

    def predict(self, payload: dict, headers: dict = None) -> dict:
        """
        Handles inference requests. Expects a 'user_id' in each instance.
        """
        # The KServe v1 protocol wraps prediction inputs in an 'instances' list.
        if "instances" not in payload:
            raise ValueError("Expected 'instances' in payload.")
        if not isinstance(payload["instances"], list) or not payload["instances"]:
            raise ValueError("Expected 'instances' to be a non-empty list.")

        # Assuming a single user_id for simplicity for now
        request_data = payload["instances"][0]
        if "user_id" not in request_data:
            raise ValueError("Expected 'user_id' in instance data.")

        user_id = request_data["user_id"]
        print(f"Received prediction request for user_id: {user_id}")

        # Convert external user_id to internal model ID
        if user_id not in self.user_to_id:
            print(f"User ID {user_id} not found in mappings.")
            return {"predictions": []}  # User not found or no interactions

        internal_user_id = self.user_to_id[user_id]

        # Get recommendations from the ALS model. `recommend` expects the
        # internal user id, a sparse row of the user's known items, and N.
        # For simplicity we pass an empty row (no known interactions); a real
        # system would pass the items the user has already interacted with so
        # they can be filtered out. With implicit >= 0.5, `recommend` returns
        # parallel arrays of item indices and scores.
        user_items = csr_matrix((1, len(self.item_to_id)))
        ids, scores = self.model.recommend(
            internal_user_id,
            user_items,
            N=request_data.get("num_recommendations", 10),
        )

        recommended_items = []
        for item_id_internal, score in zip(ids, scores):
            item_id_internal = int(item_id_internal)  # cast numpy ints for dict lookup / JSON
            if item_id_internal in self.id_to_item:
                recommended_items.append({
                    "item_id": int(self.id_to_item[item_id_internal]),
                    "score": float(score),
                })
        return {"predictions": recommended_items}


if __name__ == "__main__":
    # Start the KServe model server. The model name here ('recommender-model')
    # determines the request path: /v1/models/recommender-model:predict.
    model = RecommenderModel("recommender-model")
    model.load()
    kserve.ModelServer().start([model])
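Before building the serving image, you can exercise the predictor class locally against the artifacts from step 2. A sketch, assuming the serving requirements are installed locally and the artifacts live in `./trained_model_artifacts` (it calls the class directly and skips the HTTP server):

# local_predictor_test.py -- exercise RecommenderModel without starting the server
import os

# Point the predictor at the locally trained artifacts instead of /mnt/models.
os.environ["MODEL_URI"] = "trained_model_artifacts"

from predictor import RecommenderModel

model = RecommenderModel("recommender-model")
model.load()

response = model.predict({"instances": [{"user_id": 1, "num_recommendations": 3}]})
print(response)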
Step 4.2: Containerize Your KServe Predictor
Create a `Dockerfile` for your KServe serving image:
# Dockerfile for serving with KServe
FROM python:3.9-slim-buster

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY predictor.py .

# predictor.py starts the KServe ModelServer itself, so simply run it.
# The model artifacts are expected under /mnt/models (or wherever MODEL_URI points).
CMD ["python", "predictor.py"]
And your `requirements.txt` for serving:
kserve
pandas
scipy
implicit
Build and push this Docker image:
docker build -t your-docker-registry/recommender-server:latest .
docker push your-docker-registry/recommender-server:latest
Step 4.3: Define the KServe InferenceService YAML
Create a Kubernetes YAML file named `recommender-service.yaml`. This defines your KServe deployment.
Replace `your-docker-registry/recommender-server:latest` with your pushed image. Also, **replace the value of `STORAGE_URI`** with the actual URI where your trained model artifacts are stored (obtained from your Kubeflow Pipeline run, e.g., `s3://your-s3-or-minio-bucket/models/recommender/`). KServe's storage initializer downloads the contents of this URI to `/mnt/models` inside the predictor container, which is where `predictor.py` loads the artifacts from.
If you're using MinIO (common with Kubeflow), the URI might look like `s3://mlpipeline/artifacts/recommender-pipeline/<run-id>/train-model/model_artifacts/`. You can find this in the Kubeflow Pipelines UI under the "Artifacts" tab of your "train-model" step after a successful run. Depending on your installation, you may also need to attach the MinIO credentials to the predictor's service account so the storage initializer can pull from MinIO (see the `serviceAccountName` comment in the YAML).
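If you want to double-check the artifact URI before wiring it into the YAML, here's a small sketch using `boto3` against MinIO. It assumes you've port-forwarded the MinIO service (`kubectl -n kubeflow port-forward svc/minio-service 9000:9000`) and that your installation uses the default `mlpipeline-minio-artifact` credentials (often `minio`/`minio123`); adjust for your setup:

# verify_artifacts.py -- list the trained-model artifacts in MinIO (sketch)
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",   # port-forwarded MinIO endpoint
    aws_access_key_id="minio",              # assumption: default Kubeflow MinIO credentials
    aws_secret_access_key="minio123",
)

bucket = "mlpipeline"
prefix = "artifacts/recommender-pipeline/"  # narrow this to your run id

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get("Contents", []):
    print(obj["Key"])

With the URI confirmed, the InferenceService definition looks like this: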
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: recommender-service
  namespace: kubeflow-user-example-com  # Or your Kubeflow user namespace
spec:
  predictor:
    # serviceAccountName: default-editor  # Use this if your service account needs specific permissions
    containers:
      - name: kserve-container
        image: your-docker-registry/recommender-server:latest  # Your KServe serving image
        env:
          - name: STORAGE_URI  # KServe's storage initializer downloads this URI to /mnt/models
            value: "s3://mlpipeline/artifacts/recommender-pipeline/<your-run-id>/train-model/model_artifacts/"  # <-- UPDATE THIS WITH YOUR MODEL URI
          - name: MODEL_URI  # Local path predictor.py loads from (populated by the storage initializer)
            value: "/mnt/models"
          - name: S3_ENDPOINT_URL  # For MinIO, specify the internal MinIO service endpoint
            value: "http://minio-service.kubeflow:9000"  # Common MinIO endpoint in Kubeflow
          - name: S3_ACCESS_KEY_ID  # Your MinIO access key
            valueFrom:
              secretKeyRef:
                name: mlpipeline-minio-artifact  # Default MinIO secret name
                key: accesskey
          - name: S3_SECRET_ACCESS_KEY  # Your MinIO secret key
            valueFrom:
              secretKeyRef:
                name: mlpipeline-minio-artifact  # Default MinIO secret name
                key: secretkey
        ports:
          - containerPort: 8080  # Default port for the KServe Python model server
        resources:
          requests:
            cpu: 200m
            memory: 512Mi
          limits:
            cpu: 1
            memory: 1Gi
Step 4.4: Deploy the Inference Service
Apply the YAML file to your Kubernetes cluster:
kubectl apply -f recommender-service.yaml
Monitor the status of your InferenceService. It might take a few minutes for the pods to spin up and become ready.
kubectl get inferenceservice recommender-service -n kubeflow-user-example-com
Look for the `URL` in the output. This is the endpoint where your model will serve predictions.
NAME                  URL                                              READY   PREDICTOR_READY   EXPLAINER_READY   RAW_READY   AGE
recommender-service   http://recommender-service.kubeflow...          True    True              False             False       2m
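If you'd rather check readiness programmatically, here's a sketch using the official `kubernetes` Python client (assumes `pip install kubernetes` and a working kubeconfig):

# check_isvc.py -- poll the InferenceService until it reports Ready (sketch)
import time

from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() inside a pod

api = client.CustomObjectsApi()
namespace = "kubeflow-user-example-com"
name = "recommender-service"

while True:
    isvc = api.get_namespaced_custom_object(
        group="serving.kserve.io",
        version="v1beta1",
        namespace=namespace,
        plural="inferenceservices",
        name=name,
    )
    status = isvc.get("status", {})
    conditions = {c["type"]: c["status"] for c in status.get("conditions", [])}
    print("Ready:", conditions.get("Ready"), "URL:", status.get("url"))
    if conditions.get("Ready") == "True":
        break
    time.sleep(10)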
5. Sending Inference Requests
Once your KServe `InferenceService` is ready, you can send prediction requests to it.
Step 5.1: Get the Ingress Gateway Address
KServe typically routes traffic through Istio's ingress gateway. You need to get its IP address or hostname:
# Get the external IP of the Istio Ingress Gateway
# This might be an IP or a hostname depending on your cloud provider/setup
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
# Get the hostname for your KServe service
export SERVICE_HOSTNAME=$(kubectl get inferenceservice recommender-service -n kubeflow-user-example-com -o jsonpath='{.status.url}' | cut -d "/" -f 3)
echo "Ingress Host: ${INGRESS_HOST}"
echo "Ingress Port: ${INGRESS_PORT}"
echo "Service Hostname: ${SERVICE_HOSTNAME}"
Step 5.2: Make a Prediction Request
Use `curl` to send a JSON request to your model endpoint. We'll ask for recommendations for `user_id: 1`. Note that the model name in the URL path (`recommender-model`) matches the name registered in `predictor.py`, while the `Host` header routes the request to the `recommender-service` InferenceService.
curl -v \
  -H "Host: ${SERVICE_HOSTNAME}" \
  -H "Content-Type: application/json" \
  "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/recommender-model:predict" \
  -d '{"instances": [{"user_id": 1}]}'
You should receive a JSON response similar to this, containing recommendations for the specified user ID:
{
  "predictions": [
    {
      "item_id": 103,
      "score": 0.56789
    },
    {
      "item_id": 101,
      "score": 0.45678
    }
  ]
}
Troubleshooting Tips
- KServe not Ready: Check `kubectl describe inferenceservice recommender-service -n kubeflow-user-example-com` for events and conditions.
- Pod Errors: Check the pod logs: `kubectl logs <your-kserve-pod-name> -n kubeflow-user-example-com` (a small helper for doing this programmatically is sketched after this list).
- Incorrect storage URI: Ensure the `STORAGE_URI` in the `InferenceService` YAML points to the exact location of your model artifacts (the directory containing `recommender_model.pkl`, `user_to_id.pkl`, and `item_to_id.pkl`), and that `MODEL_URI` matches where they land inside the container (`/mnt/models`).
- MinIO Permissions: Double-check that your KServe deployment has the necessary permissions to read from your MinIO bucket. The provided YAML uses the default `mlpipeline-minio-artifact` secret, which usually has the correct access.
- Networking Issues: If you cannot reach the Istio Ingress Gateway, check your Kubernetes cluster's network configuration and firewall rules.
- Payload Format: Ensure your inference request payload matches the format expected by `predictor.py` (i.e., `{"instances": [{"user_id": X}]}`).
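And the helper mentioned above for pulling pod status and logs programmatically; a sketch using the `kubernetes` Python client, assuming KServe applies its default `serving.kserve.io/inferenceservice=<name>` pod label and the container is named `kserve-container` (as in the YAML above):

# debug_isvc.py -- print pod phases and recent logs for the InferenceService (sketch)
from kubernetes import client, config

config.load_kube_config()
core = client.CoreV1Api()

namespace = "kubeflow-user-example-com"
selector = "serving.kserve.io/inferenceservice=recommender-service"  # assumed default KServe label

pods = core.list_namespaced_pod(namespace, label_selector=selector)
for pod in pods.items:
    print(f"\n=== {pod.metadata.name} ({pod.status.phase}) ===")
    try:
        logs = core.read_namespaced_pod_log(
            pod.metadata.name,
            namespace,
            container="kserve-container",
            tail_lines=50,
        )
        print(logs)
    except client.exceptions.ApiException as exc:
        print(f"Could not fetch logs: {exc.reason}")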
Conclusion
Congratulations! You've successfully built a simple recommender system, containerized its training and serving components, optionally orchestrated it with Kubeflow Pipelines, and deployed it on Kubeflow using KServe. This setup provides a powerful and scalable foundation for your ML inference workloads.
From here, you can explore more advanced topics:
- Implementing more sophisticated recommender algorithms (e.g., deep learning models).
- Integrating with external data sources for real-time recommendations.
- Setting up continuous integration/continuous deployment (CI/CD) for your ML models.
- Exploring KServe's advanced features like canary rollouts, A/B testing, and explainability.
- Monitoring your model's performance and drift.
Happy recommending!