This commit is contained in:
publicmatt
2024-04-06 13:02:31 -07:00
commit 24a0c6196f
40 changed files with 1716 additions and 0 deletions

View File

@@ -0,0 +1,21 @@
from config import ConfigurationSet, config_from_env, config_from_dotenv, config_from_toml
from pathlib import Path
pwd = Path(__file__).parent
config_path = pwd / 'config'
root_path = pwd.parent
config = ConfigurationSet(
config_from_env(prefix="ML_PIPELINE", separator="__", lowercase_keys=True),
config_from_dotenv(root_path / ".env", read_from_file=True, lowercase_keys=True, interpolate=True, interpolate_type=1),
config_from_toml(config_path / "training.toml", read_from_file=True),
config_from_toml(config_path / "data.toml", read_from_file=True),
config_from_toml(config_path / "model.toml", read_from_file=True),
config_from_toml(config_path / "app.toml", read_from_file=True),
config_from_toml(config_path / "paths.toml", read_from_file=True),
)
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,5 @@
from ml_pipeline.cli import cli
if __name__ == "__main__":
cli()

View File

@@ -0,0 +1,11 @@
from ml_pipeline import config
from fastapi import FastAPI, Response
import logging
import uvicorn
app = FastAPI()
logger = logging.getLogger(__name__)
def run():
uvicorn.run("ml_pipeline.app:app", host=config.app.host, port=config.app.port, proxy_headers=True)

View File

@@ -0,0 +1,70 @@
import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
from ml_pipeline.data import FashionDataset
from tqdm import tqdm
from ml_pipeline.common import Stage
class Batch:
def __init__(
self,
stage: Stage,
model: nn.Module,
device,
loader: DataLoader,
optimizer: optim.Optimizer,
criterion: nn.Module,
):
"""todo"""
self.stage = stage
self.device = device
self.model = model.to(device)
self.loader = loader
self.criterion = criterion
self.optimizer = optimizer
self.loss = 0
def run(self, desc):
self.model.train()
epoch = 0
for epoch, (x, y) in enumerate(tqdm(self.loader, desc=desc)):
self.optimizer.zero_grad()
loss = self._run_batch((x, y))
loss.backward() # Send loss backwards to accumulate gradients
self.optimizer.step() # Perform a gradient update on the weights of the mode
self.loss += loss.item()
def _run_batch(self, sample):
true_x, true_y = sample
true_x, true_y = true_x.to(self.device), true_y.to(self.device)
pred_y = self.model(true_x)
loss = self.criterion(pred_y, true_y)
return loss
def main():
model = nn.Conv2d(1, 64, 3)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)
path = "fashion-mnist_train.csv"
dataset = FashionDataset(path)
batch_size = 16
num_workers = 1
loader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
)
batch = Batch(
Stage.TRAIN,
device=torch.device("cpu"),
model=model,
criterion=criterion,
optimizer=optimizer,
loader=loader,
)
batch.run("test")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
import click
@click.group()
@click.version_option()
def cli():
"""
ml_pipeline: a template for building, training and running pytorch models.
"""
@cli.command("pipeline:train")
def pipeline_train():
"""run the training pipeline with train data"""
from ml_pipeline.training import pipeline
pipeline.run(evaluate=False)
@cli.command("pipeline:evaluate")
def pipeline_evaluate():
"""run the training pipeline with test data"""
from ml_pipeline.training import pipeline
pipeline.run(evaluate=True)
@cli.command("app:serve")
def app_serve():
"""run the api server pipeline with pretrained model"""
from ml_pipeline import app
app.run()
@cli.command("data:download")
def data_download():
"""download the train and test data"""
from ml_pipeline import data
from ml_pipeline import config
from pathlib import Path
data.download(Path(config.paths.data))
@cli.command("data:debug")
def data_debug():
"""debug the dataset class"""
from ml_pipeline.data import dataset
dataset.debug()

View File

@@ -0,0 +1,3 @@
[app]
host = "127.0.0.1"
port = 8001

View File

@@ -0,0 +1,4 @@
[data]
train_path = "/path/to/data/mnist_train.csv"
in_channels = 1
num_classes = 10

View File

@@ -0,0 +1,3 @@
[model]
hidden_size = 8
name = 'vgg11'

View File

@@ -0,0 +1,4 @@
[paths]
repo = "/path/to/root"
app = "/path/to/root/ml_pipeline"
data = "/path/to/root/data"

View File

@@ -0,0 +1,8 @@
[training]
batch_size = 16
epochs = 10
learning_rate = 0.01
device = 'cpu'
# examples = 50
examples = -1

View File

@@ -0,0 +1,27 @@
from pathlib import Path
import requests
import logging
from ml_pipeline import config
logger = logging.getLogger(__name__)
def download(data_path: Path, force=False):
urls = {
'train' : 'https://pjreddie.com/media/files/mnist_train.csv',
'test' : 'https://pjreddie.com/media/files/mnist_test.csv'
}
for dataset, url in urls.items():
filename = data_path / url.split('/')[-1]
if filename.exists() and not force:
logger.info(f'file exists {filename} (set force to overwrite)')
continue
logger.info(f'downloading {dataset} {url}')
response = requests.get(url)
if response.status_code == 200:
with open(filename, 'wb') as file:
file.write(response.content)
logger.info(f'file downloaded {filename}')
else:
logger.info(f'failed to download file {filename}')

