Building and Deploying a Recommender System on Kubeflow with KServe

In this tutorial, we'll walk through the process of developing a basic recommender system and deploying it on a Kubernetes cluster using Kubeflow and KServe (formerly KFServing). Kubeflow provides an end-to-end platform for machine learning workflows, and KServe simplifies model serving with powerful features like autoscaling and multi-framework support.

What You'll Learn

  • Understanding the core components: Kubeflow, KServe.
  • Training a simple collaborative filtering recommender model.
  • Containerizing your model training and serving code.
  • Defining a Kubeflow Pipeline for MLOps (optional but recommended).
  • Deploying your trained model using KServe's InferenceService.
  • Sending inference requests to your deployed model.

Prerequisites

Before you begin, ensure you have the following:

  • A running Kubernetes cluster (e.g., Minikube, Kind, GKE, EKS, AKS).
  • Kubeflow installed on your Kubernetes cluster. If not, refer to the official Kubeflow installation guide. Ensure KServe is part of your installation.
  • kubectl installed and configured to connect to your cluster.
  • Docker installed for building and pushing container images.
  • Python 3.8+ and pip.

1. Understanding the Core Components

Kubeflow

Kubeflow is a powerful open-source ML platform for Kubernetes. It provides various components to manage the entire ML lifecycle, from data preparation and model training to deployment and monitoring. Key components relevant here include:

  • Kubeflow Pipelines (KFP): For defining and orchestrating end-to-end ML workflows.
  • Kubeflow Notebooks: For interactive development environments (Jupyter notebooks).

KServe (formerly KFServing)

KServe is the standard model serving platform on Kubeflow. In its default serverless mode, it is built on top of Knative Serving and Istio, offering:

  • Serverless Inference: Scales models up and down based on traffic, even to zero.
  • Multi-Framework Support: Native support for popular ML frameworks like TensorFlow, PyTorch, Scikit-learn, XGBoost, etc.
  • Advanced Deployments: Canary rollouts, A/B testing, and autoscaling.

2. Building a Simple Recommender System

For this tutorial, we'll implement a basic collaborative filtering recommender using the implicit library in Python. This model will learn user and item factors from interaction data.
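To make "user and item factors" concrete, here is a minimal sketch of how a factorization model scores items once it is trained: each score is the dot product of a user vector and an item vector, and the highest-scoring items are recommended. The factor values and the `top_n` helper are invented for illustration; the implicit library learns real factors during training and performs this ranking for you.

```python
# Toy illustration of scoring with latent factors (values are invented,
# not output from the implicit library).

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def top_n(user_vector, item_vectors, n):
    """Return the n item IDs with the highest dot-product score."""
    scores = {item: dot(user_vector, vec) for item, vec in item_vectors.items()}
    return sorted(scores, key=scores.get, reverse=True)[:n]

# Hypothetical 2-dimensional factors for one user and three items
user_vector = [1.0, 0.5]
item_vectors = {
    101: [0.9, 0.1],
    102: [0.2, 0.8],
    103: [0.5, 0.5],
}

print(top_n(user_vector, item_vectors, n=2))  # item 101 ranks first, then 103
```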

Step 2.1: Prepare Your Training Data

Create a CSV file named your_interaction_data.csv with at least three columns: user_id, item_id, and rating. For simplicity, here's a small example dataset:

user_id,item_id,rating
1,101,5
1,102,4
2,101,3
2,103,5
3,102,4
3,103,3
Note: For real-world recommenders, your dataset will be much larger and more complex.
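The training script in the next step turns these raw IDs into contiguous row and column indices before building a sparse matrix. The standard-library sketch below shows that idea on the sample rows, assuming indices are assigned in sorted order of the raw IDs (which matches how pandas category codes behave for this data). The helper name `build_index` is hypothetical.

```python
import csv
import io

SAMPLE_CSV = """user_id,item_id,rating
1,101,5
1,102,4
2,101,3
2,103,5
3,102,4
3,103,3
"""

def build_index(values):
    """Map each distinct raw ID to a contiguous index, in sorted order."""
    return {value: idx for idx, value in enumerate(sorted(set(values)))}

rows = [
    (int(r["user_id"]), int(r["item_id"]), float(r["rating"]))
    for r in csv.DictReader(io.StringIO(SAMPLE_CSV))
]
user_index = build_index(u for u, _, _ in rows)
item_index = build_index(i for _, i, _ in rows)

# Dense (users x items) matrix for readability; the tutorial uses a sparse one.
matrix = [[0.0] * len(item_index) for _ in user_index]
for user, item, rating in rows:
    matrix[user_index[user]][item_index[item]] = rating

for row in matrix:
    print(row)
```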

Step 2.2: Model Training Code

Create a Python file named recommender_model.py:


# recommender_model.py
import pandas as pd
from scipy.sparse import csr_matrix
from implicit.als import AlternatingLeastSquares
import pickle
import os

def train_recommender(data_path, output_model_dir):
    """
    Loads data, trains an ALS recommender model, and saves the model
    along with user/item mappings.
    """
    print(f"Loading data from {data_path}...")
    df = pd.read_csv(data_path)

    # Create user-item matrix
    user_ids = df['user_id'].astype("category").cat.codes
    item_ids = df['item_id'].astype("category").cat.codes
    ratings = df['rating'].values

    # Store mappings
    user_to_id = {user: uid for user, uid in zip(df['user_id'], user_ids)}
    item_to_id = {item: iid for item, iid in zip(df['item_id'], item_ids)}

    # Create sparse matrix for training
    # implicit >= 0.5 expects a (users x items) matrix in fit();
    # older releases expected (items x users), so pin the version you test with.
    user_item_matrix = csr_matrix((ratings, (user_ids, item_ids)),
                                  shape=(len(user_to_id), len(item_to_id)))

    # Initialize and train the ALS model
    print("Training ALS model...")
    model = AlternatingLeastSquares(factors=50, regularization=0.01, iterations=20, random_state=42)
    model.fit(user_item_matrix)  # (users x items), as built above

    # Ensure output directory exists
    os.makedirs(output_model_dir, exist_ok=True)

    # Save the model and mappings
    with open(os.path.join(output_model_dir, 'recommender_model.pkl'), 'wb') as f:
        pickle.dump(model, f)
    with open(os.path.join(output_model_dir, 'user_to_id.pkl'), 'wb') as f:
        pickle.dump(user_to_id, f)
    with open(os.path.join(output_model_dir, 'item_to_id.pkl'), 'wb') as f:
        pickle.dump(item_to_id, f)

    print(f"Model and mappings saved to {output_model_dir}")

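if __name__ == "__main__":
    import sys
    # Usage: python recommender_model.py [data_path] [output_model_dir]
    # In a real scenario, output_model_dir would likely map to cloud storage (e.g., S3, GCS, MinIO).
    # The defaults below are for local testing:
    data_path = sys.argv[1] if len(sys.argv) > 1 else "your_interaction_data.csv"
    output_model_dir = sys.argv[2] if len(sys.argv) > 2 else "trained_model_artifacts"
    train_recommender(data_path, output_model_dir)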

Step 2.3: Containerize Your Training Code

Create a Dockerfile to build an image for training your model:


# Dockerfile for training
FROM python:3.9-slim-buster
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY recommender_model.py .
# Copy your data for local testing/initial build
COPY your_interaction_data.csv .
ENTRYPOINT ["python", "recommender_model.py"]

And your requirements.txt for training:


pandas
scipy
implicit

Build and push this Docker image to your Docker registry (e.g., Docker Hub, Google Container Registry, etc.). Replace your-docker-registry with your actual registry path.


docker build -t your-docker-registry/recommender-trainer:latest .
docker push your-docker-registry/recommender-trainer:latest

3. Orchestrating with Kubeflow Pipelines (Optional but Recommended)

Kubeflow Pipelines help you automate your ML workflow. We'll define a simple pipeline that prepares data and trains the model. The trained model artifacts will be stored in Kubeflow's artifact store (typically MinIO).

Step 3.1: Define Your Pipeline

Create a Python file, e.g., recommender_pipeline.py:


# recommender_pipeline.py
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model

# Define a component for data preparation
@component(base_image="python:3.9-slim-buster", packages_to_install=["pandas"])
def prepare_data(data_url: str, processed_data: Output[Dataset]):
    """
    Simulates data preparation and saves the result as a Dataset artifact.
    In a real scenario, this would involve complex ETL from a data source.
    """
    import pandas as pd
    print(f"Downloading and processing data from {data_url}...")
    # Using the local CSV for demonstration
    df = pd.read_csv("your_interaction_data.csv") # Assuming it's accessible or downloaded here
    df.to_csv(processed_data.path, index=False)
    print(f"Data prepared and saved to {processed_data.path}")

# Define a component for model training using our custom Docker image
@component(base_image="your-docker-registry/recommender-trainer:latest")
def train_model(data: Input[Dataset], model_artifacts: Output[Model]):
    """
    Trains the recommender model using the prepared data.
    The model and mappings will be saved under model_artifacts.path.
    """
    # Lightweight components are serialized on their own, so imports go inside the function.
    import subprocess
    print(f"Training model with data from {data.path}")
    print(f"Output model directory: {model_artifacts.path}")
    # recommender_model.py takes the input CSV and the output directory as arguments.
    subprocess.run(
        ["python", "/app/recommender_model.py", data.path, model_artifacts.path],
        check=True,  # Fail the step if training fails
    )
    print("Model training complete.")

# Define your Kubeflow Pipeline
@dsl.pipeline(
    name='simple-recommender-pipeline',
    description='An end-to-end recommender system training pipeline.'
)
def recommender_pipeline(data_url: str = 'local_data.csv'):
    """
    Main pipeline definition.
    """
    prepare_data_task = prepare_data(data_url=data_url)
    # KFP supplies output artifacts at runtime, so only inputs are passed here.
    train_model_task = train_model(data=prepare_data_task.outputs['processed_data'])
    # The model_artifacts output of train_model_task contains the serialized model and mappings.
    # KFP automatically handles uploading these artifacts to its artifact store (e.g., MinIO).
    # You will get a URI for these artifacts which you'll use for KServe.

# To compile and run this pipeline (typically done from a separate script or CLI)
# from kfp import compiler
# compiler.Compiler().compile(recommender_pipeline, 'recommender_pipeline.yaml')
# print("Pipeline compiled to recommender_pipeline.yaml")
#
# Then, use the KFP client to submit the run:
# from kfp import Client
# client = Client()
# run = client.create_run_from_pipeline_package(
#     pipeline_file='recommender_pipeline.yaml',
#     arguments={'data_url': 'local_data.csv'}, # Pass arguments if any
#     run_name='my-recommender-run-1'
# )
# print(f"Pipeline run created: {run.run_id}")

Step 3.2: Compile and Run the Pipeline

Save the above Python code as recommender_pipeline.py. The pipeline is compiled to YAML and submitted as a run in separate steps, as outlined in the comments at the end of that file.

Option 1: Compile from the command line:


pip install kfp
python -c "from recommender_pipeline import recommender_pipeline; from kfp import compiler; compiler.Compiler().compile(recommender_pipeline, 'recommender_pipeline.yaml')"

Option 2: Run directly from the Kubeflow UI:

  1. Navigate to the Kubeflow Pipelines dashboard in your browser.
  2. Click on "Upload Pipeline" and upload the compiled recommender_pipeline.yaml file (the UI expects a compiled pipeline package, not the Python source).
  3. Click on "Create Run", select your pipeline, and give it a name.
  4. Once the run completes, inspect the outputs of the train-model step. You'll see a link or path to the stored model artifacts (e.g., minio://mlpipeline/artifacts/...). Copy this URI; you'll need it for KServe.

Important: Make sure your_interaction_data.csv is accessible within the container for the prepare_data step. In a real pipeline, you'd fetch data from persistent storage. For this local example, you might need to build the prepare_data component's image with the CSV embedded, or ensure it's mounted. For simplicity, the training Dockerfile also copies the CSV into the image, but in a real pipeline the data would flow from one step's output to the next step's input.

4. Deploying with KServe

Now that our model is trained and its artifacts are stored, we can use KServe to deploy it as a scalable inference endpoint.

Step 4.1: Create Your KServe Predictor

KServe allows you to use pre-built servers for common frameworks or provide a custom predictor. Since we used the implicit library, we'll create a custom predictor. Create a file named predictor.py:


# predictor.py
import kserve
import pickle
import pandas as pd
from scipy.sparse import csr_matrix
import os

class RecommenderModel(kserve.Model):
    """
    KServe custom model class for our implicit recommender.
    """
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.ready = False
        self.model = None
        self.user_to_id = None
        self.item_to_id = None
        self.id_to_user = None
        self.id_to_item = None

    def load(self):
        """
        Loads the trained model and mappings from local disk.
        KServe's storage initializer downloads the contents of STORAGE_URI
        to /mnt/models before this container starts.
        """
        # MODEL_URI may hold a remote URI (e.g., s3://...), which open() cannot read,
        # so fall back to the local download directory in that case.
        model_path = os.environ.get("MODEL_URI", "")
        if not model_path or "://" in model_path:
            model_path = "/mnt/models"

        print(f"Loading model artifacts from: {model_path}")
        with open(os.path.join(model_path, 'recommender_model.pkl'), 'rb') as f:
            self.model = pickle.load(f)
        with open(os.path.join(model_path, 'user_to_id.pkl'), 'rb') as f:
            self.user_to_id = pickle.load(f)
        with open(os.path.join(model_path, 'item_to_id.pkl'), 'rb') as f:
            self.item_to_id = pickle.load(f)

        # Create inverse mappings for recommendations
        self.id_to_user = {v: k for k, v in self.user_to_id.items()}
        self.id_to_item = {v: k for k, v in self.item_to_id.items()}

        self.ready = True
        print("Model loaded successfully!")

    def predict(self, payload: dict, headers: dict = None) -> dict:
        """
        Handles inference requests. Expects a 'user_id' in the payload.
        """
        # KServe expects an 'instances' key in the payload for prediction requests.
        if "instances" not in payload:
            raise ValueError("Expected 'instances' in payload.")
        if not isinstance(payload["instances"], list) or not payload["instances"]:
            raise ValueError("Expected 'instances' to be a non-empty list.")

        # Assuming a single user_id for simplicity for now
        request_data = payload["instances"][0]
        if "user_id" not in request_data:
            raise ValueError("Expected 'user_id' in instance data.")

        user_id = request_data["user_id"]

        print(f"Received prediction request for user_id: {user_id}")

        # Convert external user_id to internal model ID
        if user_id not in self.user_to_id:
            print(f"User ID {user_id} not found in mappings.")
            return {"predictions": []} # User not found or no interactions

        internal_user_id = self.user_to_id[user_id]

        # Get recommendations from the implicit ALS model (implicit >= 0.5 API).
        # `recommend` takes the internal user index, that user's row of the
        # user-item matrix, and N; it returns parallel arrays (ids, scores).
        # For simplicity, we pass an empty row and disable filtering, so the
        # model can recommend anything. In a real system, you'd provide the
        # items the user has already interacted with.
        num_items = self.model.item_factors.shape[0]
        item_ids, scores = self.model.recommend(
            internal_user_id,
            csr_matrix((1, num_items)), # Empty placeholder row for user_items
            N=request_data.get("num_recommendations", 10), # Get top N recommendations
            filter_already_liked_items=False
        )

        recommended_items = []
        for item_id_internal, score in zip(item_ids, scores):
            if item_id_internal in self.id_to_item:
                item_id = self.id_to_item[item_id_internal]
                recommended_items.append({
                    # Convert NumPy scalars to plain Python values for JSON encoding
                    "item_id": item_id.item() if hasattr(item_id, "item") else item_id,
                    "score": float(score)
                })

        return {"predictions": recommended_items}

if __name__ == "__main__":
    # Start the KServe model server
    # The name here ('recommender-model') is the model name used in the request URL
    # (/v1/models/recommender-model:predict).
    model = RecommenderModel("recommender-model")
    model.load()  # Load artifacts before serving; otherwise the model never becomes ready
    kserve.ModelServer().start([model])
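Before building the serving image, it can help to exercise the ID translation and response shaping outside KServe. The sketch below is a stand-alone approximation of that part of predict: `shape_predictions` is a hypothetical helper, and the index/score pairs are made-up stand-ins for what the model would return. It also shows one way to skip items the user has already seen, which the predictor above leaves out for simplicity.

```python
def shape_predictions(ranked, id_to_item, seen_items=()):
    """Translate (internal_index, score) pairs into the response format,
    skipping unknown indices and items the user has already seen."""
    predictions = []
    for internal_index, score in ranked:
        item_id = id_to_item.get(internal_index)
        if item_id is None or item_id in seen_items:
            continue
        predictions.append({"item_id": item_id, "score": float(score)})
    return {"predictions": predictions}

# Mapping as saved by the training script, then inverted as in load()
item_to_id = {101: 0, 102: 1, 103: 2}
id_to_item = {index: item for item, index in item_to_id.items()}

# Made-up model output: (internal item index, score), best first
ranked = [(2, 0.9), (0, 0.7), (1, 0.4), (7, 0.1)]

response = shape_predictions(ranked, id_to_item, seen_items={101})
print(response)
```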

Step 4.2: Containerize Your KServe Predictor

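Create a Dockerfile for your KServe serving image. Keep it, together with its own requirements.txt, in a separate directory from the training files so the two sets do not overwrite each other: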


# Dockerfile for serving with KServe
FROM python:3.9-slim-buster
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY predictor.py .
# Run the custom predictor; predictor.py starts the KServe model server itself.
# KServe's storage initializer places the model artifacts under /mnt/models.
CMD ["python", "predictor.py"]

And your requirements.txt for serving:


kserve
pandas
scipy
implicit

Build and push this Docker image:


docker build -t your-docker-registry/recommender-server:latest .
docker push your-docker-registry/recommender-server:latest

Step 4.3: Define KServe InferenceService YAML

Create a Kubernetes YAML file named recommender-service.yaml. This defines your KServe deployment.

Important: Replace your-docker-registry/recommender-server:latest with your pushed image. Also, replace the values of MODEL_URI and STORAGE_URI with the actual URI where your trained model artifacts are stored (obtained from your Kubeflow Pipeline run, e.g., s3://your-s3-or-minio-bucket/models/recommender/).

If you're using MinIO (common with Kubeflow), the URI might look like: s3://mlpipeline/artifacts/recommender-pipeline/<run-id>/train-model/model_artifacts/. You can find this in the Kubeflow Pipelines UI under the "Artifacts" tab of your "train-model" step after a successful run.
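The Pipelines UI may show artifact locations with a minio:// scheme, while the manifest below uses s3://. Assuming your MinIO bucket is reachable through the S3 API (as the endpoint settings in the manifest imply), a small helper such as this hypothetical `to_s3_uri` can normalise the copied URI. Verify the result against your own artifact store before relying on it.

```python
from urllib.parse import urlsplit

def to_s3_uri(artifact_uri):
    """Rewrite a minio:// artifact URI to the s3:// form; pass s3:// through."""
    parts = urlsplit(artifact_uri)
    if parts.scheme not in ("minio", "s3"):
        raise ValueError(f"Unsupported scheme: {parts.scheme!r}")
    # netloc is the bucket, path is the object prefix
    return f"s3://{parts.netloc}{parts.path}"

# Hypothetical URI; your run ID and path will differ
print(to_s3_uri("minio://mlpipeline/artifacts/my-run/train-model/model_artifacts"))
```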

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: recommender-service
  namespace: kubeflow-user-example-com # Or your Kubeflow user namespace
spec:
  predictor:
    # serviceAccountName: default-editor # Use this if your service account needs specific permissions
    container:
      image: your-docker-registry/recommender-server:latest # Your KServe serving image
      env:
        - name: MODEL_URI
          value: "s3://mlpipeline/artifacts/recommender-pipeline/<your-run-id>/train-model/model_artifacts/" # <-- UPDATE THIS WITH YOUR MODEL URI
        - name: STORAGE_URI # KServe's storage initializer downloads this location to /mnt/models
          value: "s3://mlpipeline/artifacts/recommender-pipeline/<your-run-id>/train-model/model_artifacts/" # <-- UPDATE THIS
        - name: S3_ENDPOINT_URL # For MinIO, specify the internal MinIO service endpoint
          value: "http://minio-service.kubeflow:9000" # Common MinIO endpoint in Kubeflow
        - name: S3_ACCESS_KEY_ID # Your MinIO access key (or env var if configured)
          valueFrom:
            secretKeyRef:
              name: mlpipeline-minio-artifact # Default MinIO secret name
              key: accesskey
        - name: S3_SECRET_ACCESS_KEY # Your MinIO secret key
          valueFrom:
            secretKeyRef:
              name: mlpipeline-minio-artifact # Default MinIO secret name
              key: secretkey
      ports:
        - containerPort: 8080 # Default KServe port
          name: http1 # REST over HTTP/1.1; use h2c only for gRPC
      resources:
        requests:
          cpu: 200m
          memory: 512Mi
        limits:
          cpu: 1
          memory: 1Gi

Step 4.4: Deploy the Inference Service

Apply the YAML file to your Kubernetes cluster:


kubectl apply -f recommender-service.yaml

Monitor the status of your InferenceService. It might take a few minutes for the pods to spin up and become ready.


kubectl get inferenceservice recommender-service -n kubeflow-user-example-com

Look for the URL in the output. This is the endpoint where your model will serve predictions.


NAME                  URL                                      READY   PREDICTOR_READY   EXPLAINER_READY   RAW_READY   AGE
recommender-service   http://recommender-service.kubeflow...   True    True              False             False       2m

5. Sending Inference Requests

Once your KServe InferenceService is ready, you can send prediction requests to it.

Step 5.1: Get the Ingress Gateway Address

KServe typically routes traffic through Istio's ingress gateway. You need to get its IP address or hostname:


# Get the external IP of the Istio Ingress Gateway
# This might be an IP or a hostname depending on your cloud provider/setup
export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')

# Get the hostname for your KServe service
export SERVICE_HOSTNAME=$(kubectl get inferenceservice recommender-service -n kubeflow-user-example-com -o jsonpath='{.status.url}' | cut -d "/" -f 3)

echo "Ingress Host: ${INGRESS_HOST}"
echo "Ingress Port: ${INGRESS_PORT}"
echo "Service Hostname: ${SERVICE_HOSTNAME}"

Step 5.2: Make a Prediction Request

Use curl to send a JSON request to your model endpoint. We'll ask for recommendations for user_id: 1.


curl -v \
  -H "Host: ${SERVICE_HOSTNAME}" \
  -H "Content-Type: application/json" \
  "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/recommender-model:predict" \
  -d '{ "instances": [ { "user_id": 1 } ] }'

You should receive a JSON response similar to this, containing recommendations for the specified user ID:


{
  "predictions": [
    {
      "item_id": 103,
      "score": 0.56789
    },
    {
      "item_id": 102,
      "score": 0.45678
    },
    ...
  ]
}
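
If you prefer Python over curl, the same request can be assembled with the standard library. This is a sketch under the same assumptions as the shell commands above: the host, port, and service URL are placeholders, `build_predict_request` is a hypothetical helper, and the model name in the path is the one passed to RecommenderModel in predictor.py.

```python
import json
import urllib.request
from urllib.parse import urlsplit

def build_predict_request(ingress_host, ingress_port, service_url, user_id,
                          model_name="recommender-model"):
    """Build (but do not send) a prediction request for the ingress gateway."""
    # Same effect as `cut -d "/" -f 3` in the shell snippet: keep only the host part
    service_hostname = urlsplit(service_url).netloc
    body = json.dumps({"instances": [{"user_id": user_id}]}).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{ingress_host}:{ingress_port}/v1/models/{model_name}:predict",
        data=body,
        headers={"Host": service_hostname, "Content-Type": "application/json"},
        method="POST",
    )

# Placeholder values; substitute the ones printed by the commands in Step 5.1
request = build_predict_request(
    "203.0.113.10", 80,
    "http://recommender-service.kubeflow-user-example-com.example.com", user_id=1,
)
print(request.full_url)

# To actually send it once the service is ready:
# with urllib.request.urlopen(request, timeout=10) as resp:
#     print(json.loads(resp.read()))
```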

Troubleshooting Tips

  • KServe not Ready: Check kubectl describe inferenceservice recommender-service -n kubeflow-user-example-com for events and conditions.
  • Pod Errors: Check pod logs: kubectl logs <your-kserve-pod-name> -n kubeflow-user-example-com.
  • Incorrect model URI: Ensure MODEL_URI and STORAGE_URI in the InferenceService YAML point to the exact location of your model artifacts (the directory containing recommender_model.pkl, user_to_id.pkl, item_to_id.pkl).
  • MinIO Permissions: Double-check that your KServe deployment has the necessary permissions to read from your MinIO bucket. The provided YAML uses the default `mlpipeline-minio-artifact` secret, which usually has the correct access.
  • Networking Issues: If you cannot reach the Istio Ingress Gateway, check your Kubernetes cluster's network configuration and firewall rules.
  • Payload Format: Ensure your inference request payload matches the format expected by your predictor.py (i.e., {"instances": [{"user_id": X}]}).

Conclusion

Congratulations! You've successfully built a simple recommender system, containerized its training and serving components, optionally orchestrated it with Kubeflow Pipelines, and deployed it on Kubeflow using KServe. This setup provides a powerful and scalable foundation for your ML inference workloads.

From here, you can explore more advanced topics:

  • Implementing more sophisticated recommender algorithms (e.g., deep learning models).
  • Integrating with external data sources for real-time recommendations.
  • Setting up continuous integration/continuous deployment (CI/CD) for your ML models.
  • Exploring KServe's advanced features like canary rollouts, A/B testing, and explainability.
  • Monitoring your model's performance and drift.

Happy recommending!
