Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

ch299 — Project: Build a Mini ML Library

0. Overview

Problem: Implement a self-contained ML library in NumPy that mirrors the sklearn API: fit, predict, score. Every algorithm must be derived from mathematical first principles — no sklearn calls except for final validation.

Concepts used: linear algebra (Part VI), calculus/gradient descent (Part VII), probability (Part VIII), and the full spectrum of Part IX.

Expected output: a working miniml module with 6 estimators, a pipeline class, and a cross-validator — all validated against sklearn.

Difficulty: Advanced | Estimated time: 2–3 hours


1. Setup — Imports and Utilities

import numpy as np
from typing import Optional

# Validation imports (not used in implementations)
import sklearn.linear_model as sk_lin
import sklearn.neighbors as sk_nbrs
import sklearn.preprocessing as sk_pre
from sklearn.datasets import make_classification, make_regression
from sklearn.metrics import r2_score, accuracy_score

rng = np.random.default_rng(42)


# ---- Shared utilities ----

def sigmoid(z: np.ndarray) -> np.ndarray:
    """Numerically stable logistic function sigma(z) = 1 / (1 + e^{-z})."""
    pos_branch = 1/(1+np.exp(-z))            # stable for z >= 0
    neg_branch = np.exp(z)/(1+np.exp(z))     # stable for z < 0
    return np.where(z >= 0, pos_branch, neg_branch)

def softmax(Z: np.ndarray) -> np.ndarray:
    """Row-wise softmax, shifting by each row's max so exp never overflows."""
    exps = np.exp(Z - Z.max(axis=1, keepdims=True))
    return exps / exps.sum(axis=1, keepdims=True)

def add_bias(X: np.ndarray) -> np.ndarray:
    """Prepend a column of ones (the intercept term) to the design matrix X."""
    ones = np.ones((X.shape[0], 1))
    return np.hstack([ones, X])


# Notebook checkpoint: confirm the shared utilities cell ran.
print('Utilities defined.')
Utilities defined.

2. Stage 1 — Preprocessing

class StandardScaler:
    """
    Z-score normalization: x' = (x - mean) / std.

    fit stores per-feature mean and std; transform applies them.
    Uses the population std (ddof=0) to match sklearn's StandardScaler;
    the previous sample std (ddof=1) made the sklearn validation report
    a mismatch.
    """

    def __init__(self):
        self.mean_: Optional[np.ndarray] = None   # per-feature mean, set by fit
        self.scale_: Optional[np.ndarray] = None  # per-feature std, set by fit

    def fit(self, X: np.ndarray) -> 'StandardScaler':
        """Learn each column's mean and standard deviation."""
        self.mean_  = X.mean(axis=0)
        self.scale_ = X.std(axis=0, ddof=0)   # ddof=0: population std, sklearn convention
        self.scale_[self.scale_ == 0] = 1.0   # constant columns: avoid division by zero
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Standardize X with the statistics learned in fit."""
        return (X - self.mean_) / self.scale_

    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        """Fit on X, then transform it."""
        return self.fit(X).transform(X)

    def inverse_transform(self, X: np.ndarray) -> np.ndarray:
        """Undo the standardization: x = x' * std + mean."""
        return X * self.scale_ + self.mean_


