This setup mostly fits a development environment; in future articles we will discuss production deployment.

Experiments in Jupyter

We start with Jupyter first, but Jupyter does not scale well once we move on to model deployment.

Enabling Jupyter

I do my work in VS Code, and it integrates well with uv.
We can enable Jupyter the following way: uv add --dev ipykernel
For more info: Using uv with Jupyter | Using Jupyter from VS Code | uv
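
If you want to launch Jupyter itself rather than rely on the VS Code integration, the uv docs also cover running it ad hoc, pulling jupyter in on the fly:

uv run --with jupyter jupyter lab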


Here is a Jupyter notebook showcasing a linear regression model.

In [196]:
!python -V
Out[196]:
Python 3.12.1

Install packages.

With uv + VS Code, there are two options (I went with the first):

  1. Add them to the project. CLI: uv add pandas, or Jupyter: !uv add pandas
  2. Install them, bypassing pyproject.toml. Jupyter: !uv pip install pandas

More in README.md

In [197]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pickle
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import root_mean_squared_error  # mean_squared_error(squared=False) in older sklearn
In [ ]:
df = pd.read_parquet("../data/green_tripdata_2021-01.parquet")
In [199]:
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
In [200]:
# get travel duration time, as a timedelta
df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
# for each element in duration (td), convert the timedelta to minutes
df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)
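
As an aside, the same conversion can be written without apply via pandas' vectorized .dt accessor; a minimal equivalent sketch:

# vectorized version of the minutes conversion above
df["duration"] = (df.lpep_dropoff_datetime - df.lpep_pickup_datetime).dt.total_seconds() / 60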
In [201]:
# https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
# df = df[df.trip_type == 2]  # not required
In [202]:
# Slight deviation from the lecture: we should do the filtering before plotting in this case
df = df[((df.duration >= 1) & (df.duration <= 60))]
In [203]:
# distplot is deprecated; the general alternative is displot.
 
# Kernel density estimation (KDE) is enabled (it smooths the graph).
# Density: how likely values are to occur within a certain range (regions of data).
# Density is normalized, so you can compare distributions, unlike frequencies or counts.
 
# the .set part is unnecessary, but sometimes helps readability
sns.displot(df.duration, kde=True, stat="density")  # .set(xlim=(0, 100), ylim=(0, 0.06))
Out[203]:
[Plot: trip duration density histogram with KDE]
In [204]:
# Checking percentile to see below which values most of our rides are
df.duration.describe(percentiles=[0.95, 0.98, 0.99])
Out[204]:
count    73908.000000
mean        16.852578
std         11.563163
min          1.000000
50%         14.000000
95%         41.000000
98%         48.781000
99%         53.000000
max         60.000000
Name: duration, dtype: float64
In [ ]:
categorical = ["PULocationID", "DOLocationID"]
numerical = ["trip_distance"]
In [206]:
df[categorical] = df[categorical].astype(str)
In [ ]:
train_dict = df[categorical + numerical].to_dict(orient="records")

Notes:

One-hot encoding

A technique to convert categorical variables (like strings or IDs) into a format that can be fed to machine learning algorithms.

How it works:

  • For each unique value in a categorical column, a new column is created.
  • In each row, the column corresponding to the value is set to 1, and all others are set to 0.

Example

Before:

Color
Red
Blue
Green

After one-hot:

Color=Red   Color=Green   Color=Blue
1           0             0
0           0             1
0           1             0

So, categorical data is represented numerically, ergo usable for most machine learning models.

Matrix

In this case each value of DOLocationID and PULocationID becomes a "column" holding 0 or 1, and each row is a ride; trip_distance remains unchanged.

After DictVectorizer, the matrix will look like this:

PULocationID=10   PULocationID=15   DOLocationID=20   DOLocationID=30   trip_distance
1                 0                 1                 0                 2.5
0                 1                 0                 1                 1.2
1                 0                 0                 1                 3.8

The final matrix has as many columns as there are unique categorical values (from both columns), plus one column for each numerical feature.
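
A small sketch of the same idea using DictVectorizer directly, with toy rows matching the table above:

from sklearn.feature_extraction import DictVectorizer

# toy records: string values get one-hot encoded, numeric values pass through
rows = [
    {"PULocationID": "10", "DOLocationID": "20", "trip_distance": 2.5},
    {"PULocationID": "15", "DOLocationID": "30", "trip_distance": 1.2},
    {"PULocationID": "10", "DOLocationID": "30", "trip_distance": 3.8},
]

dv = DictVectorizer()
X = dv.fit_transform(rows)          # sparse matrix of shape (3, 5)
print(dv.get_feature_names_out())   # the generated column names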

In [208]:
dv = DictVectorizer()
X_train = dv.fit_transform(train_dict)
In [209]:
# convert the column into a numpy array,
# making it the target for learning
target = "duration"
y_train = df[target].values
In [210]:
lr = LinearRegression()
lr.fit(X_train, y_train)  # makes it learn
y_pred = lr.predict(
    X_train
)  # we can predict on "any" data we give; y is not passed because the model learned the mapping during .fit
In [211]:
# distplot is deprecated; for overlapping plots I found there are
# two alternatives: histplot and kdeplot. kdeplot looks more informative
 
sns.histplot(
    y_pred, kde=True, stat="density", label="prediction", color="C0", alpha=0.5
)
sns.histplot(y_train, kde=True, stat="density", label="actual", color="C1", alpha=0.5)
plt.legend()  # render legend labels
plt.show()
[Plot: predicted vs. actual duration distributions overlaid]
In [212]:
# root_mean_squared_error is the same as mean_squared_error(squared=False)
 
# measure how good the prediction is by comparing actual vs. predicted via (y_true - y_pred) ** 2
root_mean_squared_error(y_train, y_pred)
# The model is bad, though: predictions are off by ~9 minutes on average
Out[212]:
9.838799799829626
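
For reference, the same metric by hand: RMSE is just the square root of the mean squared error (numpy is not imported above, hence the extra import):

import numpy as np

# RMSE = sqrt(mean((y_true - y_pred) ** 2))
np.sqrt(np.mean((y_train - y_pred) ** 2))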
In [213]:
# refactor into a reusable function
def read_dataframe(filename):
    if filename.endswith(".csv"):
        df = pd.read_csv(filename)
 
        df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
        df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
    elif filename.endswith(".parquet"):
        df = pd.read_parquet(filename)
 
    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)
 
    df = df[(df.duration >= 1) & (df.duration <= 60)]
 
    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)
 
    return df
In [ ]:
df_train = read_dataframe("../data/green_tripdata_2021-01.parquet")
df_val = read_dataframe("../data/green_tripdata_2021-02.parquet")
In [215]:
# This way the model treats the combined PU/DO pair as a unique identifier.
# I guess it helps the model learn patterns specific to each PU/DO combination.
df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
df_val["PU_DO"] = df_val["PULocationID"] + "_" + df_val["DOLocationID"]

training: January
validation: February

In [216]:
categorical = ["PU_DO"]  # combined 'PULocationID', 'DOLocationID'
numerical = ["trip_distance"]
 
dv = DictVectorizer()
 
train_dicts = df_train[categorical + numerical].to_dict(orient="records")
X_train = dv.fit_transform(train_dicts)
 
val_dicts = df_val[categorical + numerical].to_dict(orient="records")
X_val = dv.transform(val_dicts)  # transform only (no fit): reuse columns learned from the training data
In [217]:
target = "duration"
y_train = df_train[target].values
y_val = df_val[target].values
In [218]:
lr = LinearRegression()
lr.fit(X_train, y_train)
 
y_pred = lr.predict(X_val)
 
root_mean_squared_error(y_val, y_pred)
Out[218]:
7.758715209092169
In [219]:
# exporting the model
with open("../models/lin_reg.bin", "wb") as f_out:
    pickle.dump((dv, lr), f_out)
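
Loading the pair back is symmetric; a quick sketch:

with open("../models/lin_reg.bin", "rb") as f_in:
    dv, lr = pickle.load(f_in)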
In [220]:
lr = Lasso(alpha=0.0001)  # alpha controls the regularization strength
lr.fit(X_train, y_train)
 
y_pred = lr.predict(X_val)
 
root_mean_squared_error(y_val, y_pred)
Out[220]:
7.616617770546549
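
Ridge (L2 regularization) is imported above as well and can be swapped in the same way; a sketch reusing the same, arbitrarily chosen alpha:

lr = Ridge(alpha=0.0001)  # L2 penalty instead of Lasso's L1
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

root_mean_squared_error(y_val, y_pred)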

A Brief Intro to MLflow

Use MLflow for experiment tracking, logging, and statistical insights; it is also supported within Databricks. You can install it in a Python project via uv add mlflow

To start the MLflow tracking server: mlflow ui --backend-store-uri sqlite:///mlflow.db. MLflow also supports remote artifact stores, such as AWS S3: Artifact Stores | MLflow


Within your experiment, MLflow provides a way to log parameters, models, and artifacts with the mlflow.log* methods (like mlflow.log_params(params_dict)); to automatically log a variety of data, use MLflow autolog. But keep in mind that autolog won't register a model for you.
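
A minimal sketch of both styles, assuming the local tracking server from above and a hypothetical params dict:

import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("nyc-taxi-experiment")

with mlflow.start_run():
    mlflow.log_params({"alpha": 0.0001})          # explicit logging
    mlflow.log_metric("rmse", 7.76)
    mlflow.log_artifact("../models/lin_reg.bin")  # any file as an artifact

# alternatively, call before training to log supported frameworks automatically
# (remember: autolog does not register the model)
mlflow.autolog()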

You can load a saved model with:

import mlflow.pyfunc
model = mlflow.pyfunc.load_model(model_uri="models:/<model_name>/<version_or_alias>")
result = model.predict(data)

More options here: Load a Registered Model | MLflow

You can control MLflow from within Python with:

from mlflow.tracking import MlflowClient
client = MlflowClient()

client.search_runs does not return runs[0].outputs (even though in Jupyter I can see it for some reason), so the following may not work:

experiment_id = client.get_experiment_by_name("random-forest-hyperopt").experiment_id
 
runs = client.search_runs(experiment_ids=experiment_id, order_by=["metrics.rmse ASC"])
 
model_id = runs[0].outputs.model_outputs

Instead, fetch the run directly:

experiment_id = client.get_experiment_by_name("random-forest-hyperopt").experiment_id
 
runs = client.search_runs(experiment_ids=experiment_id, order_by=["metrics.rmse ASC"])
 
run_id = runs[0].info.run_id
 
model_id = client.get_run(run_id).outputs.model_outputs[0].model_id

Permanently delete an experiment:

mlflow gc --backend-store-uri sqlite:///mlflow.db --experiment-ids 4 --tracking-uri http://127.0.0.1:5000
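
Note that mlflow gc only purges experiments already in the deleted lifecycle stage, so soft-delete first, e.g. from Python:

client.delete_experiment(experiment_id)  # marks it deleted; mlflow gc then removes it permanently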

Kubernetes Deployment

Kubernetes comes in handy when implementing MLOps.
First we need an image, so let's create a Dockerfile. This way we get more granular control over what goes into it, instead of relying on ready-made developer images.

FROM python:3.12-slim-bookworm
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
 
RUN apt-get -y update && \
    apt-get -y install python3-dev build-essential curl pkg-config
 
RUN uv init
 
RUN uv add mlflow boto3 psycopg2-binary
 
CMD ["bash"]

Build it: docker build --tag mlflow:0.0.1 . -f 03-orchestration/mlflow.dockerfile

Attention

If your K8s is set up via kind, you can load the extended image into K8s like so:

kind load docker-image name:version

We will assume we are deploying it alongside a pre-existing PostgreSQL database within the same namespace:

mlflow-deployment.yaml
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mlflow-artifacts-pvc
  namespace: mlflow
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-deployment
  namespace: mlflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow
  template:
    metadata:
      labels:
        app: mlflow
    spec:
      containers:
      - name: mlflow
        image: mlflow:0.0.1
        ports:
        - containerPort: 5000
        env:
          # - name: MLFLOW_S3_IGNORE_TLS
          #   value: "true"
          - name: POSTGRES_USER
            valueFrom:
              secretKeyRef:
                name: postgres-secret
                key: postgres-user
          - name: POSTGRES_PASSWORD
            valueFrom:
              secretKeyRef:
                name: postgres-secret
                key: postgres-password
          - name: POSTGRES_DB
            valueFrom:
              secretKeyRef:
                name: postgres-secret
                key: postgres-db
        command: ["uv", "run", "mlflow", "server", "--host", "0.0.0.0", "--port", "5000", "--backend-store-uri", "postgresql://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres-service:5432/$(POSTGRES_DB)"]
        volumeMounts:
          - name: mlflow-artifacts
            mountPath: /mlflow/artifacts
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
      volumes:
        - name: mlflow-artifacts
          persistentVolumeClaim:
            claimName: mlflow-artifacts-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: mlflow
  namespace: mlflow
spec:
  selector:
    app: mlflow
  ports:
  - port: 5000
    targetPort: 5000
  type: ClusterIP
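
To roll this out and reach the UI locally (assuming the manifest file name above and that postgres-secret already exists in the namespace):

kubectl apply -f mlflow-deployment.yaml
kubectl -n mlflow port-forward svc/mlflow 5000:5000  # UI at http://localhost:5000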

Orchestration via Apache Airflow

Orchestrator not Executor

It is recommended to avoid training models, transforming data, and generally working with large datasets in Apache Airflow; utilize Apache Spark or alternatives instead.

If we work with large datasets, they may overload XCom and crash the DAG. For me, 1 GB in-memory was large enough to consider alternatives.

Deploy via Helm

Airflow allows us to extend its image to add more Python packages or additional configuration. Let's do both.

Suppose we need mlflow and a couple of other libraries that we have in our project:

pyproject.toml
# ...
airflow-server = [
    "mlflow>=3.1.1",
    "xgboost>=3.0.2",
    "pandas>=2.3.0",
    "hyperopt>=0.2.7",
    "scikit-learn>=1.7.0",
]
# ...

Let's extend the image with uv pip.

airflow.dockerfile
FROM apache/airflow:3.0.2
 
# Another way is to define image as argument variable
# ARG AIRFLOW_IMAGE_NAME 
# FROM ${AIRFLOW_IMAGE_NAME}
# and then pass the arg docker build ... --build-arg AIRFLOW_IMAGE_NAME=$AIRFLOW_IMAGE_NAME 
 
COPY pyproject.toml ./
RUN uv pip install --no-cache --group airflow-server

Now we can build the image from dockerfile.

docker build --pull --tag my-airflow:0.0.1 . -f airflow.dockerfile

Attention

If your K8s is set up via kind, you can load the extended image into K8s like so:

kind load docker-image name:version

To provide additional configuration, since we deploy through Helm in K8s, we extend the chart. We can learn more from the docs:

airflow-values.yaml
dags:
  gitSync:
    enabled: true
    repo: repo_url.git
    branch: main
    subPath: folder/sub_folder
    period: 5s
    #sshKeySecret:

Configuration is ready for deployment, this will create airflow deployment and the boilerplate:

helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace \
    --set images.airflow.repository=my-airflow \
    --set images.airflow.tag=0.0.1 \
    -f airflow-values.yaml


Communication: Airflow to MLflow

We provide the tracking server URL like we normally would and can fully communicate with MLflow.

taxi_predicton
import logging

import mlflow
import mlflow.sklearn
from airflow.sdk import task  # Airflow 3 authoring interface (airflow.decorators in Airflow 2)

@task
def train_model(x_train, y_train, x_val, y_val, dv):
    log = logging.getLogger("airflow.task")
    log.info("Starting model training.")
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow.set_experiment("nyc-taxi-experiment")
    # ...
    mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="model",
        registered_model_name="nyc-taxi-yellow-prediction",
    )
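
Since Airflow and MLflow live in the same cluster, the tracking URI can point at the mlflow Service through in-cluster DNS (names taken from the manifest above):

MLFLOW_TRACKING_URI = "http://mlflow.mlflow.svc.cluster.local:5000"  # <service>.<namespace>.svc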

[Screenshot: DAG workflow overview]
[Screenshot: task logs example]

There is a concern: given the isolated nature of tasks, we rely on XCom to pass data between them. For a higher number of experiments or larger data, consider using the Apache Spark or Databricks operators.

In the Apache Airflow article you can find more info on deployment and configuration.