In this tutorial, we walk through an end-to-end implementation of a machine learning pipeline using ZenML. We begin by setting up the environment and initializing a ZenML project, then define a custom materializer that handles serialization and metadata extraction for a domain-specific dataset object. Next, we build a modular pipeline that performs data loading, preprocessing, and a fan-out hyperparameter search across multiple models. We evaluate each candidate, log metadata at every step, and use a fan-in step to select and promote the best-performing model. Throughout, we rely on ZenML’s Model Control Plane, artifact tracking, and caching to keep the workflow reproducible, transparent, and efficient.

import os, sys, subprocess, json, shutil
from pathlib import Path

def _sh(cmd, check=True):
    # Echo the command shell-style, then run it.
    print(f"$ {' '.join(cmd)}")
    return subprocess.run(cmd, check=check)

_sh([sys.executable, "-m", "pip", "install", "-q",
     "zenml[server]", "scikit-learn", "pandas", "pyarrow"])

# Use /content when it exists (e.g. on Colab), otherwise the current directory.
PROJECT = (Path("/content/zenml_advanced_tutorial") if Path("/content").exists()
           else Path.cwd() / "zenml_advanced_tutorial")
if PROJECT.exists():
    shutil.rmtree(PROJECT)  # start from a clean workspace
PROJECT.mkdir(parents=True)
os.chdir(PROJECT)

os.environ["ZENML_ANALYTICS_OPT_IN"] = "false"
os.environ["ZENML_LOGGING_VERBOSITY"] = "WARN"
# check=False: do not abort if `zenml init` exits with a non-zero status.
_sh(["zenml", "init"], check=False)

We set up the environment by installing the required libraries and initializing a ZenML project workspace. We create a clean working directory, deleting any copy left over from a previous run, and set environment variables that turn off analytics and reduce logging to warnings. Finally, we bootstrap the ZenML repository so that all subsequent pipeline operations are tracked and managed.
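The clean-workspace logic described above can be tried in isolation. The following is a minimal sketch using only the standard library; the helper name fresh_workspace is hypothetical and not part of the tutorial or of ZenML, and the demo runs inside a temporary directory so nothing real is deleted.

```python
import shutil
import tempfile
from pathlib import Path


def fresh_workspace(base: Path, name: str = "zenml_advanced_tutorial") -> Path:
    """Return an empty directory base/name, deleting any previous copy.

    Hypothetical helper for illustration; the tutorial inlines this logic.
    """
    project = base / name
    if project.exists():
        shutil.rmtree(project)  # drop leftovers from earlier runs
    project.mkdir(parents=True)
    return project


# Demonstrate inside a throwaway temp directory.
with tempfile.TemporaryDirectory() as tmp:
    first = fresh_workspace(Path(tmp))
    (first / "stale.txt").write_text("left over from a previous run")

    second = fresh_workspace(Path(tmp))  # same path, recreated empty
    same_path = first == second
    leftover_files = list(second.iterdir())

print(same_path, leftover_files)
```

Because the directory is removed recursively, this pattern is only appropriate for a path that the tutorial itself owns.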

from typing import Annotated, Tuple, Dict, List, Any
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
from zenml import pipeline, step, log_metadata, Model, get_step_context
from zenml.client import Client
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.enums import ArtifactType
from zenml.io import fileio

class DatasetBundle:
    """Domain object: features, labels, feature names, and free-form stats."""
    def __init__(self, X, y, feature_names, stats=None):
        self.X = np.asarray(X)
        self.y = np.asarray(y)
        self.feature_names = list(feature_names)
        self.stats = stats or {}

class DatasetBundleMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = (DatasetBundle,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type):
        # Read the arrays and the JSON sidecar back from the artifact URI.
        with fileio.open(os.path.join(self.uri, "X.npy"), "rb") as f:
            X = np.load(f)
        with fileio.open(os.path.join(self.uri, "y.npy"), "rb") as f:
            y = np.load(f)
        with fileio.open(os.path.join(self.uri, "meta.json"), "r") as f:
            meta = json.loads(f.read())
        return DatasetBundle(X, y, meta["feature_names"], meta["stats"])

    def save(self, bundle):
        # Arrays go to .npy files; names and stats go to a JSON sidecar.
        with fileio.open(os.path.join(self.uri, "X.npy"), "wb") as f:
            np.save(f, bundle.X)
        with fileio.open(os.path.join(self.uri, "y.npy"), "wb") as f:
            np.save(f, bundle.y)
        with fileio.open(os.path.join(self.uri, "meta.json"), "w") as f:
            f.write(json.dumps({
                "feature_names": bundle.feature_names,
                "stats": bundle.stats,
            }))

    def extract_metadata(self, bundle):
        # Metadata attached to the artifact when it is saved.
        classes, counts = np.unique(bundle.y, return_counts=True)
        return {
            "n_samples": int(bundle.X.shape[0]),
            "n_features": int(bundle.X.shape[1]),
            "class_distribution": {str(c): int(n) for c, n in zip(classes, counts)},
        }

We import the necessary libraries and define a custom data container, DatasetBundle, along with its materializer. The materializer saves the feature matrix and labels as .npy files and the feature names and stats as a JSON file, reloads them on demand, and extracts metadata (sample count, feature count, and class distribution) from the dataset. As a result, ZenML can store our dataset as an artifact together with queryable metadata.
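To see the save, load, and metadata-extraction logic without running ZenML, the round trip can be reproduced with NumPy and the standard library. This is a simplified sketch under stated assumptions: the functions save_bundle, load_bundle, and describe are hypothetical stand-ins for the materializer methods, a local temporary directory replaces the artifact URI, and the toy arrays are made up.

```python
import json
import tempfile
from pathlib import Path

import numpy as np


def save_bundle(uri: Path, X, y, feature_names, stats) -> None:
    """Write arrays as .npy files and the rest as a JSON sidecar."""
    np.save(uri / "X.npy", X)
    np.save(uri / "y.npy", y)
    meta = {"feature_names": feature_names, "stats": stats}
    (uri / "meta.json").write_text(json.dumps(meta))


def load_bundle(uri: Path):
    """Read back exactly what save_bundle wrote."""
    X = np.load(uri / "X.npy")
    y = np.load(uri / "y.npy")
    meta = json.loads((uri / "meta.json").read_text())
    return X, y, meta["feature_names"], meta["stats"]


def describe(X, y) -> dict:
    """Same summary the tutorial's extract_metadata computes."""
    classes, counts = np.unique(y, return_counts=True)
    return {
        "n_samples": int(X.shape[0]),
        "n_features": int(X.shape[1]),
        "class_distribution": {str(c): int(n) for c, n in zip(classes, counts)},
    }


# Toy data (made up for illustration): 4 samples, 3 features, 2 classes.
X = np.arange(12, dtype=float).reshape(4, 3)
y = np.array([0, 1, 1, 0])

with tempfile.TemporaryDirectory() as tmp:
    save_bundle(Path(tmp), X, y, ["f0", "f1", "f2"], {"source": "toy"})
    X2, y2, names2, stats2 = load_bundle(Path(tmp))

metadata = describe(X2, y2)
print(metadata)
```

The values are cast to plain int and str because NumPy scalar types are not JSON-serializable by default, which is the same reason the materializer performs those casts.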

@step(enable_cache=True)
def load_data() -> Annotated[DatasetBundle, "raw_dataset"]:
    data = load_breast_cancer()
    return DatasetBundle(
        data.data, data.target, data.feature_names,
        stats={"source": "sklearn.datasets.load_breast_cancer"},
    )

@step
def split_and_scale(
    bundle: DatasetBundle,
    test_size: float = 0.2,
    random_state: int = 42,
) -> Tuple[
    Annotated[np.ndarray, "X_train"],
    Annotated[np.ndarray, "X_test"],
    Annotated[np.ndarray, "y_train"],
    Annotated[np.ndarray, "y_test"],
]:
    X_tr, X_te, y_tr, y_te = train_test_split(
        bundle.X, bundle.y, test_size=test_size,
        random_state=random_state, stratify=bundle.y,
    )
    # Fit the scaler on the training split only, then apply it to both splits.
    scaler = StandardScaler().fit(X_tr)
    X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
    log_metadata(metadata={"train_size": len(X_tr), "test_size": len(X_te)})
    return X_tr, X_te, y_tr, y_te

@step
def train_candidate(
    X_train: np.ndarray,
    y_train: np.ndarray,
    model_type: str = "random_forest",
    n_estimators: int = 100,
    max_depth: int = 5,
) -> Annotated[Any, "candidate_model"]:
    if model_type == "random_forest":
        m = RandomForestClassifier(n_estimators=n_estimators,
                                   max_depth=max_depth, random_state=42)
    elif model_type == "gradient_boosting":
        m = GradientBoostingClassifier(n_estimators=n_estimators,
                                       max_depth=max_depth, random_state=42)
    else:
        # Any other model_type falls back to logistic regression,
        # which ignores n_estimators and max_depth.
        m = LogisticRegression(max_iter=2000, random_state=42)
    m.fit(X_train, y_train)
    log_metadata(metadata={
        "model_type": model_type,
        "hyperparameters": {"n_estimators": n_estimators, "max_depth": max_depth},
    })
    return m

We define the core pipeline steps for loading data, splitting it, scaling features, and training model candidates. Data loading is explicitly cached, and we log key metadata during preprocessing and training. The scaler is fit on the training split only and then applied to both splits, so no test-set statistics leak into preprocessing. These steps form the backbone of our pipeline, where each model is trained independently with its own configuration.
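The scaling logic in split_and_scale can be checked by hand. The sketch below is a NumPy-only stand-in for StandardScaler, not the scikit-learn implementation itself, and the random data is made up for illustration. It computes the mean and standard deviation on the training rows and reuses them for the test rows.

```python
import numpy as np

# Made-up data: 100 samples, 4 features, deliberately not zero-centered.
rng = np.random.default_rng(42)
X = rng.normal(loc=10.0, scale=3.0, size=(100, 4))
X_train, X_test = X[:80], X[80:]

# Statistics come from the training split only.
mean = X_train.mean(axis=0)
std = X_train.std(axis=0)  # population std (ddof=0), as StandardScaler uses

# The same transformation is applied to both splits.
X_train_scaled = (X_train - mean) / std
X_test_scaled = (X_test - mean) / std

print(X_train_scaled.mean(axis=0).round(6))  # approximately zero per feature
print(X_train_scaled.std(axis=0).round(6))   # approximately one per feature
```

The scaled test split will generally not have exactly zero mean or unit variance. That is expected, because it is transformed with statistics it did not contribute to.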

@step
def evaluate_candidate(
    model: Any,
    X_test: np.ndarray,
    y_test: np.ndarray,
    label: str,
) -> Annotated[Dict[str, Any], "metrics"]:
    preds = model.predict(X_test)
    # Use positive-class probabilities for ROC AUC when the model provides them.
    probs = (model.predict_proba(X_test)[:, 1]
             if hasattr(model, "predict_proba") else preds)
    metrics: Dict[str, Any] = {
        "accuracy": float(accuracy_score(y_test, preds)),
        "f1": float(f1_score(y_test, preds)),
        "roc_auc": float(roc_auc_score(y_test, probs)),
        "label": label,
    }
    log_metadata(metadata=metrics)
    return metrics

@step
def select_best(
    metrics_list: List[Dict[str, Any]],
    models: List[Any],
) -> Annotated[Any, "production_model"]:
    # Fan-in: pick the index of the candidate with the highest ROC AUC.
    best_idx = max(range(len(metrics_list)),
                   key=lambda i: metrics_list[i]["roc_auc"])
    best = metrics_list[best_idx]
    ctx = get_step_context()
    try:
        ctx.model.log_metadata({"chosen_candidate": best,
                                "candidate_index": best_idx})
    except Exception as e:
        # Best effort: a failure here should not fail the step.
        print(f"  (model metadata log skipped: {e})")
    log_metadata(metadata={
        "winning_metrics": {k: v for k, v in best.items() if k != "label"},
    })
    print(f"\n  Best candidate: {best['label']} → "
          f"ROC AUC = {best['roc_auc']:.4f}\n")
    return models[best_idx]

We evaluate each trained model using accuracy, F1, and ROC AUC, and log the results. We then implement a selection step that picks the candidate with the highest ROC AUC. We also try to attach the winning candidate's metrics and index to the model version, which makes the choice traceable later; if that call fails, the step prints a note and continues.
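The selection rule inside select_best is an argmax over a list of metric dictionaries. Here is a minimal standalone sketch; the helper pick_best is hypothetical, and the candidate labels and scores are invented for illustration, not results from the tutorial.

```python
from typing import Any, Dict, List


def pick_best(metrics_list: List[Dict[str, Any]], key: str = "roc_auc") -> int:
    """Return the index of the entry with the highest value for `key`."""
    return max(range(len(metrics_list)), key=lambda i: metrics_list[i][key])


# Invented scores; model_b and model_c tie on ROC AUC on purpose.
candidates = [
    {"label": "model_a", "accuracy": 0.95, "roc_auc": 0.981},
    {"label": "model_b", "accuracy": 0.97, "roc_auc": 0.992},
    {"label": "model_c", "accuracy": 0.96, "roc_auc": 0.992},
]

best_idx = pick_best(candidates)
print(best_idx, candidates[best_idx]["label"])
```

Python's max returns the first maximal element it encounters, so on a tie the candidate that appears earlier in the search space wins. If a different tie-breaking rule matters, a tuple key such as (roc_auc, accuracy) would be one option.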

SEARCH_SPACE = [
    {"model_type": "random_forest", "n_estimators": 50, "max_depth": 3},
    {"model_type": "random_forest", "n_estimators": 200, "max_depth": 7},
    {"model_type": "gradient_boosting", "n_estimators": 100, "max_depth": 3},
    {"model_type": "logistic", "n_estimators": 1, "max_depth": 1},
]

PRODUCTION_MODEL = Model(
    name="breast_cancer_classifier",
    description="Best model from in-pipeline hyperparameter search",
    tags=["tutorial", "advanced"],
)

@pipeline(model=PRODUCTION_MODEL, enable_cache=True)
def training_pipeline(test_size: float = 0.2):
    bundle = load_data()
    # Split once; every candidate trains and evaluates on the same arrays.
    X_train, X_test, y_train, y_test = split_and_scale(bundle, test_size=test_size)
    models, metrics = [], []
    # Fan-out: one train step and one eval step per configuration.
    for i, cfg in enumerate(SEARCH_SPACE):
        m = train_candidate(
            X_train, y_train, **cfg,
            id=f"train_{i}_{cfg['model_type']}",
        )
        s = evaluate_candidate(
            m, X_test, y_test,
            label=f"{cfg['model_type']}(n={cfg['n_estimators']},d={cfg['max_depth']})",
            id=f"eval_{i}",
        )
        models.append(m)
        metrics.append(s)
    # Fan-in: join all candidates and keep the best one.
    select_best(metrics, models)

print("\n" + "=" * 70 + "\n  RUNNING TRAINING PIPELINE\n" + "=" * 70)
run_obj = training_pipeline()

print("\n" + "=" * 70 + "\n  INSPECTING THE RUN\n" + "=" * 70)
client = Client()
run = client.get_pipeline_run(run_obj.id)
print(f"\nPipeline: {run.pipeline.name}")
print(f"Run name: {run.name}")
print(f"Status: {run.status}")
print(f"Step runs: {len(run.steps)}")
for name, step_run in run.steps.items():
    print(f"  • {name:35s} status={step_run.status}")

print("\nRun-level metadata (aggregated from steps):")
for k, v in (run.run_metadata or {}).items():
    short = str(v)
    print(f"  {k}: {short[:80]}{'…' if len(short) > 80 else ''}")

print("\n" + "-" * 70 + "\n  MODEL CONTROL PLANE\n" + "-" * 70)
try:
    mv = client.get_model_version(PRODUCTION_MODEL.name, "latest")
except Exception:
    mv = client.list_model_versions(model_name_or_id=PRODUCTION_MODEL.name)[0]
print(f"Model: {mv.model.name}")
print(f"Version: {mv.name} (number={mv.number})")
linked = list(mv.data_artifact_ids.keys()) if hasattr(mv, "data_artifact_ids") else []
print(f"Linked outputs: {linked or '(see dashboard)'}")
if mv.run_metadata:
    print("Version metadata:")
    for k, v in dict(mv.run_metadata).items():
        print(f"  {k}: {str(v)[:80]}")

print("\n" + "-" * 70 + "\n  RELOADING ARTIFACTS DIRECTLY\n" + "-" * 70)
prod_artifact = client.get_artifact_version("production_model")
prod_model = prod_artifact.load()
print(f"Loaded model class: {type(prod_model).__name__}")
artifact_meta = dict(prod_artifact.run_metadata) if prod_artifact.run_metadata else {}
print(f"Artifact metadata: {artifact_meta}"[:120])
X_test_arr = client.get_artifact_version("X_test").load()
y_test_arr = client.get_artifact_version("y_test").load()
acc = accuracy_score(y_test_arr, prod_model.predict(X_test_arr))
print(f"Sanity-check accuracy on stored X_test: {acc:.4f}")

ds_artifact = client.get_artifact_version("raw_dataset")
print("\nraw_dataset auto-extracted metadata:")
for k, v in (ds_artifact.run_metadata or {}).items():
    print(f"  {k}: {v}")

print("\n" + "=" * 70 + "\n  RE-RUNNING: STEPS SHOULD BE CACHED\n" + "=" * 70)
training_pipeline()

print("""
Tutorial complete.
What just happened:
  • Custom materializer serialized a domain object + auto-extracted metadata.
  • Fan-out: 4 candidates trained + evaluated as 8 distinct step runs.
  • Fan-in: select_best joined them and promoted the winner.
  • Model Control Plane created a versioned 'breast_cancer_classifier'.
  • Every artifact, metric, and hyperparameter was logged and queryable.
  • Second run hit the cache: zero recomputation.
Explore further from this same Python session:
  Client().list_pipeline_runs()
  Client().list_model_versions(model_name_or_id="breast_cancer_classifier")
  Client().list_artifact_versions(name="metrics")
""")

We define the full pipeline, execute it, and inspect the results using the ZenML Client API. We perform a fan-out over multiple configurations, followed by a fan-in step to select the best model. Finally, we demonstrate artifact reuse, metadata inspection, and caching behavior by re-running the pipeline without redundant computation.
In conclusion, we built a production-style ML pipeline that exercises ZenML’s orchestration, artifact tracking, and model versioning features. We saw how a custom materializer attaches metadata to artifacts, how multiple model candidates are trained and evaluated as independent steps, and how the best model is selected and versioned automatically. We also inspected pipeline runs, retrieved artifacts directly without recomputation, and verified model performance using the stored test data. Finally, we re-ran the pipeline to see caching in action and avoid redundant computation. This workflow provides a solid foundation for building scalable, maintainable, and reproducible machine learning systems.