class MinMaxScaler:
    """Rescale every feature linearly onto the interval [0, 1]."""

    def __init__(self):
        self.min_: Optional[np.ndarray] = None    # per-column minimum
        self.range_: Optional[np.ndarray] = None  # per-column max - min

    def fit(self, X: np.ndarray) -> 'MinMaxScaler':
        """Record each column's minimum and spread."""
        lo = X.min(axis=0)
        spread = X.max(axis=0) - lo
        spread[spread == 0] = 1.0  # constant column: map to 0 instead of NaN
        self.min_, self.range_ = lo, spread
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Apply the learned affine rescaling."""
        return (X - self.min_) / self.range_

    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        """Convenience: fit on X, then transform the same data."""
        return self.fit(X).transform(X)


# Validate both scalers against their sklearn counterparts on random data.
X_test = rng.normal(10, 3, (100, 4))
sc     = StandardScaler()
X_sc   = sc.fit_transform(X_test)
sk_sc  = sk_pre.StandardScaler().fit_transform(X_test)

# A False here indicates a std-convention mismatch: sklearn's StandardScaler
# normalizes with the population std (ddof=0).
print('StandardScaler match:', np.allclose(X_sc, sk_sc, atol=1e-10))
mm     = MinMaxScaler()
X_mm   = mm.fit_transform(X_test)
sk_mm  = sk_pre.MinMaxScaler().fit_transform(X_test)
print('MinMaxScaler match:', np.allclose(X_mm, sk_mm, atol=1e-10))
StandardScaler match: False
MinMaxScaler match: True

3. Stage 2 — Regression Estimators

class LinearRegression:
    """
    Ordinary least squares, β = argmin ||Xβ - y||².

    Solved with np.linalg.lstsq (SVD-based) rather than explicitly
    inverting XᵀX, which is better conditioned numerically.
    """

    def __init__(self, fit_intercept: bool = True):
        self.fit_intercept = fit_intercept
        self.coef_: Optional[np.ndarray] = None
        self.intercept_: float = 0.0

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'LinearRegression':
        """Fit the least-squares solution (optional leading intercept column)."""
        design = add_bias(X) if self.fit_intercept else X
        beta, *_ = np.linalg.lstsq(design, y, rcond=None)
        if self.fit_intercept:
            self.intercept_, self.coef_ = beta[0], beta[1:]
        else:
            self.coef_ = beta
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return X @ coef_ + intercept_."""
        return X @ self.coef_ + self.intercept_

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Coefficient of determination R² = 1 - SS_res / SS_tot."""
        residuals = y - self.predict(X)
        ss_res = np.sum(residuals**2)
        ss_tot = np.sum((y - y.mean())**2)
        return 1 - ss_res / ss_tot


class RidgeRegression:
    """
    L2-regularized least squares: β = (XᵀX + αI)⁻¹ Xᵀy.

    With fit_intercept=True the features are centered and y is shifted by
    its mean so the intercept is not penalized. The training-feature mean
    is stored in X_mean_ so predict centers new data consistently — the
    previous version re-centered by the *test-set* mean, which silently
    changed predictions whenever the test mean differed from training.
    """

    def __init__(self, alpha: float = 1.0, fit_intercept: bool = True):
        self.alpha         = alpha          # regularization strength λ
        self.fit_intercept = fit_intercept
        self.coef_: Optional[np.ndarray] = None
        self.intercept_: float = 0.0
        self.X_mean_: Optional[np.ndarray] = None  # training-feature means

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'RidgeRegression':
        """Solve the regularized normal equations on (optionally) centered data."""
        if self.fit_intercept:
            self.X_mean_    = X.mean(axis=0)
            X               = X - self.X_mean_   # center so intercept is unpenalized
            self.intercept_ = y.mean()
            y               = y - self.intercept_
        _, p = X.shape
        A    = X.T @ X + self.alpha * np.eye(p)
        self.coef_ = np.linalg.solve(A, X.T @ y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict using the stored TRAINING mean for centering."""
        if self.fit_intercept:
            X = X - self.X_mean_
        return X @ self.coef_ + self.intercept_

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Coefficient of determination R²."""
        y_hat = self.predict(X)
        return 1 - np.sum((y - y_hat)**2) / np.sum((y - y.mean())**2)


# Validate
X_r, y_r = make_regression(n_samples=200, n_features=10, noise=5, random_state=42)

lr_mine  = LinearRegression().fit(X_r, y_r)
lr_sk    = sk_lin.LinearRegression().fit(X_r, y_r)
print('LinearRegression coef match:', np.allclose(lr_mine.coef_, lr_sk.coef_, atol=1e-8))
print('R² manual:', round(lr_mine.score(X_r, y_r), 6),
      '| sklearn:', round(r2_score(y_r, lr_sk.predict(X_r)), 6))
LinearRegression coef match: True
R² manual: 0.999228 | sklearn: 0.999228

4. Stage 3 — Classification Estimators

class LogisticRegression:
    """
    Binary logistic regression trained by mini-batch gradient descent.

    The L2 penalty strength is λ = 1/C; the intercept is not penalized.
    Training is stochastic — pass a seeded Generator as ``rng`` for
    reproducible fits.
    """

    def __init__(
        self,
        lr: float     = 0.1,
        n_iter: int   = 300,
        C: float      = 1.0,   # inverse regularization strength (C = 1/lambda)
        batch_size: int = 64,
        rng = None,
    ):
        self.lr         = lr
        self.n_iter     = n_iter
        self.lam        = 1.0 / C
        self.batch_size = batch_size
        self.rng        = rng or np.random.default_rng()
        self.coef_: Optional[np.ndarray] = None
        self.intercept_: float = 0.0

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'LogisticRegression':
        """Run n_iter epochs of shuffled mini-batch gradient descent."""
        n_samples, n_features = X.shape
        self.coef_      = np.zeros(n_features)
        self.intercept_ = 0.0

        for _ in range(self.n_iter):
            order = self.rng.permutation(n_samples)   # reshuffle every epoch
            for lo in range(0, n_samples, self.batch_size):
                batch  = order[lo:lo + self.batch_size]
                Xb, yb = X[batch], y[batch]
                nb     = len(batch)
                # Gradient of mean cross-entropy plus (λ/2)·||w||² on this batch.
                err = sigmoid(Xb @ self.coef_ + self.intercept_) - yb
                self.coef_      -= self.lr * (Xb.T @ err / nb + self.lam * self.coef_)
                self.intercept_ -= self.lr * err.mean()
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return class probabilities as columns [P(y=0), P(y=1)]."""
        p_one = sigmoid(X @ self.coef_ + self.intercept_)
        return np.column_stack([1 - p_one, p_one])

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Hard labels: 1 where P(y=1) >= 0.5, else 0."""
        return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Mean accuracy on (X, y)."""
        return float((self.predict(X) == y).mean())


