diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..6bae019
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,2 @@
+MODEL__INPUT_FEATURES=300
+DATA__TRAIN_PATH=/path/to/data/mnist_train.csv
diff --git a/.gitignore b/.gitignore
index 4550c23..6530e6e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,3 @@
-data/
-
 outputs
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/ml_pipeline/__init__.py b/ml_pipeline/__init__.py
index e69de29..af6cf0d 100644
--- a/ml_pipeline/__init__.py
+++ b/ml_pipeline/__init__.py
@@ -0,0 +1,4 @@
+from .config import config as make_config
+
+# build the shared configuration once, at import time
+config = make_config()
diff --git a/ml_pipeline/cli.py b/ml_pipeline/cli.py
index e4324d5..084b021 100644
--- a/ml_pipeline/cli.py
+++ b/ml_pipeline/cli.py
@@ -6,3 +6,17 @@ def cli():
     """
     ml_pipeline: a template for building, training and running pytorch models.
     """
+
+
+@cli.command("train")
+def train():
+    """run the training pipeline on the training data"""
+    from ml_pipeline.training.pipeline import run
+    run()
+
+
+@cli.command("evaluate")
+def evaluate():
+    """run the training pipeline on the test data"""
+    from ml_pipeline.training.pipeline import run
+    run(evaluate=True)
diff --git a/ml_pipeline/common.py b/ml_pipeline/common.py
index 73dcef7..e69de29 100644
--- a/ml_pipeline/common.py
+++ b/ml_pipeline/common.py
@@ -1,7 +0,0 @@
-from enum import Enum, auto
-
-
-class Stage(Enum):
-    TRAIN = auto()
-    DEV = auto()
-    TEST = auto()
diff --git a/ml_pipeline/config/__init__.py b/ml_pipeline/config/__init__.py
new file mode 100644
index 0000000..918197c
--- /dev/null
+++ b/ml_pipeline/config/__init__.py
@@ -0,0 +1,14 @@
+from config import ConfigurationSet, config_from_env, config_from_dotenv, config_from_toml
+from pathlib import Path
+
+
+def config():
+    config_dir = Path(__file__).parent
+    root = config_dir.parent.parent
+    return ConfigurationSet(
+        config_from_env(prefix="ML_PIPELINE", separator="__", lowercase_keys=True),
+        config_from_dotenv(root / ".env", read_from_file=True, lowercase_keys=True, interpolate=True, interpolate_type=1),
+        config_from_toml(config_dir / "training.toml", read_from_file=True),
+        config_from_toml(config_dir / "data.toml", read_from_file=True),
+        config_from_toml(config_dir / "model.toml", read_from_file=True),
+    )
diff --git a/ml_pipeline/config/config.toml b/ml_pipeline/config/config.toml
new file mode 100644
index 0000000..e69de29
diff --git a/ml_pipeline/config/data.toml b/ml_pipeline/config/data.toml
new file mode 100644
index 0000000..2548a08
--- /dev/null
+++ b/ml_pipeline/config/data.toml
@@ -0,0 +1,4 @@
+[data]
+train_path = "/path/to/data/mnist_train.csv"
+in_channels = 1
+num_classes = 10
diff --git a/ml_pipeline/config/model.toml b/ml_pipeline/config/model.toml
new file mode 100644
index 0000000..e8b3788
--- /dev/null
+++ b/ml_pipeline/config/model.toml
@@ -0,0 +1,3 @@
+[model]
+hidden_size = 8
+name = 'vgg11'
diff --git a/ml_pipeline/config/training.toml b/ml_pipeline/config/training.toml
new file mode 100644
index 0000000..e6e773e
--- /dev/null
+++ b/ml_pipeline/config/training.toml
@@ -0,0 +1,8 @@
+[training]
+batch_size = 16
+epochs = 10
+learning_rate = 0.01
+device = 'cpu'
+# cap the rows read from the csv, e.g. examples = 50 for a quick smoke run
+examples = -1  # -1 loads the full dataset
+
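For orientation, here is a short usage sketch of how the layered ConfigurationSet above resolves a key. python-configuration consults the sets in order, so the ML_PIPELINE__-prefixed environment layer wins over .env, which wins over the TOML files (assumes the package and the files above are in place):

    # sketch: layered config lookup, first matching layer wins
    from ml_pipeline import config

    print(config.training.batch_size)        # 16, from training.toml
    print(config["training.learning_rate"])  # 0.01; dotted access also works
    # exporting ML_PIPELINE__TRAINING__BATCH_SIZE=32 before import
    # would shadow the TOML value, since the env layer is listed first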
diff --git a/ml_pipeline/data.py b/ml_pipeline/data.py
deleted file mode 100644
index 37edd33..0000000
--- a/ml_pipeline/data.py
+++ /dev/null
@@ -1,86 +0,0 @@
-from torch.utils.data import Dataset
-import numpy as np
-import einops
-import csv
-import torch
-import click
-
-
-SAMPLES = 500
-IN_DIM = 30
-OUT_DIM = 20
-
-
-class GenericDataset(Dataset):
-    def __init__(self):
-        rng = np.random.default_rng()
-        self.x = rng.normal(size=(SAMPLES, IN_DIM)).astype(np.float32)
-        self.y = 500 * rng.normal(size=(SAMPLES, OUT_DIM)).astype(np.float32)
-
-    def __getitem__(self, idx):
-        return (self.x[idx], self.y[idx])
-
-    def __len__(self):
-        return len(self.x)
-
-    def get_in_out_size(self):
-        return self.x.shape[1], self.y.shape[1]
-
-
-class FashionDataset(Dataset):
-    def __init__(self, path: str):
-        self.path = path
-        self.x, self.y = self.load()
-
-    def __getitem__(self, idx):
-        return (self.x[idx], self.y[idx])
-
-    def __len__(self):
-        return len(self.x)
-
-    def load(self):
-        # opening the CSV file
-        with open(self.path, mode="r") as file:
-            images = list()
-            classes = list()
-            # reading the CSV file
-            csvFile = csv.reader(file)
-            # displaying the contents of the CSV file
-            header = next(csvFile)
-            limit = 1000
-            for line in csvFile:
-                if limit < 1:
-                    break
-                classes.append(int(line[:1][0]))
-                images.append([int(x) for x in line[1:]])
-                limit -= 1
-            classes = torch.tensor(classes, dtype=torch.long)
-            images = torch.tensor(images, dtype=torch.float32)
-            images = einops.rearrange(images, "n (w h) -> n w h", w=28, h=28)
-            images = einops.repeat(
-                images, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8
-            )
-        return (images, classes)
-
-
-@click.group()
-def cli(): ...
-
-
-@cli.command()
-def main():
-    path = "fashion-mnist_train.csv"
-    dataset = FashionDataset(path=path)
-    print(f"len: {len(dataset)}")
-    print(f"first shape: {dataset[0][0].shape}")
-    mean = einops.reduce(dataset[:10], "n w h -> w h", "mean")
-    print(f"mean shape: {mean.shape}")
-
-
-@cli.command()
-def generic():
-    dataset = GenericDataset()
-
-
-if __name__ == "__main__":
-    cli()
diff --git a/ml_pipeline/data/dataset.py b/ml_pipeline/data/dataset.py
new file mode 100644
index 0000000..673c928
--- /dev/null
+++ b/ml_pipeline/data/dataset.py
@@ -0,0 +1,68 @@
+from torch.utils.data import Dataset
+import einops
+import csv
+import torch
+from pathlib import Path
+from typing import Tuple
+from ml_pipeline import config
+
+
+class MnistDataset(Dataset):
+    """
+    The MNIST database of handwritten digits.
+    The training set is 60k labeled examples; the test set is 10k examples.
+    The b/w digits are size-normalized to 20x20, preserving aspect ratio,
+    and centered in 28x28 images.
+
+    It's the de facto standard image dataset for learning about classification in DL.
+    """
+
+    def __init__(self, path: Path):
+        """
+        give a path to one of the csv files from:
+        https://pjreddie.com/projects/mnist-in-csv/
+        """
+        assert path, "dataset path required"
+        self.path = Path(path)
+        assert self.path.exists(), f"could not find dataset path: {path}"
+        self.features, self.labels = self._load()
+
+    def __getitem__(self, idx):
+        return (self.features[idx], self.labels[idx])
+
+    def __len__(self):
+        return len(self.features)
+
+    def _load(self) -> Tuple[torch.Tensor, torch.Tensor]:
+        # read the CSV file, stopping early if config caps the example count
+        with open(self.path, mode="r") as file:
+            images, labels = [], []
+            reader = csv.reader(file)
+            examples = config.training.examples  # -1 means read everything
+            for line, content in enumerate(reader):
+                if line == examples:
+                    break
+                labels.append(int(content[0]))
+                image = [int(x) for x in content[1:]]
+                images.append(image)
+            labels = torch.tensor(labels, dtype=torch.int64)
+            images = torch.tensor(images, dtype=torch.float32)
+            images = einops.rearrange(images, "n (w h) -> n w h", w=28, h=28)
+            images = einops.repeat(
+                images, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8
+            )
+        return (images, labels)
+
+
+def main():
+    path = Path("storage/mnist_train.csv")
+    dataset = MnistDataset(path=path)
+    print(f"len: {len(dataset)}")
+    print(f"first shape: {dataset[0][0].shape}")
+    mean = einops.reduce(dataset[:10][0], "n c w h -> w h", "mean")
+    print(f"mean shape: {mean.shape}")
+    print(f"mean image: {mean}")
+
+
+if __name__ == "__main__":
+    main()
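A tiny demonstration of the two einops calls in MnistDataset._load: rearrange unflattens each csv row into an image, then repeat tiles every pixel into an 8x8 block, turning 28x28 into 224x224 (toy shapes below, same patterns):

    # sketch with a fake 2x2 "image" instead of a 28x28 digit
    import torch
    import einops

    flat = torch.arange(4.0).reshape(1, 4)  # one flattened 2x2 image
    img = einops.rearrange(flat, "n (w h) -> n w h", w=2, h=2)
    big = einops.repeat(img, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8)
    print(img.shape, big.shape)  # torch.Size([1, 2, 2]) torch.Size([1, 1, 16, 16])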
diff --git a/ml_pipeline/data/spark.py b/ml_pipeline/data/spark.py
new file mode 100644
index 0000000..7e082cc
--- /dev/null
+++ b/ml_pipeline/data/spark.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+from sys import stdout
+import csv
+
+# 'pip install pyspark' for these
+from pyspark import SparkFiles
+from pyspark.sql import SparkSession
+
+# make a spark "session". this spins up a local spark cluster by default (!)
+spark = SparkSession.builder.getOrCreate()
+# put the input file in the cluster's filesystem:
+spark.sparkContext.addFile("https://csvbase.com/meripaterson/stock-exchanges.csv")
+# the following reads much like pandas
+df = (
+    spark.read.csv(f"file://{SparkFiles.get('stock-exchanges.csv')}", header=True)
+    .select("MIC")
+    .na.drop()
+    .sort("MIC")
+)
+# pyspark has no easy way to write csv to stdout - use python's csv lib
+csv.writer(stdout).writerows(df.collect())
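For contrast, here is the same query in pandas, since the comment above notes how similar the two APIs read (assumes the csvbase URL is reachable):

    # pandas version of the spark.py query above
    import pandas as pd

    df = pd.read_csv("https://csvbase.com/meripaterson/stock-exchanges.csv")
    print(df["MIC"].dropna().sort_values().to_csv(index=False, header=False))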
diff --git a/ml_pipeline/notebooks/features.ipynb b/ml_pipeline/notebooks/features.ipynb
new file mode 100644
index 0000000..cf1aee0
--- /dev/null
+++ b/ml_pipeline/notebooks/features.ipynb
@@ -0,0 +1,23 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "634a9940-7cda-4fe3-bd68-cd69c7db199d",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "",
+   "name": ""
+  },
+  "language_info": {
+   "name": ""
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/ml_pipeline/notebooks/main.ipynb b/ml_pipeline/notebooks/main.ipynb
new file mode 100644
index 0000000..16c4c9e
--- /dev/null
+++ b/ml_pipeline/notebooks/main.ipynb
@@ -0,0 +1,85 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "9f86d9e7-ca94-4dce-b86d-7ddb261f4e25",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Now you can import your package\n",
+    "import ml_pipeline"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "6ba8b629-82db-487f-acbf-2ca20feee7e2",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from ml_pipeline.data.dataset import MnistDataset"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "8fb6c881-46ba-40e5-b837-c507c5bfae21",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from ml_pipeline import config"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "c8ce7920-c056-44ac-93df-b25bae870592",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       ""
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "config"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "83293ef7-37b3-452f-8de5-13bee633d099",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.2"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/ml_pipeline/runner.py b/ml_pipeline/runner.py
deleted file mode 100644
index 3bc726d..0000000
--- a/ml_pipeline/runner.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from torch import nn
-
-
-class Runner:
-    """Runner class that is in charge of implementing routine training functions such as running epochs or doing inference time"""
-
-    def __init__(self, train_set, train_loader, accelerator, model, optimizer):
-        # Initialize class attributes
-        self.accelerator = accelerator
-        self.train_set = train_set
-
-        # Prepare opt, model, and train_loader (helps accelerator auto-cast to devices)
-        self.optimizer, self.model, self.train_loader = accelerator.prepare(
-            optimizer, model, train_loader
-        )
-
-        # Since data is for targets, use Mean Squared Error Loss
-        self.criterion = nn.MSELoss()
-
-    def step(self):
-        """Runs an epoch of training.
-
-        Includes updating model weights and tracking training loss
-
-        Returns:
-            float: The loss averaged over the entire epoch
-        """
-
-        # Turn the model to training mode (affects batchnorm and dropout)
-        self.model.train()
-
-        running_loss, running_sample_count = 0.0, 0.0
-        for sample, target in self.train_loader:
-            self.optimizer.zero_grad()  # Reset gradients to 0
-            prediction = self.model(sample)  # Forward pass through model
-            loss = self.criterion(prediction, target)  # Error calculation
-            running_loss += loss  # Increment running loss
-            running_sample_count += len(sample)
-            self.accelerator.backward(
-                loss
-            )  # Increment gradients within model by sending loss backwards
-            self.optimizer.step()  # Update model weights
-        yield running_loss / running_sample_count  # Take the average of the loss over each sample
diff --git a/ml_pipeline/train.py b/ml_pipeline/train.py
deleted file mode 100644
index 0618709..0000000
--- a/ml_pipeline/train.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""
-main class for building a DL pipeline.
-
-"""
-
-from accelerate import Accelerator
-from torch.utils.data import DataLoader
-from torch.optim import AdamW
-from ml_pipeline.data import GenericDataset
-from ml_pipeline.model.linear import DNN
-from ml_pipeline.runner import Runner
-import click
-
-
-@click.group()
-def cli():
-    pass
-
-
-@cli.command()
-def train():
-    # Initialize hyperparameters
-    hidden_size = 128
-    epochs = 10
-    batch_size = 10
-    lr = 0.001
-
-    # Accelerator is in charge of auto casting tensors to the appropriate GPU device
-    accelerator = Accelerator()
-
-    # Initialize the training set and a dataloader to iterate over the dataset
-    train_set = GenericDataset()
-    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
-
-    # Get the size of the input and output vectors from the training set
-    in_features, out_features = train_set.get_in_out_size()
-
-    # Create the model and optimizer and cast model to the appropriate GPU
-    model = DNN(in_features, hidden_size, out_features).to(accelerator.device)
-    optimizer = AdamW(model.parameters(), lr=lr)
-
-    # Create a runner that will handle
-    runner = Runner(
-        train_set=train_set,
-        train_loader=train_loader,
-        accelerator=accelerator,
-        model=model,
-        optimizer=optimizer,
-    )
-
-    # Train the model
-    for _ in range(epochs):
-        # Run one loop of training and record the average loss
-        for step in runner.step():
-            print(f"{step}")
-
-
-if __name__ == "__main__":
-    cli()
diff --git a/ml_pipeline/training/__init__.py b/ml_pipeline/training/__init__.py
new file mode 100644
index 0000000..f0fad35
--- /dev/null
+++ b/ml_pipeline/training/__init__.py
@@ -0,0 +1,12 @@
+"""
+training package: pipeline stages and helpers for building a DL pipeline.
+"""
+
+from enum import Enum, auto
+
+
+class Stage(Enum):
+    TRAIN = auto()
+    DEV = auto()
+    TEST = auto()
+
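The Stage enum isn't wired up anywhere yet; a hypothetical helper like this could map it onto the split argument that get_dataset() accepts below (the helper name is illustrative, not part of the diff):

    # sketch: mapping pipeline stages onto dataset splits
    from ml_pipeline.training import Stage

    def split_for(stage: Stage) -> str:
        return {Stage.TRAIN: "train", Stage.DEV: "dev", Stage.TEST: "test"}[stage]

    assert split_for(Stage.TEST) == "test"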
diff --git a/ml_pipeline/training/pipeline.py b/ml_pipeline/training/pipeline.py
new file mode 100644
index 0000000..ac86f6a
--- /dev/null
+++ b/ml_pipeline/training/pipeline.py
@@ -0,0 +1,56 @@
+from torch.utils.data import DataLoader
+from torch.optim import AdamW
+from ml_pipeline.training.runner import Runner
+from ml_pipeline import config
+
+
+def run(evaluate=False):
+    # Initialize the dataset and a dataloader to iterate over it
+    train_set = get_dataset(split="test" if evaluate else "train")
+    train_loader = DataLoader(train_set, batch_size=config.training.batch_size, shuffle=True)
+
+    model = get_model(name=config.model.name, dataset=train_set)
+
+    optimizer = AdamW(model.parameters(), lr=config.training.learning_rate)
+
+    # Create a runner that will handle the per-epoch training steps
+    runner = Runner(
+        train_set=train_set,
+        train_loader=train_loader,
+        model=model,
+        optimizer=optimizer,
+    )
+
+    # Train the model
+    for _ in range(config.training.epochs):
+        # Run one loop of training and record the average loss
+        for step in runner.step():
+            print(f"{step}")
+
+
+def get_model(name='vgg11', dataset=None):
+    from ml_pipeline.model.linear import DNN
+    from ml_pipeline.model.cnn import VGG11
+    if name == 'vgg11':
+        model = VGG11(config.data.in_channels, config.data.num_classes)
+    else:
+        # the DNN fallback expects the dataset to expose its feature sizes
+        in_features, out_features = dataset.in_out_features()
+        model = DNN(in_features, config.model.hidden_size, out_features)
+    # cast the model to the configured device
+    return model.to(config.training.device)
+
+
+def get_dataset(source='mnist', split='train'):
+    from ml_pipeline.data.dataset import MnistDataset
+
+    # NOTE: MnistDataset does not take a transform yet; once it does, the
+    # standard MNIST preprocessing would look like:
+    # transform = transforms.Compose([
+    #     transforms.ToTensor(),  # PIL Image/ndarray -> FloatTensor in [0., 1.]
+    #     transforms.Normalize((0.1307,), (0.3081,)),  # MNIST mean and std
+    # ])
+
+    # only the train csv is configured so far; a data.test_path entry would
+    # be needed before split="test" can load a real held-out set
+    csv_file_path = config.data.train_path
+    return MnistDataset(csv_file_path)
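A minimal sketch of driving the pipeline without the CLI — the same code path the train and evaluate commands use, assuming config.data.train_path points at a real MNIST csv:

    # sketch: programmatic equivalent of `cli train` / `cli evaluate`
    from ml_pipeline.training.pipeline import run

    run()               # train split, config.training.epochs epochs
    run(evaluate=True)  # test split, once a data.test_path is wired up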
diff --git a/ml_pipeline/training/runner.py b/ml_pipeline/training/runner.py
new file mode 100644
index 0000000..870c3d0
--- /dev/null
+++ b/ml_pipeline/training/runner.py
@@ -0,0 +1,44 @@
+from torch import nn
+from torch.utils.data import Dataset, DataLoader
+from torch.optim import Optimizer
+
+
+class Runner:
+    """Runner class in charge of routine training functions such as running epochs or doing inference"""
+
+    def __init__(self, train_set: Dataset, train_loader: DataLoader, model: nn.Module, optimizer: Optimizer):
+        # Initialize class attributes
+        self.train_set = train_set
+        self.optimizer, self.model, self.train_loader = (
+            optimizer, model, train_loader
+        )
+
+        # classification over digit classes, so use cross-entropy loss
+        self.criterion = nn.CrossEntropyLoss()
+
+    def step(self):
+        """Runs an epoch of training.
+
+        Includes updating model weights and tracking training loss
+
+        Yields:
+            float: The running average per-sample loss so far this epoch
+        """
+
+        # turn the model to training mode (affects batchnorm and dropout)
+        self.model.train()
+
+        total_loss, total_samples = 0.0, 0
+        for sample, target in self.train_loader:
+            self.optimizer.zero_grad()  # reset gradients to 0
+            prediction = self.model(sample)  # forward pass through model
+            loss = self.criterion(prediction, target)  # error calculation
+
+            # increment gradients within model by sending loss backwards
+            loss.backward()
+            self.optimizer.step()  # update model weights
+
+            # .item() detaches the scalar so the graph can be freed
+            total_loss += loss.item()
+            total_samples += len(sample)
+            yield total_loss / total_samples  # running average loss per sample
diff --git a/pyproject.toml b/pyproject.toml
index 524d7a7..9fda749 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,11 +17,16 @@ dependencies = [
     "matplotlib==3.8.4",
     "numpy==1.26.4",
     "pytest==8.1.1",
+    "pytest-cov==5.0.0",
     "python-dotenv==1.0.1",
     "requests==2.31.0",
     "torch==2.2.2",
+    "torchvision==0.17.2",
     "tqdm==4.66.2",
     "wandb==0.16.6",
+    "python-configuration[toml]",
+    "pandas==2.2.1",
+    "notebook==7.1.2",
 ]
 
 [project.urls]
@@ -31,3 +36,33 @@ documentation = "https://example.com/my_project/docs"
 
 [tool.setuptools]
 packages = ["ml_pipeline"]
+
+[tool.pytest.ini_options]
+# Collect coverage with pytest-cov on every run
+addopts = "--cov=ml_pipeline --cov-report=term"
+# Specify the paths to look for tests
+testpaths = [
+    "test",
+]
+# Set default Python classes, functions, and methods to consider as tests
+python_files = [
+    "test_*.py",
+    "test*.py",
+    "*_test.py",
+]
+python_classes = [
+    "Test*",
+    "*Test",
+    "*Tests",
+    "*TestCase",
+]
+python_functions = [
+    "test_*",
+    "*_test",
+]
+
+# Configure markers (custom or otherwise)
+markers = [
+    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
+    "online: marks tests that require internet access",
+]
diff --git a/test/conftest.py b/test/conftest.py
deleted file mode 100644
index fc788d0..0000000
--- a/test/conftest.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# conftest.py
-import pytest
-import os
-from dotenv import load_dotenv
-from pathlib import Path
-
-
-@pytest.fixture(autouse=True)
-def load_env():
-    # Set up your environment variables here
-    env = Path(__file__).parent / ".env.test"
-    if not load_dotenv(env):
-        raise RuntimeError(".env not loaded")
-    # os.environ['MY_ENV_VAR'] = 'some_value'
-    # You can add more setup code here if needed
-
-    yield
-
-    # Optional: Cleanup code after test (if needed)
-    # e.g., unset environment variables if they should not persist after test
diff --git a/test/test_cnn.py b/test/test_cnn.py
new file mode 100644
index 0000000..df2073c
--- /dev/null
+++ b/test/test_cnn.py
@@ -0,0 +1,7 @@
+from ml_pipeline import config
+from ml_pipeline.model.cnn import VGG11
+
+
+def test_model_name():
+    assert config.model.name == 'vgg11'
+
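The markers declared under [tool.pytest.ini_options] above would be applied like this (illustrative test names, not part of the diff):

    # sketch: tagging tests with the custom markers from pyproject.toml
    import pytest

    @pytest.mark.slow
    def test_full_training_epoch():
        ...

    @pytest.mark.online
    def test_fetch_remote_csv():
        ...

    # then deselect with: pytest -m "not slow"  or  pytest -m "not online"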
diff --git a/test/test_inputs.py b/test/test_inputs.py
new file mode 100644
index 0000000..bfe30e6
--- /dev/null
+++ b/test/test_inputs.py
@@ -0,0 +1,29 @@
+from ml_pipeline.data.dataset import MnistDataset
+from ml_pipeline import config
+from pathlib import Path
+import pytest
+
+
+@pytest.mark.skip()
+def test_init():
+    pass
+
+
+def test_getitem():
+    train_set = MnistDataset(config.data.train_path)
+
+    # the first example in mnist_train.csv is the digit 5
+    assert train_set[0][1].item() == 5
+    repeated = 8
+    length = 28
+    channels = 1
+    assert train_set[0][0].shape == (channels, length * repeated, length * repeated)
+
+
+@pytest.mark.skip()
+def test_loader():
+    from torch.utils.data import DataLoader
+    train_set = MnistDataset(config.data.train_path)
+    # train_loader = DataLoader(train_set, batch_size=config.training.batch_size, shuffle=True)
+    # for sample, target in train_loader:
+    #     assert len(sample) == config.training.batch_size
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
deleted file mode 100644
index 9ffb306..0000000
--- a/test/test_pipeline.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from src.model.linear import DNN
-from src.data.dataset import MnistDataset
-import os
-
-
-def test_size_of_dataset():
-    examples = 500
-    os.environ["TRAINING_EXAMPLES"] = str(examples)
-    channels = 1
-    width, height = 224, 224
-    dataset = MnistDataset(os.getenv("TRAIN_PATH"))
-    # label = dataset[0][1].item()
-    image = dataset[0][0].shape
-    assert channels == image[0]
-    assert width == image[1]
-    assert height == image[2]
-    assert len(dataset) == examples
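Finally, an end-to-end smoke check. The console entry point name isn't shown in this diff, so the click group can be invoked directly (assumes the package is installed and data.train_path points at a real MNIST csv):

    # sketch: exercise the `train` command via click's test runner
    from click.testing import CliRunner
    from ml_pipeline.cli import cli

    result = CliRunner().invoke(cli, ["train"])
    assert result.exit_code == 0, result.output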