0. Overview¶
Problem: Implement a self-contained ML library in NumPy that mirrors the sklearn API: fit, predict, score. Every algorithm must be derived from mathematical first principles — no sklearn calls except for final validation.
Concepts used: linear algebra (Part VI), calculus/gradient descent (Part VII), probability (Part VIII), and the full spectrum of Part IX.
Expected output: a working miniml module with 6 estimators, a pipeline class, and a cross-validator — all validated against sklearn.
Difficulty: Advanced | Estimated time: 2–3 hours
1. Setup — Imports and Utilities¶
import numpy as np
from typing import Optional
# Validation imports (not used in implementations)
import sklearn.linear_model as sk_lin
import sklearn.neighbors as sk_nbrs
import sklearn.preprocessing as sk_pre
from sklearn.datasets import make_classification, make_regression
from sklearn.metrics import r2_score, accuracy_score
rng = np.random.default_rng(42)
# ---- Shared utilities ----
def sigmoid(z: np.ndarray) -> np.ndarray:
    # Numerically stable logistic: each branch only exponentiates values that
    # cannot overflow for the elements it is selected on.
    return np.where(z >= 0, 1/(1+np.exp(-z)), np.exp(z)/(1+np.exp(z)))
def softmax(Z: np.ndarray) -> np.ndarray:
    # Shift by the row max before exponentiating; softmax is shift-invariant,
    # so the result is unchanged but exp() cannot overflow.
    Z_s = Z - Z.max(axis=1, keepdims=True)
    e = np.exp(Z_s)
    return e / e.sum(axis=1, keepdims=True)
def add_bias(X: np.ndarray) -> np.ndarray:
    # Prepend a column of ones so the intercept is learned as the first parameter.
    return np.column_stack([np.ones(len(X)), X])
print('Utilities defined.')
Utilities defined.
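Every class that follows implements the same sklearn-style contract described in the overview. As a quick illustration of the pattern, here is a toy ConstantClassifier (a name invented purely for demonstration; it is not one of the six estimators):
class ConstantClassifier:
    """Toy majority-class predictor illustrating the fit/predict/score contract."""
    def fit(self, X: np.ndarray, y: np.ndarray) -> 'ConstantClassifier':
        vals, counts = np.unique(y, return_counts=True)
        self.constant_ = vals[counts.argmax()]  # 'learning' = memorize the majority class
        return self  # returning self enables chaining: clf.fit(X, y).score(X, y)
    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.full(len(X), self.constant_)
    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        return float((self.predict(X) == y).mean())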
2. Stage 1 — Preprocessing¶
class StandardScaler:
"""Z-score normalization. Fit stores mean and std; transform applies them."""
def __init__(self):
self.mean_: Optional[np.ndarray] = None
self.scale_: Optional[np.ndarray] = None
def fit(self, X: np.ndarray) -> 'StandardScaler':
self.mean_ = X.mean(axis=0)
        self.scale_ = X.std(axis=0, ddof=0)  # population std (ddof=0), matching sklearn
        self.scale_[self.scale_ == 0] = 1.0  # avoid division by zero
return self
def transform(self, X: np.ndarray) -> np.ndarray:
return (X - self.mean_) / self.scale_
def fit_transform(self, X: np.ndarray) -> np.ndarray:
return self.fit(X).transform(X)
def inverse_transform(self, X: np.ndarray) -> np.ndarray:
return X * self.scale_ + self.mean_
class MinMaxScaler:
"""Scale each feature to [0, 1]."""
def __init__(self):
self.min_: Optional[np.ndarray] = None
self.range_: Optional[np.ndarray] = None
def fit(self, X: np.ndarray) -> 'MinMaxScaler':
self.min_ = X.min(axis=0)
self.range_ = X.max(axis=0) - self.min_
self.range_[self.range_ == 0] = 1.0
return self
def transform(self, X: np.ndarray) -> np.ndarray:
return (X - self.min_) / self.range_
def fit_transform(self, X: np.ndarray) -> np.ndarray:
return self.fit(X).transform(X)
# Validate
X_test = rng.normal(10, 3, (100, 4))
sc = StandardScaler()
X_sc = sc.fit_transform(X_test)
sk_sc = sk_pre.StandardScaler().fit_transform(X_test)
print('StandardScaler match:', np.allclose(X_sc, sk_sc, atol=1e-10))
mm = MinMaxScaler()
X_mm = mm.fit_transform(X_test)
sk_mm = sk_pre.MinMaxScaler().fit_transform(X_test)
print('MinMaxScaler match:', np.allclose(X_mm, sk_mm, atol=1e-10))
StandardScaler match: True
MinMaxScaler match: True
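StandardScaler.inverse_transform is not exercised above; a quick round-trip check on the same synthetic data:
print('inverse_transform round trip:', np.allclose(sc.inverse_transform(X_sc), X_test))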
3. Stage 2 — Regression Estimators¶
class LinearRegression:
"""
    OLS: the normal-equation solution is β = (XᵀX)⁻¹Xᵀy; solving the
    equivalent least-squares problem with lstsq (SVD-based) yields the
    same β with better numerical stability.
"""
def __init__(self, fit_intercept: bool = True):
self.fit_intercept = fit_intercept
self.coef_: Optional[np.ndarray] = None
self.intercept_: float = 0.0
def fit(self, X: np.ndarray, y: np.ndarray) -> 'LinearRegression':
X_fit = add_bias(X) if self.fit_intercept else X
params, *_ = np.linalg.lstsq(X_fit, y, rcond=None)
if self.fit_intercept:
self.intercept_ = params[0]
self.coef_ = params[1:]
else:
self.coef_ = params
return self
def predict(self, X: np.ndarray) -> np.ndarray:
return X @ self.coef_ + self.intercept_
def score(self, X: np.ndarray, y: np.ndarray) -> float:
y_hat = self.predict(X)
ss_res = np.sum((y - y_hat)**2)
ss_tot = np.sum((y - y.mean())**2)
return 1 - ss_res / ss_tot
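# Aside (synthetic, illustrative data): on a well-conditioned problem the
# explicit normal-equation solve and lstsq agree; lstsq is preferred because
# it stays accurate when XᵀX is nearly singular.
_Xd = rng.normal(size=(50, 3))
_yd = _Xd @ np.array([1.0, -2.0, 0.5]) + rng.normal(0, 0.1, size=50)
_b_normal = np.linalg.solve(_Xd.T @ _Xd, _Xd.T @ _yd)
_b_lstsq, *_ = np.linalg.lstsq(_Xd, _yd, rcond=None)
print('normal equations vs lstsq agree:', np.allclose(_b_normal, _b_lstsq))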
class RidgeRegression:
"""L2-regularized regression: β = (XᵀX + λI)⁻¹Xᵀy."""
def __init__(self, alpha: float = 1.0, fit_intercept: bool = True):
self.alpha = alpha
self.fit_intercept = fit_intercept
        self.coef_: Optional[np.ndarray] = None
        self.intercept_: float = 0.0
        self.X_mean_: Optional[np.ndarray] = None  # training feature means, set in fit
    def fit(self, X: np.ndarray, y: np.ndarray) -> 'RidgeRegression':
        if self.fit_intercept:
            self.X_mean_ = X.mean(axis=0)
            X = X - self.X_mean_  # center features; the intercept is not penalized
            self.intercept_ = y.mean()
            y = y - self.intercept_
        _, p = X.shape
        A = X.T @ X + self.alpha * np.eye(p)
        self.coef_ = np.linalg.solve(A, X.T @ y)
        return self
    def predict(self, X: np.ndarray) -> np.ndarray:
        if self.fit_intercept:
            X = X - self.X_mean_  # center with the training mean, not the test mean
        return X @ self.coef_ + self.intercept_
def score(self, X: np.ndarray, y: np.ndarray) -> float:
y_hat = self.predict(X)
return 1 - np.sum((y - y_hat)**2) / np.sum((y - y.mean())**2)
# Validate
X_r, y_r = make_regression(n_samples=200, n_features=10, noise=5, random_state=42)
lr_mine = LinearRegression().fit(X_r, y_r)
lr_sk = sk_lin.LinearRegression().fit(X_r, y_r)
print('LinearRegression coef match:', np.allclose(lr_mine.coef_, lr_sk.coef_, atol=1e-8))
print('R² manual:', round(lr_mine.score(X_r, y_r), 6),
      '| sklearn:', round(r2_score(y_r, lr_sk.predict(X_r)), 6))
LinearRegression coef match: True
R² manual: 0.999228 | sklearn: 0.999228
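RidgeRegression is not validated above. A quick check against sklearn's Ridge on the same data; both implementations center the data and leave the intercept unpenalized, so the coefficients should match:
ridge_mine = RidgeRegression(alpha=1.0).fit(X_r, y_r)
ridge_sk = sk_lin.Ridge(alpha=1.0).fit(X_r, y_r)
print('RidgeRegression coef match:',
      np.allclose(ridge_mine.coef_, ridge_sk.coef_, atol=1e-8))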
4. Stage 3 — Classification Estimators¶
class LogisticRegression:
"""
Binary logistic regression via mini-batch gradient descent.
Supports L2 regularization.
"""
def __init__(
self,
lr: float = 0.1,
n_iter: int = 300,
C: float = 1.0, # inverse regularization strength (C = 1/lambda)
batch_size: int = 64,
rng = None,
):
self.lr = lr
self.n_iter = n_iter
self.lam = 1.0 / C
self.batch_size = batch_size
self.rng = rng or np.random.default_rng()
self.coef_: Optional[np.ndarray] = None
self.intercept_: float = 0.0
def fit(self, X: np.ndarray, y: np.ndarray) -> 'LogisticRegression':
n, p = X.shape
self.coef_ = np.zeros(p)
self.intercept_ = 0.0
for _ in range(self.n_iter):
idx = self.rng.permutation(n)
for start in range(0, n, self.batch_size):
b = idx[start:start + self.batch_size]
Xb, yb = X[b], y[b]
nb = len(b)
                z = Xb @ self.coef_ + self.intercept_
                err = sigmoid(z) - yb  # gradient of the cross-entropy wrt the logits
                # ∇β = Xᵀ(σ(z) − y)/n_batch + λβ; the intercept is left unpenalized
                self.coef_ -= self.lr * (Xb.T @ err / nb + self.lam * self.coef_)
                self.intercept_ -= self.lr * err.mean()
return self
def predict_proba(self, X: np.ndarray) -> np.ndarray:
p1 = sigmoid(X @ self.coef_ + self.intercept_)
return np.column_stack([1 - p1, p1])
def predict(self, X: np.ndarray) -> np.ndarray:
return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)
def score(self, X: np.ndarray, y: np.ndarray) -> float:
return float((self.predict(X) == y).mean())
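# Sanity check (synthetic, illustrative data): the analytic gradient used in
# fit() should match a central finite-difference approximation of the
# regularized log-loss.
def _reg_log_loss(w, X, y, lam):
    p = sigmoid(X @ w)
    eps = 1e-12  # guard the logs against p = 0 or 1
    return -np.mean(y*np.log(p+eps) + (1-y)*np.log(1-p+eps)) + 0.5*lam*(w @ w)
_Xg = rng.normal(size=(40, 3))
_yg = (rng.random(40) < 0.5).astype(float)
_wg = rng.normal(size=3)
_analytic = _Xg.T @ (sigmoid(_Xg @ _wg) - _yg) / 40 + 1.0 * _wg
_numeric = np.zeros(3)
for _j in range(3):
    _e = np.zeros(3); _e[_j] = 1e-6
    _numeric[_j] = (_reg_log_loss(_wg + _e, _Xg, _yg, 1.0)
                    - _reg_log_loss(_wg - _e, _Xg, _yg, 1.0)) / 2e-6
print('gradient check:', np.allclose(_analytic, _numeric, atol=1e-5))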
class KNearestNeighbors:
"""
k-Nearest Neighbors classifier.
Lazy learner: stores training data, predicts via majority vote.
"""
def __init__(self, k: int = 5):
self.k = k
self.X_train_: Optional[np.ndarray] = None
self.y_train_: Optional[np.ndarray] = None
def fit(self, X: np.ndarray, y: np.ndarray) -> 'KNearestNeighbors':
self.X_train_ = X.copy()
self.y_train_ = y.copy()
self.classes_ = np.unique(y)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
        # Broadcast to a full (n_test, n_train) distance matrix; the
        # (n_test, n_train, n_features) intermediate is the memory bottleneck
        dists = np.linalg.norm(X[:, None] - self.X_train_[None], axis=2)  # (n_test, n_train)
        # argpartition selects the k smallest distances without a full sort
        k_idx = np.argpartition(dists, self.k, axis=1)[:, :self.k]
k_labels = self.y_train_[k_idx] # (n_test, k)
# Majority vote
return np.array([
np.bincount(row.astype(int), minlength=len(self.classes_)).argmax()
for row in k_labels
])
def score(self, X: np.ndarray, y: np.ndarray) -> float:
return float((self.predict(X) == y).mean())
# Validate
X_c, y_c = make_classification(n_samples=300, n_features=10, random_state=42)
X_c = StandardScaler().fit_transform(X_c)
lr_mine = LogisticRegression(lr=0.5, n_iter=200, C=1.0, rng=rng).fit(X_c, y_c)
lr_sk = sk_lin.LogisticRegression(C=1.0, max_iter=1000).fit(X_c, y_c)
print(f'LogisticRegression — manual: {lr_mine.score(X_c, y_c):.4f}, sklearn: {lr_sk.score(X_c, y_c):.4f}')
knn_mine = KNearestNeighbors(k=5).fit(X_c[:200], y_c[:200])
knn_sk = sk_nbrs.KNeighborsClassifier(n_neighbors=5).fit(X_c[:200], y_c[:200])
print(f'KNN — manual: {knn_mine.score(X_c[200:], y_c[200:]):.4f}, '
      f'sklearn: {knn_sk.score(X_c[200:], y_c[200:]):.4f}')
LogisticRegression — manual: 0.9233, sklearn: 0.9233
KNN — manual: 0.9600, sklearn: 0.9600
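The broadcasted distance computation in KNearestNeighbors.predict materializes an (n_test, n_train, n_features) intermediate, which dominates memory on larger datasets. A sketch of the standard expansion ‖a − b‖² = ‖a‖² − 2a·b + ‖b‖², which needs only an (n_test, n_train) array (the helper name pairwise_sq_dists is ours):
def pairwise_sq_dists(A: np.ndarray, B: np.ndarray) -> np.ndarray:
    # ‖a − b‖² = ‖a‖² − 2a·b + ‖b‖²: one matrix product instead of a 3-D broadcast
    sq = (A**2).sum(axis=1)[:, None] - 2 * A @ B.T + (B**2).sum(axis=1)[None, :]
    return np.maximum(sq, 0.0)  # clamp tiny negatives from floating-point error
print('distance trick matches broadcast:',
      np.allclose(np.sqrt(pairwise_sq_dists(X_c[200:], X_c[:200])),
                  np.linalg.norm(X_c[200:, None] - X_c[None, :200], axis=2)))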
5. Stage 4 — Pipeline and Cross-Validator¶
class Pipeline:
"""
Chain of (name, transformer/estimator) steps.
All steps except the last must implement fit/transform.
The last step must implement fit/predict/score.
"""
def __init__(self, steps: list[tuple[str, object]]):
self.steps = steps
@property
def _estimator(self):
return self.steps[-1][1]
def fit(self, X: np.ndarray, y: np.ndarray) -> 'Pipeline':
X_t = X
for _, step in self.steps[:-1]:
X_t = step.fit_transform(X_t)
self._estimator.fit(X_t, y)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
X_t = X
for _, step in self.steps[:-1]:
X_t = step.transform(X_t)
return self._estimator.predict(X_t)
    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        X_t = X
        for _, step in self.steps[:-1]:
            X_t = step.transform(X_t)
        return self._estimator.score(X_t, y)
class CrossValidator:
"""
Stratified k-fold cross-validation for any estimator with fit/predict/score.
"""
def __init__(self, k: int = 5, rng = None):
self.k = k
self.rng = rng or np.random.default_rng()
def _stratified_folds(
self, y: np.ndarray
) -> list[tuple[np.ndarray, np.ndarray]]:
classes = np.unique(y)
class_idx = [self.rng.permutation(np.where(y == c)[0]) for c in classes]
folds_per = [np.array_split(idx, self.k) for idx in class_idx]
folds = []
for i in range(self.k):
val = np.concatenate([f[i] for f in folds_per])
train = np.concatenate([
np.concatenate([f[j] for j in range(self.k) if j != i])
for f in folds_per
])
folds.append((train, val))
return folds
def score(
self,
pipeline_fn, # callable() -> pipeline instance
X: np.ndarray,
y: np.ndarray,
metric_fn = None,
) -> tuple[float, float]:
"""Returns (mean_score, std_score)."""
folds = self._stratified_folds(y)
scores = []
for train_idx, val_idx in folds:
p = pipeline_fn()
p.fit(X[train_idx], y[train_idx])
if metric_fn:
scores.append(metric_fn(y[val_idx], p.predict(X[val_idx])))
else:
scores.append(p.score(X[val_idx], y[val_idx]))
return float(np.mean(scores)), float(np.std(scores))
# End-to-end test
def make_pipeline():
return Pipeline([
('scaler', StandardScaler()),
('clf', LogisticRegression(lr=0.5, n_iter=200, C=1.0, rng=rng)),
])
cv = CrossValidator(k=5, rng=rng)
mean_acc, std_acc = cv.score(make_pipeline, X_c, y_c)
print(f'Pipeline CV accuracy: {mean_acc:.4f} ± {std_acc:.4f}')
Pipeline CV accuracy: 0.9198 ± 0.0252
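The metric_fn hook of CrossValidator is not exercised above; it accepts any callable with the signature metric_fn(y_true, y_pred). For example, with sklearn's accuracy_score from the setup imports (the fold shuffles consume rng state, so the numbers will differ slightly from the run above):
mean_m, std_m = cv.score(make_pipeline, X_c, y_c, metric_fn=accuracy_score)
print(f'Pipeline CV accuracy via accuracy_score: {mean_m:.4f} ± {std_m:.4f}')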
6. Results & Reflection¶
What was built: A mini ML library with:
- StandardScaler, MinMaxScaler — preprocessing
- LinearRegression, RidgeRegression — regression
- LogisticRegression, KNearestNeighbors — classification
- Pipeline — chained transforms + estimator
- CrossValidator — stratified k-fold evaluation
What math made it possible: The entire library is linear algebra (matrix products for forward passes), calculus (gradient updates), and probability (the logistic model’s probabilistic interpretation). Building from scratch reveals that sklearn’s convenience functions are thin wrappers over the same mathematics.
Extension challenges:
- Add a DecisionStump classifier (split on a single threshold). Use it to implement AdaBoost from first principles (a minimal stump sketch follows below).
- Add predict_proba to KNN and compute AUC-ROC. How does it compare to LogisticRegression?
- Implement a GridSearchCV that exhaustively searches over a hyperparameter grid using CrossValidator. Validate that it finds the same optimal parameters as sklearn's GridSearchCV.
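For the first challenge, a minimal sketch of such a stump under the same fit/predict/score contract (illustrative only; an AdaBoost base learner would additionally need to accept per-sample weights in fit):
class DecisionStump:
    """One feature, one threshold: predict 1 on one side, 0 on the other."""
    def fit(self, X: np.ndarray, y: np.ndarray) -> 'DecisionStump':
        best = -1.0
        for j in range(X.shape[1]):
            for t in np.unique(X[:, j]):
                for pol in (0, 1):  # polarity: which side of t maps to class 1
                    pred = np.where(X[:, j] > t, pol, 1 - pol)
                    acc = float((pred == y).mean())
                    if acc > best:
                        best = acc
                        self.feature_, self.threshold_, self.polarity_ = j, t, pol
        return self
    def predict(self, X: np.ndarray) -> np.ndarray:
        side = X[:, self.feature_] > self.threshold_
        return np.where(side, self.polarity_, 1 - self.polarity_)
    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        return float((self.predict(X) == y).mean())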