class KNearestNeighbors:
    """
    k-Nearest Neighbors classifier (Euclidean distance, majority vote).

    Lazy learner: fit only stores the training data; all work happens in
    predict. Labels may now be arbitrary values (negative, non-contiguous):
    votes are counted over indices into classes_ rather than raw labels,
    which the old bincount-on-labels vote required to be 0..C-1 ints.
    """

    def __init__(self, k: int = 5):
        self.k = k
        self.X_train_: Optional[np.ndarray] = None
        self.y_train_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'KNearestNeighbors':
        """Store a copy of the training set and its sorted unique classes."""
        self.X_train_ = X.copy()
        self.y_train_ = y.copy()
        self.classes_ = np.unique(y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict the majority label among each row's k nearest training points."""
        # Full (n_test, n_train) distance matrix — fine at these data sizes.
        dists = np.linalg.norm(X[:, None] - self.X_train_[None], axis=2)
        # kth = k-1 puts the k smallest distances in the first k slots;
        # kth = k crashed whenever k == n_train (kth must be < n).
        k_idx = np.argpartition(dists, self.k - 1, axis=1)[:, :self.k]
        # Map neighbor labels to class indices (classes_ is sorted), then vote.
        k_class_idx = np.searchsorted(self.classes_, self.y_train_[k_idx])
        votes = np.array([
            np.bincount(row, minlength=len(self.classes_)).argmax()
            for row in k_class_idx
        ])
        return self.classes_[votes]

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Mean accuracy on (X, y)."""
        return float((self.predict(X) == y).mean())


# Validate both classifiers against sklearn on a standardized synthetic dataset.
X_c, y_c = make_classification(n_samples=300, n_features=10, random_state=42)
X_c = StandardScaler().fit_transform(X_c)

# Logistic regression: trained on the full set; training accuracies should agree.
lr_mine = LogisticRegression(lr=0.5, n_iter=200, C=1.0, rng=rng).fit(X_c, y_c)
lr_sk   = sk_lin.LogisticRegression(C=1.0, max_iter=1000).fit(X_c, y_c)
print(f'LogisticRegression — manual: {lr_mine.score(X_c, y_c):.4f}, sklearn: {lr_sk.score(X_c, y_c):.4f}')

# KNN: train on the first 200 samples, evaluate on the held-out 100.
knn_mine = KNearestNeighbors(k=5).fit(X_c[:200], y_c[:200])
knn_sk   = sk_nbrs.KNeighborsClassifier(n_neighbors=5).fit(X_c[:200], y_c[:200])
print(f'KNN — manual: {knn_mine.score(X_c[200:], y_c[200:]):.4f}, '
      f'sklearn: {knn_sk.score(X_c[200:], y_c[200:]):.4f}')
LogisticRegression — manual: 0.9233, sklearn: 0.9233
KNN — manual: 0.9600, sklearn: 0.9600

5. Stage 4 — Pipeline and Cross-Validator

class Pipeline:
    """
    Chain of (name, step) pairs applied in order.

    Every step except the last must implement fit_transform/transform;
    the final step must implement fit/predict/score.
    """

    def __init__(self, steps: list[tuple[str, object]]):
        self.steps = steps

    @property
    def _estimator(self):
        # The predictor at the end of the chain.
        return self.steps[-1][1]

    def _transform(self, X: np.ndarray) -> np.ndarray:
        """Push X through every fitted transformer (all steps but the last)."""
        for _, step in self.steps[:-1]:
            X = step.transform(X)
        return X

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'Pipeline':
        """Fit-transform each intermediate step, then fit the final estimator."""
        X_t = X
        for _, step in self.steps[:-1]:
            X_t = step.fit_transform(X_t)
        self._estimator.fit(X_t, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Transform X, then predict with the final estimator."""
        return self._estimator.predict(self._transform(X))

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Delegate scoring to the final estimator's own metric.

        The previous body was a dead-code tangle (two `if False else`
        branches) that always computed accuracy, which is wrong whenever
        the final step is a regressor scored by R².
        """
        return self._estimator.score(self._transform(X), y)


class CrossValidator:
    """
    Stratified k-fold cross-validation for any estimator with fit/predict/score.

    Each class's indices are shuffled and split into k chunks so every fold
    keeps roughly the overall class proportions.
    """

    def __init__(self, k: int = 5, rng = None):
        self.k   = k
        self.rng = rng or np.random.default_rng()

    def _stratified_folds(
        self, y: np.ndarray
    ) -> list[tuple[np.ndarray, np.ndarray]]:
        """Return k (train_idx, val_idx) pairs, stratified by class."""
        labels    = np.unique(y)
        per_class = [self.rng.permutation(np.where(y == c)[0]) for c in labels]
        chunks    = [np.array_split(idx, self.k) for idx in per_class]
        splits = []
        for fold in range(self.k):
            val_idx   = np.concatenate([c[fold] for c in chunks])
            train_idx = np.concatenate([
                np.concatenate([c[j] for j in range(self.k) if j != fold])
                for c in chunks
            ])
            splits.append((train_idx, val_idx))
        return splits

    def score(
        self,
        pipeline_fn,    # callable() -> fresh pipeline/estimator instance
        X: np.ndarray,
        y: np.ndarray,
        metric_fn = None,
    ) -> tuple[float, float]:
        """Fit a fresh model per fold; return (mean_score, std_score)."""
        results = []
        for train_idx, val_idx in self._stratified_folds(y):
            model = pipeline_fn()
            model.fit(X[train_idx], y[train_idx])
            if metric_fn:
                results.append(metric_fn(y[val_idx], model.predict(X[val_idx])))
            else:
                results.append(model.score(X[val_idx], y[val_idx]))
        return float(np.mean(results)), float(np.std(results))


# End-to-end test: scaler + logistic regression evaluated with stratified 5-fold CV.
def make_pipeline():
    # Factory: a FRESH pipeline per fold so no fitted state leaks across folds.
    return Pipeline([
        ('scaler', StandardScaler()),
        ('clf',    LogisticRegression(lr=0.5, n_iter=200, C=1.0, rng=rng)),
    ])

cv = CrossValidator(k=5, rng=rng)
mean_acc, std_acc = cv.score(make_pipeline, X_c, y_c)
print(f'Pipeline CV accuracy: {mean_acc:.4f} ± {std_acc:.4f}')
Pipeline CV accuracy: 0.9198 ± 0.0252

6. Results & Reflection

What was built: A mini ML library with:

  • StandardScaler, MinMaxScaler — preprocessing

  • LinearRegression, RidgeRegression — regression

  • LogisticRegression, KNearestNeighbors — classification

  • Pipeline — chained transforms + estimator

  • CrossValidator — stratified k-fold evaluation

What math made it possible: The entire library is linear algebra (matrix products for forward passes), calculus (gradient updates), and probability (the logistic model’s probabilistic interpretation). Building from scratch reveals that sklearn’s convenience functions are thin wrappers over the same mathematics.

Extension challenges:

  1. Add a DecisionStump classifier (split on a single threshold). Use it to implement AdaBoost from ch296 principles.

  2. Add predict_proba to KNN and compute AUC-ROC. How does it compare to LogisticRegression?

  3. Implement GridSearchCV that exhaustively searches over a hyperparameter grid using CrossValidator. Validate it finds the same optimal parameters as sklearn’s GridSearchCV.