refactor files.

add a notebook module.
add config package.
This commit is contained in:
publicmatt 2024-04-05 18:37:24 -07:00
parent e508efefee
commit bae586612e
25 changed files with 431 additions and 234 deletions

2
.env.example Normal file
View File

@ -0,0 +1,2 @@
MODEL__INPUT_FEATURES=300
DATA__TRAIN_PATH=/path/to/data/mnist_train.csv

2
.gitignore vendored
View File

@ -1,5 +1,3 @@
data/
outputs outputs
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -0,0 +1,4 @@
from .config import config
config = config()

View File

@ -6,3 +6,16 @@ def cli():
""" """
ml_pipeline: a template for building, training and running pytorch models. ml_pipeline: a template for building, training and running pytorch models.
""" """
@cli.command("train")
def train():
"""run the training pipeline with train data"""
from ml_pipeline.training.pipeline import run
run()
@cli.command("evaluate")
def evaluate():
"""run the training pipeline with test data"""
from ml_pipeline.training.pipeline import run
run(evaluate=True)

View File

@ -1,7 +0,0 @@
from enum import Enum, auto
class Stage(Enum):
TRAIN = auto()
DEV = auto()
TEST = auto()

View File

@ -0,0 +1,14 @@
from config import ConfigurationSet, config_from_env, config_from_dotenv, config_from_toml
from pathlib import Path
def config():
config = Path(__file__).parent
root = config.parent.parent
return ConfigurationSet(
config_from_env(prefix="ML_PIPELINE", separator="__", lowercase_keys=True),
config_from_dotenv(root / ".env", read_from_file=True, lowercase_keys=True, interpolate=True, interpolate_type=1),
config_from_toml(config / "training.toml", read_from_file=True),
config_from_toml(config / "data.toml", read_from_file=True),
config_from_toml(config / "model.toml", read_from_file=True),
)

View File

View File

@ -0,0 +1,4 @@
[data]
train_path = "/path/to/data/mnist_train.csv"
in_channels = 1
num_classes = 10

View File

@ -0,0 +1,3 @@
[model]
hidden_size = 8
name = 'vgg11'

View File

@ -0,0 +1,8 @@
[training]
batch_size = 16
epochs = 10
learning_rate = 0.01
device = 'cpu'
# examples = 50
examples = -1

View File

@ -1,86 +0,0 @@
from torch.utils.data import Dataset
import numpy as np
import einops
import csv
import torch
import click
SAMPLES = 500
IN_DIM = 30
OUT_DIM = 20
class GenericDataset(Dataset):
def __init__(self):
rng = np.random.default_rng()
self.x = rng.normal(size=(SAMPLES, IN_DIM)).astype(np.float32)
self.y = 500 * rng.normal(size=(SAMPLES, OUT_DIM)).astype(np.float32)
def __getitem__(self, idx):
return (self.x[idx], self.y[idx])
def __len__(self):
return len(self.x)
def get_in_out_size(self):
return self.x.shape[1], self.y.shape[1]
class FashionDataset(Dataset):
def __init__(self, path: str):
self.path = path
self.x, self.y = self.load()
def __getitem__(self, idx):
return (self.x[idx], self.y[idx])
def __len__(self):
return len(self.x)
def load(self):
# opening the CSV file
with open(self.path, mode="r") as file:
images = list()
classes = list()
# reading the CSV file
csvFile = csv.reader(file)
# displaying the contents of the CSV file
header = next(csvFile)
limit = 1000
for line in csvFile:
if limit < 1:
break
classes.append(int(line[:1][0]))
images.append([int(x) for x in line[1:]])
limit -= 1
classes = torch.tensor(classes, dtype=torch.long)
images = torch.tensor(images, dtype=torch.float32)
images = einops.rearrange(images, "n (w h) -> n w h", w=28, h=28)
images = einops.repeat(
images, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8
)
return (images, classes)
@click.group()
def cli(): ...
@cli.command()
def main():
path = "fashion-mnist_train.csv"
dataset = FashionDataset(path=path)
print(f"len: {len(dataset)}")
print(f"first shape: {dataset[0][0].shape}")
mean = einops.reduce(dataset[:10], "n w h -> w h", "mean")
print(f"mean shape: {mean.shape}")
@cli.command()
def generic():
dataset = GenericDataset()
if __name__ == "__main__":
cli()

View File