View File

@@ -0,0 +1,66 @@
from torch.utils.data import Dataset
import numpy as np
import einops
import csv
import torch
from pathlib import Path
from typing import Tuple
from ml_pipeline import config, logger
class MnistDataset(Dataset):
"""
The MNIST database of handwritten digits.
Training set is 60k labeled examples, test is 10k examples.
The b/w images normalized to 20x20, preserving aspect ratio.
It's the defacto standard image training set to learn about classification in DL
"""
def __init__(self, path: Path):
"""
give a path to a dir that contains the following csv files:
https://pjreddie.com/projects/mnist-in-csv/
"""
assert path, "dataset path required"
self.path = Path(path)
assert self.path.exists(), f"could not find dataset path: {path}"
self.features, self.labels = self._load()
def __getitem__(self, idx):
return (self.features[idx], self.labels[idx])
def __len__(self):
return len(self.features)
def _load(self) -> Tuple[torch.Tensor, torch.Tensor]:
# opening the CSV file
with open(self.path, mode="r") as file:
images, labels = [], []
csvFile = csv.reader(file)
examples = config.training.examples
for line, content in enumerate(csvFile):
if line == examples:
break
labels.append(int(content[0]))
image = [int(x) for x in content[1:]]
images.append(image)
labels = torch.tensor(labels, dtype=torch.int64)
images = torch.tensor(images, dtype=torch.float32)
images = einops.rearrange(images, "n (w h) -> n w h", w=28, h=28)
images = einops.repeat(
images, "n w h -> n c (w r_w) (h r_h)", c=1, r_w=8, r_h=8
)
return (images, labels)
def debug():
path = Path(config.paths.data) / "mnist_train.csv"
dataset = MnistDataset(path=path)
logger.info(f"len: {len(dataset)}")
logger.info(f"first shape: {dataset[0][0].shape}")
mean = einops.reduce(dataset[:10][0], "n w h -> w h", "mean")
logger.info(f"mean shape: {mean.shape}")
logger.info(f"mean image: {mean}")

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
from sys import stdout
import csv
# 'pip install pyspark' for these
from pyspark import SparkFiles
from pyspark.sql import SparkSession
# make a spark "session". this creates a local hadoop cluster by default (!)
spark = SparkSession.builder.getOrCreate()
# put the input file in the cluster's filesystem:
spark.sparkContext.addFile("https://csvbase.com/meripaterson/stock-exchanges.csv")
# the following is much like for pandas
df = (
spark.read.csv(f"file://{SparkFiles.get('stock-exchanges.csv')}", header=True)
.select("MIC")
.na.drop()
.sort("MIC")
)
# pyspark has no easy way to write csv to stdout - use python's csv lib
csv.writer(stdout).writerows(df.collect())

View File