@ -0,0 +1,68 @@
from torch.utils.data import Dataset
import numpy as np
import einops
import csv
import torch
from pathlib import Path
from typing import Tuple
from ml_pipeline import config
class MnistDataset(Dataset):
"""
The MNIST database of handwritten digits.
Training set is 60k labeled examples, test is 10k examples.
The b/w images normalized to 20x20, preserving aspect ratio.
It's the defacto standard image training set to learn about classification in DL
"""
def __init__(self, path: Path):
"""
give a path to a dir that contains the following csv files:
https://pjreddie.com/projects/mnist-in-csv/
"""
assert path, "dataset path required"
self.path = Path(path)
assert self.path.exists(), f"could not find dataset path: {path}"
self.features, self.labels = self._load()
def __getitem__(self, idx):
return (self.features[idx], self.labels[idx])
def __len__(self):
return len(self.features)
def _load(self) -> Tuple[torch.Tensor, torch.Tensor]:
# opening the CSV file
with open(self.path, mode="r") as file:
images, labels = [], []
csvFile = csv.reader(file)
examples = config.training.examples
for line, content in enumerate(csvFile):
if line == examples:
break
labels.append(int(content[0]))
image = [int(x) for x in content[1:]]
images.append(image)
labels = torch.tensor(labels, dtype=torch.int64)
images = torch.tensor(images, dtype=torch.float32)
images = einops.rearrange(images, "n (w h) -> n w h", w=28, h=28)
images = einops.repeat(
images, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8
)
return (images, labels)
def main():
path = Path("storage/mnist_train.csv")
dataset = MnistDataset(path=path)
print(f"len: {len(dataset)}")
print(f"first shape: {dataset[0][0].shape}")
mean = einops.reduce(dataset[:10][0], "n w h -> w h", "mean")
print(f"mean shape: {mean.shape}")
print(f"mean image: {mean}")
if __name__ == "__main__":
main()

21
ml_pipeline/data/spark.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
from sys import stdout
import csv
# 'pip install pyspark' for these
from pyspark import SparkFiles
from pyspark.sql import SparkSession
# make a spark "session". this creates a local hadoop cluster by default (!)
spark = SparkSession.builder.getOrCreate()
# put the input file in the cluster's filesystem:
spark.sparkContext.addFile("https://csvbase.com/meripaterson/stock-exchanges.csv")
# the following is much like for pandas
df = (
spark.read.csv(f"file://{SparkFiles.get('stock-exchanges.csv')}", header=True)
.select("MIC")
.na.drop()
.sort("MIC")
)
# pyspark has no easy way to write csv to stdout - use python's csv lib
csv.writer(stdout).writerows(df.collect())

View File

@ -0,0 +1,23 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "634a9940-7cda-4fe3-bd68-cd69c7db199d",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "",
"name": ""
},
"language_info": {
"name": ""
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -0,0 +1,85 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "9f86d9e7-ca94-4dce-b86d-7ddb261f4e25",
"metadata": {},
"outputs": [],
"source": [
"# Now you can import your package\n",
"import ml_pipeline"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "6ba8b629-82db-487f-acbf-2ca20feee7e2",
"metadata": {},
"outputs": [],
"source": [
"from ml_pipeline.data.dataset import MnistDataset"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "8fb6c881-46ba-40e5-b837-c507c5bfae21",
"metadata": {},
"outputs": [],
"source": [
"from ml_pipeline import config"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "c8ce7920-c056-44ac-93df-b25bae870592",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<ConfigurationSet: 0x7fcf70fc1a50>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"config"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "83293ef7-37b3-452f-8de5-13bee633d099",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -1,43 +0,0 @@
from torch import nn
class Runner:
"""Runner class that is in charge of implementing routine training functions such as running epochs or doing inference time"""
def __init__(self, train_set, train_loader, accelerator, model, optimizer):
# Initialize class attributes
self.accelerator = accelerator
self.train_set = train_set
# Prepare opt, model, and train_loader (helps accelerator auto-cast to devices)
self.optimizer, self.model, self.train_loader = accelerator.prepare(
optimizer, model, train_loader
)
# Since data is for targets, use Mean Squared Error Loss
self.criterion = nn.MSELoss()
def step(self):
"""Runs an epoch of training.
Includes updating model weights and tracking training loss
Returns:
float: The loss averaged over the entire epoch
"""
# Turn the model to training mode (affects batchnorm and dropout)
self.model.train()
running_loss, running_sample_count = 0.0, 0.0
for sample, target in self.train_loader:
self.optimizer.zero_grad() # Reset gradients to 0
prediction = self.model(sample) # Forward pass through model
loss = self.criterion(prediction, target) # Error calculation
running_loss += loss # Increment running loss
running_sample_count += len(sample)
self.accelerator.backward(
loss
) # Increment gradients within model by sending loss backwards
self.optimizer.step() # Update model weights
yield running_loss / running_sample_count # Take the average of the loss over each sample

View File

@ -1,59 +0,0 @@
"""
main class for building a DL pipeline.
"""
from accelerate import Accelerator
from torch.utils.data import DataLoader
from torch.optim import AdamW
from ml_pipeline.data import GenericDataset
from ml_pipeline.model.linear import DNN
from ml_pipeline.runner import Runner
import click
@click.group()
def cli():
pass
@cli.command()
def train():
# Initialize hyperparameters
hidden_size = 128
epochs = 10
batch_size = 10
lr = 0.001
# Accelerator is in charge of auto casting tensors to the appropriate GPU device
accelerator = Accelerator()
# Initialize the training set and a dataloader to iterate over the dataset
train_set = GenericDataset()
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
# Get the size of the input and output vectors from the training set
in_features, out_features = train_set.get_in_out_size()
# Create the model and optimizer and cast model to the appropriate GPU
model = DNN(in_features, hidden_size, out_features).to(accelerator.device)
optimizer = AdamW(model.parameters(), lr=lr)
# Create a runner that will handle
runner = Runner(
train_set=train_set,
train_loader=train_loader,
accelerator=accelerator,
model=model,
optimizer=optimizer,
)
# Train the model
for _ in range(epochs):
# Run one loop of training and record the average loss
for step in runner.step():
print(f"{step}")
if __name__ == "__main__":
cli()

View File

@ -0,0 +1,12 @@
"""
main class for building a DL pipeline.
"""
from enum import Enum, auto
class Stage(Enum):
TRAIN = auto()
DEV = auto()
TEST = auto()

View File

@ -0,0 +1,59 @@
from torch.utils.data import DataLoader
from torch.optim import AdamW
from ml_pipeline.training.runner import Runner
from ml_pipeline import config
def run():
# Initialize the training set and a dataloader to iterate over the dataset
# train_set = GenericDataset()
train_set = get_dataset()
train_loader = DataLoader(train_set, batch_size=config.training.batch_size, shuffle=True)
model = get_model(name=config.model.name)
# Get the size of the input and output vectors from the training set
# in_features, out_features = train_set.get_in_out_size()
optimizer = AdamW(model.parameters(), lr=config.training.learning_rate)
# Create a runner that will handle
runner = Runner(
train_set=train_set,
train_loader=train_loader,
model=model,
optimizer=optimizer,
)
# Train the model
for _ in range(config.training.epochs):
# Run one loop of training and record the average loss
for step in runner.step():
print(f"{step}")
def get_model(name='vgg11'):
from ml_pipeline.model.linear import DNN
from ml_pipeline.model.cnn import VGG11
if name == 'vgg11':
return VGG11(config.data.in_channels, config.data.num_classes)
else:
# Create the model and optimizer and cast model to the appropriate GPU
in_features, out_features = dataset.in_out_features()
model = DNN(in_features, config.model.hidden_size, out_features)
return model.to(config.training.device)
def get_dataset(source='mnist', split='train'):
# Usage
from ml_pipeline.data.dataset import MnistDataset
from torchvision import transforms
csv_file_path = config.data.train_path
transform = transforms.Compose([
transforms.ToTensor(), # Converts a PIL Image or numpy.ndarray to a FloatTensor and scales the image's pixel intensity values to the [0., 1.] range
transforms.Normalize((0.1307,), (0.3081,)) # Normalize using the mean and std specific to MNIST
])
dataset = MnistDataset(csv_file_path)
return dataset

View File

@ -0,0 +1,46 @@
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import Optimizer
class Runner:
"""Runner class that is in charge of implementing routine training functions such as running epochs or doing inference time"""
def __init__(self, train_set: Dataset, train_loader: DataLoader, model: nn.Module, optimizer: Optimizer):
# Initialize class attributes
self.train_set = train_set
# Prepare opt, model, and train_loader (helps accelerator auto-cast to devices)
self.optimizer, self.model, self.train_loader = (
optimizer, model, train_loader
)
# Since data is for targets, use Mean Squared Error Loss
# self.criterion = nn.MSELoss()
self.criterion = nn.CrossEntropyLoss()
def step(self):
"""Runs an epoch of training.
Includes updating model weights and tracking training loss
Returns:
float: The loss averaged over the entire epoch
"""
# turn the model to training mode (affects batchnorm and dropout)
self.model.train()
total_loss, total_samples = 0.0, 0.0
for sample, target in self.train_loader:
self.optimizer.zero_grad() # reset gradients to 0
prediction = self.model(sample) # forward pass through model
loss = self.criterion(prediction, target) # error calculation
# increment gradients within model by sending loss backwards
loss.backward()
self.optimizer.step() # update model weights
total_loss += loss # increment running loss
total_samples += len(sample)
yield total_loss / total_samples # take the average of the loss over each sample

View File

@ -17,11 +17,16 @@ dependencies = [
"matplotlib==3.8.4", "matplotlib==3.8.4",
"numpy==1.26.4", "numpy==1.26.4",
"pytest==8.1.1", "pytest==8.1.1",
"pytest-cov==5.0.0",
"python-dotenv==1.0.1", "python-dotenv==1.0.1",
"requests==2.31.0", "requests==2.31.0",
"torch==2.2.2", "torch==2.2.2",
"torchvision=0.17.2",
"tqdm==4.66.2", "tqdm==4.66.2",
"wandb==0.16.6", "wandb==0.16.6",
"python-configuration[toml]",
"pandas==2.2.1",
"notebook==7.1.2",
] ]
[project.urls] [project.urls]
@ -31,3 +36,33 @@ documentation = "https://example.com/my_project/docs"
[tool.setuptools] [tool.setuptools]
packages = ["ml_pipeline"] packages = ["ml_pipeline"]
[tool.pytest.ini_options]
# Run tests in parallel using pytest-xdist
addopts = "--cov=ml_pipeline --cov-report=term"
# Specify the paths to look for tests
testpaths = [
"test",
]
# Set default Python classes, functions, and methods to consider as tests
python_files = [
"test_*.py",
"test*.py",
"*_test.py",
]
python_classes = [
"Test*",
"*Test",
"*Tests",
"*TestCase",
]
python_functions = [
"test_*",
"*_test",
]
# Configure markers (custom or otherwise)
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"online: marks tests that require internet access",
]

View File

@ -1,20 +0,0 @@
# conftest.py
import pytest
import os
from dotenv import load_dotenv
from pathlib import Path
@pytest.fixture(autouse=True)
def load_env():
# Set up your environment variables here
env = Path(__file__).parent / ".env.test"
if not load_dotenv(env):
raise RuntimeError(".env not loaded")
# os.environ['MY_ENV_VAR'] = 'some_value'
# You can add more setup code here if needed
yield
# Optional: Cleanup code after test (if needed)
# e.g., unset environment variables if they should not persist after test

6
test/test_cnn.py Normal file
View File

@ -0,0 +1,6 @@
from ml_pipeline import config
from ml_pipeline.model.cnn import VGG11
def test_in_channels():
assert config.model.name == 'vgg11'

28
test/test_inputs.py Normal file
View File

@ -0,0 +1,28 @@
from ml_pipeline.data.dataset import MnistDataset
from ml_pipeline import config
from pathlib import Path
import pytest
@pytest.mark.skip()
def test_init():
pass
def test_getitem():
train_set = MnistDataset(config.data.train_path)
assert train_set[0][1].item() == 5
repeated = 8
length = 28
channels = 1
assert train_set[0][0].shape == (channels, length * repeated, length * repeated)
@pytest.mark.skip()
def test_loader():
from torch.utils.data import DataLoader
train_set = MnistDataset(config.data.train_path)
# train_loader = DataLoader(train_set, batch_size=config.training.batch_size, shuffle=True)
# for sample, target in train_loader:
# assert len(sample) == config.training.batch_size
# len(sample)
# len(target)

View File

@ -1,17 +0,0 @@
from src.model.linear import DNN
from src.data.dataset import MnistDataset
import os
def test_size_of_dataset():
examples = 500
os.environ["TRAINING_EXAMPLES"] = str(examples)
channels = 1
width, height = 224, 224
dataset = MnistDataset(os.getenv("TRAIN_PATH"))
# label = dataset[0][1].item()
image = dataset[0][0].shape
assert channels == image[0]
assert width == image[1]
assert height == image[2]
assert len(dataset) == examples