@@ -0,0 +1,152 @@
from torch import nn
# the VGG11 architecture
class VGG11(nn.Module):
def __init__(self, in_channels, num_classes=1000):
super(VGG11, self).__init__()
self.in_channels = in_channels
self.num_classes = num_classes
# convolutional layers
self.conv_layers = nn.Sequential(
nn.Conv2d(self.in_channels, 64, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(128, 256, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(256, 512, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
# fully connected linear layers
self.linear_layers = nn.Sequential(
nn.Linear(in_features=512 * 7 * 7, out_features=4096),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(in_features=4096, out_features=4096),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(in_features=4096, out_features=self.num_classes),
)
def forward(self, x):
x = self.conv_layers(x)
# flatten to prepare for the fully connected layers
x = x.view(x.size(0), -1)
x = self.linear_layers(x)
return x
class VGG16(nn.Module):
def __init__(self, num_classes=10):
super(VGG16, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(),
)
self.layer2 = nn.Sequential(
nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.layer3 = nn.Sequential(
nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
)
self.layer4 = nn.Sequential(
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.layer5 = nn.Sequential(
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(),
)
self.layer6 = nn.Sequential(
nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(),
)
self.layer7 = nn.Sequential(
nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.layer8 = nn.Sequential(
nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
)
self.layer9 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
)
self.layer10 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.layer11 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
)
self.layer12 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
)
self.layer13 = nn.Sequential(
nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.fc = nn.Sequential(
nn.Dropout(0.5), nn.Linear(7 * 7 * 512, 4096), nn.ReLU()
)
self.fc1 = nn.Sequential(nn.Dropout(0.5), nn.Linear(4096, 4096), nn.ReLU())
self.fc2 = nn.Sequential(nn.Linear(4096, num_classes))
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = self.layer5(out)
out = self.layer6(out)
out = self.layer7(out)
out = self.layer8(out)
out = self.layer9(out)
out = self.layer10(out)
out = self.layer11(out)
out = self.layer12(out)
out = self.layer13(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
out = self.fc1(out)
out = self.fc2(out)
return out

View File

@@ -0,0 +1,19 @@
from torch import nn
class DNN(nn.Module):
def __init__(self, in_size, hidden_size, out_size):
super().__init__()
# Define the activation function and the linear functions
self.act = nn.ReLU()
self.in_linear = nn.Linear(in_size, hidden_size)
self.out_linear = nn.Linear(hidden_size, out_size)
def forward(self, x):
# Send x through first linear layer and activation function
x = self.act(self.in_linear(x))
# Return x through the out linear function
return self.out_linear(x)

View File

@@ -0,0 +1,23 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "634a9940-7cda-4fe3-bd68-cd69c7db199d",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "",
"name": ""
},
"language_info": {
"name": ""
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,85 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "9f86d9e7-ca94-4dce-b86d-7ddb261f4e25",
"metadata": {},
"outputs": [],
"source": [
"# Now you can import your package\n",
"import ml_pipeline"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "6ba8b629-82db-487f-acbf-2ca20feee7e2",
"metadata": {},
"outputs": [],
"source": [
"from ml_pipeline.data.dataset import MnistDataset"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "8fb6c881-46ba-40e5-b837-c507c5bfae21",
"metadata": {},
"outputs": [],
"source": [
"from ml_pipeline import config"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "c8ce7920-c056-44ac-93df-b25bae870592",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<ConfigurationSet: 0x7fcf70fc1a50>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"config"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "83293ef7-37b3-452f-8de5-13bee633d099",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,12 @@
"""
main class for building a DL pipeline.
"""
from enum import Enum, auto
class Stage(Enum):
TRAIN = auto()
DEV = auto()
TEST = auto()

View File

@@ -0,0 +1,55 @@
from torch.utils.data import DataLoader
from torch.optim import AdamW
from ml_pipeline.training.runner import Runner
from ml_pipeline import config, logger
def run(evaluate=False):
# Initialize the training set and a dataloader to iterate over the dataset
# train_set = GenericDataset()
dataset = get_dataset(evaluate)
dataloader = DataLoader(dataset, batch_size=config.training.batch_size, shuffle=True)
model = get_model(name=config.model.name)
optimizer = AdamW(model.parameters(), lr=config.training.learning_rate)
# Create a runner that will handle
runner = Runner(
dataset=dataset,
dataloader=dataloader,
model=model,
optimizer=optimizer,
)
# Train the model
for _ in range(config.training.epochs):
# Run one loop of training and record the average loss
for step in runner.step():
logger.info(f"{step}")
def get_model(name='vgg11'):
from ml_pipeline.model.linear import DNN
from ml_pipeline.model.cnn import VGG11
if name == 'vgg11':
return VGG11(config.data.in_channels, config.data.num_classes)
else:
# Create the model and optimizer and cast model to the appropriate GPU
in_features, out_features = dataset.in_out_features()
model = DNN(in_features, config.model.hidden_size, out_features)
return model.to(config.training.device)
def get_dataset(evaluate=False):
# Usage
from ml_pipeline.data.dataset import MnistDataset
from torchvision import transforms
csv_file_path = config.data.train_path if not evaluate else config.data.test_path
transform = transforms.Compose([
transforms.ToTensor(), # Converts a PIL Image or numpy.ndarray to a FloatTensor and scales the image's pixel intensity values to the [0., 1.] range
transforms.Normalize((0.1307,), (0.3081,)) # Normalize using the mean and std specific to MNIST
])
dataset = MnistDataset(csv_file_path)
return dataset

View File

@@ -0,0 +1,46 @@
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import Optimizer
class Runner:
"""Runner class that is in charge of implementing routine training functions such as running epochs or doing inference time"""
def __init__(self, dataset: Dataset, dataloader: DataLoader, model: nn.Module, optimizer: Optimizer):
# Initialize class attributes
self.dataset = dataset
# Prepare opt, model, and dataloader (helps accelerator auto-cast to devices)
self.optimizer, self.model, self.dataloader = (
optimizer, model, dataloader
)
# Since data is for targets, use Mean Squared Error Loss
# self.criterion = nn.MSELoss()
self.criterion = nn.CrossEntropyLoss()
def step(self):
"""Runs an epoch of training.
Includes updating model weights and tracking training loss
Returns:
float: The loss averaged over the entire epoch
"""
# turn the model to training mode (affects batchnorm and dropout)
self.model.train()
total_loss, total_samples = 0.0, 0.0
for sample, target in self.dataloader:
self.optimizer.zero_grad() # reset gradients to 0
prediction = self.model(sample) # forward pass through model
loss = self.criterion(prediction, target) # error calculation
# increment gradients within model by sending loss backwards
loss.backward()
self.optimizer.step() # update model weights
total_loss += loss # increment running loss
total_samples += len(sample)
yield total_loss / total_samples # take the average of the loss over each sample