PyTorch

Install

https://pytorch.org/get-started/locally/#start-locally

mamba install pytorch torchvision torchinfo torchaudio torchtext cudatoolkit -c pytorch -c nvidia

Check GPU

import torch


torch.cuda.is_available()

Import

import torch

import torch.nn as nn

import torch.nn.functional as F

import torch.optim as optim

from torch.autograd import Variable

import torch.utils.data as td

from torchinfo import summary


import torchvision

import torchvision.transforms as transforms


# Set random seed for reproducibility

torch.manual_seed(0)

Math

torch_arr = torch.as_tensor(numba_arr, device="cuda")

Multi-Classification tabular

Data preperation

Put numpy array into PyTorch object

train_x = torch.Tensor(x_train).float()

train_y = torch.Tensor(y_train).long()

train_ds = td.TensorDataset(train_x, train_y)

train_loader = td.DataLoader(train_ds, batch_size=20, shuffle=False, num_workers=1)

Setup model

3 FC layers. 10 nodes. ReLU. SoftMax. Cross Entropy Loss. ADAM.

class ClassificationNet(nn.Module):

    def __init__(self):

        super(ClassificationNet, self).__init__()

        self.fc1 = nn.Linear(len(features), 10)

        self.fc2 = nn.Linear(10, 10)

        self.fc3 = nn.Linear(10, len(classes))


    def forward(self, x):

        x = torch.relu(self.fc1(x))

        x = torch.relu(self.fc2(x))

        x = torch.softmax(self.fc3(x),dim=1)

        return x


model = ClassificationNet()

print(model)


def train(model, data_loader, optimizer):

    # Set the model to training mode

    model.train()

    train_loss = 0

    

    for batch, tensor in enumerate(data_loader):

        data, target = tensor

        #feedforward

        optimizer.zero_grad()

        out = model(data)

        loss = loss_criteria(out, target)

        train_loss += loss.item()


        # backpropagate

        loss.backward()

        optimizer.step()


    #Return average loss

    avg_loss = train_loss / (batch+1)

    print('Training set: Average loss: {:.6f}'.format(avg_loss))

    return avg_loss

           

            

def test(model, data_loader):

    # Switch the model to evaluation mode (so we don't backpropagate)

    model.eval()

    test_loss = 0

    correct = 0


    with torch.no_grad():

        batch_count = 0

        for batch, tensor in enumerate(data_loader):

            batch_count += 1

            data, target = tensor

            # Get the predictions

            out = model(data)


            # calculate the loss

            test_loss += loss_criteria(out, target).item()


            # Calculate the accuracy

            _, predicted = torch.max(out.data, 1)

            correct += torch.sum(target==predicted).item()

            

    # Calculate the average loss and total accuracy for this epoch

    avg_loss = test_loss/batch_count

    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(

        avg_loss, correct, len(data_loader.dataset),

        100. * correct / len(data_loader.dataset)))

    

    # return average loss for the epoch

    return avg_loss


# Specify the loss criteria (CrossEntropyLoss for multi-class classification)

loss_criteria = nn.CrossEntropyLoss()


# Use an "Adam" optimizer to adjust weights

# (see https://pytorch.org/docs/stable/optim.html#algorithms for details of supported algorithms)

learning_rate = 0.001

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

optimizer.zero_grad()


# We'll track metrics for each epoch in these arrays

epoch_nums = []

training_loss = []

validation_loss = []

Run

epochs = 50

for epoch in range(1, epochs + 1):


    # print the epoch number

    print('Epoch: {}'.format(epoch))

    

    # Feed training data into the model to optimize the weights

    train_loss = train(model, train_loader, optimizer)

    

    # Feed the test data into the model to check its performance

    test_loss = test(model, test_loader)

    

    # Log the metrics for this epoch

    epoch_nums.append(epoch)

    training_loss.append(train_loss)

    validation_loss.append(test_loss)

Plot

plt.plot(epoch_nums, training_loss)

plt.plot(epoch_nums, validation_loss)

plt.xlabel('epoch')

plt.ylabel('loss')

plt.legend(['training', 'validation'], loc='upper right')

plt.show()

See confusion matrix

# Set the model to evaluate mode

model.eval()


# Get predictions for the test data

x = torch.Tensor(x_test).float()

_, predicted = torch.max(model(x).data, 1)


# Plot the confusion matrix

cm = confusion_matrix(y_test, predicted.numpy())

plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)

plt.colorbar()

tick_marks = np.arange(len(classes))

plt.xticks(tick_marks, classes, rotation=45)

plt.yticks(tick_marks, classes)

plt.xlabel("Actual")

plt.ylabel("Predicted")

plt.show()

Save model

model_file = 'models/classifier.pt'

torch.save(model.state_dict(), model_file)

del model

Predict

# Create a new model class and load weights

model = ClassificationNet()

model.load_state_dict(torch.load(model_file))


# Set model to evaluation mode

model.eval()


# Get a prediction for the new data sample

x = torch.Tensor(x_new).float()

_, predicted = torch.max(model(x).data, 1)


print('Prediction:', classes[predicted.item()])

Transfer learning

Get model

model = torchvision.models.resnet34(pretrained=True)

Update last model layer with number of features

# Set the existing feature extraction layers to read-only

for param in model.parameters():

    param.requires_grad = False


# Replace the prediction layer

num_ftrs = model.fc.in_features

model.fc = nn.Linear(num_ftrs, len(classes))


# Now print the full model, which will include the feature extraction layers of the base model and our prediction layer

print(model)

CNN

Images of the different classes are stored in different folder with the class name as the name of the file. Then files are classn.jpg

Load data

# Function to ingest data using training and test loaders

def load_dataset(data_path):

    # Load all of the images

    transformation = transforms.Compose([

        # transform to tensors

        transforms.ToTensor(),

        # Normalize the pixel values (in R, G, and B channels)

        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])

    ])


    # Load all of the images, transforming them

    full_dataset = torchvision.datasets.ImageFolder(

        root=data_path,

        transform=transformation

    )

    

    

    # Split into training (70% and testing (30%) datasets)

    train_size = int(0.7 * len(full_dataset))

    test_size = len(full_dataset) - train_size

    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    

    # define a loader for the training data we can iterate through in 50-image batches

    train_loader = torch.utils.data.DataLoader(

        train_dataset,

        batch_size=50,

        num_workers=0,

        shuffle=False

    )

    

    # define a loader for the testing data we can iterate through in 50-image batches

    test_loader = torch.utils.data.DataLoader(

        test_dataset,

        batch_size=50,

        num_workers=0,

        shuffle=False

    )

        

    return train_loader, test_loader



# Now load the images from the shapes folder

data_path = 'path/'


# Get the class names

classes = os.listdir(data_path)

classes.sort()

print(len(classes), 'classes:')

print(classes)


# Get the iterative dataloaders for test and training data

train_loader, test_loader = load_dataset(data_path)

Setup model

class CNN(nn.Module):

    # Constructor

    def __init__(self, num_classes=3):

        super(Net, self).__init__()

        

        # Our images are RGB, so input channels = 3. We'll apply 12 filters in the first convolutional layer

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)

        

        # We'll apply max pooling with a kernel size of 2

        self.pool = nn.MaxPool2d(kernel_size=2)

        

        # A second convolutional layer takes 12 input channels, and generates 12 outputs

        self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=3, stride=1, padding=1)

        

        # A third convolutional layer takes 12 inputs and generates 24 outputs

        self.conv3 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)

        

        # A drop layer deletes 20% of the features to help prevent overfitting

        self.drop = nn.Dropout2d(p=0.2)

        

        # Our 128x128 image tensors will be pooled twice with a kernel size of 2. 128/2/2 is 32.

        # So our feature tensors are now 32 x 32, and we've generated 24 of them

        # We need to flatten these and feed them to a fully-connected layer

        # to map them to  the probability for each class

        self.fc = nn.Linear(in_features=32 * 32 * 24, out_features=num_classes)


    def forward(self, x):

        # Use a relu activation function after layer 1 (convolution 1 and pool)

        x = F.relu(self.pool(self.conv1(x)))

      

        # Use a relu activation function after layer 2 (convolution 2 and pool)

        x = F.relu(self.pool(self.conv2(x)))

        

        # Select some features to drop after the 3rd convolution to prevent overfitting

        x = F.relu(self.drop(self.conv3(x)))

        

        # Only drop the features if this is a training pass

        x = F.dropout(x, training=self.training)

        

        # Flatten

        x = x.view(-1, 32 * 32 * 24)

        # Feed to fully-connected layer to predict class

        x = self.fc(x)

        # Return class probabilities via a log_softmax function 

        return F.log_softmax(x, dim=1)

Train the model

def train(model, device, train_loader, optimizer, epoch):

    # Set the model to training mode

    model.train()

    train_loss = 0

    print("Epoch:", epoch)

    # Process the images in batches

    for batch_idx, (data, target) in enumerate(train_loader):

        # Use the CPU or GPU as appropriate

        data, target = data.to(device), target.to(device)

        

        # Reset the optimizer

        optimizer.zero_grad()

        

        # Push the data forward through the model layers

        output = model(data)

        

        # Get the loss

        loss = loss_criteria(output, target)

        

        # Keep a running total

        train_loss += loss.item()

        

        # Backpropagate

        loss.backward()

        optimizer.step()

        

        # Print metrics for every 10 batches so we see some progress

        if batch_idx % 10 == 0:

            print('Training set [{}/{} ({:.0f}%)] Loss: {:.6f}'.format(

                batch_idx * len(data), len(train_loader.dataset),

                100. * batch_idx / len(train_loader), loss.item()))

            

    # return average loss for the epoch

    avg_loss = train_loss / (batch_idx+1)

    print('Training set: Average loss: {:.6f}'.format(avg_loss))

    return avg_loss

            

            

def test(model, device, test_loader):

    # Switch the model to evaluation mode (so we don't backpropagate or drop)

    model.eval()

    test_loss = 0

    correct = 0

    with torch.no_grad():

        batch_count = 0

        for data, target in test_loader:

            batch_count += 1

            data, target = data.to(device), target.to(device)

            

            # Get the predicted classes for this batch

            output = model(data)

            

            # Calculate the loss for this batch

            test_loss += loss_criteria(output, target).item()

            

            # Calculate the accuracy for this batch

            _, predicted = torch.max(output.data, 1)

            correct += torch.sum(target==predicted).item()


    # Calculate the average loss and total accuracy for this epoch

    avg_loss = test_loss/batch_count

    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(

        avg_loss, correct, len(test_loader.dataset),

        100. * correct / len(test_loader.dataset)))

    

    # return average loss for the epoch

    return avg_loss

    

    

# Now use the train and test functions to train and test the model    


device = "cpu"

if (torch.cuda.is_available()):

    # if GPU available, use cuda (on a cpu, training will take a considerable length of time!)

    device = "cuda"

print('Training on', device)


# Create an instance of the model class and allocate it to the device

model = Net(num_classes=len(classes)).to(device)


# Use an "Adam" optimizer to adjust weights

# (see https://pytorch.org/docs/stable/optim.html#algorithms for details of supported algorithms)

optimizer = optim.Adam(model.parameters(), lr=0.001)


# Specify the loss criteria

loss_criteria = nn.CrossEntropyLoss()


# Track metrics in these arrays

epoch_nums = []

training_loss = []

validation_loss = []


# Train over 5 epochs (in a real scenario, you'd likely use many more)

epochs = 5

for epoch in range(1, epochs + 1):

        train_loss = train(model, device, train_loader, optimizer, epoch)

        test_loss = test(model, device, test_loader)

        epoch_nums.append(epoch)

        training_loss.append(train_loss)

        validation_loss.append(test_loss)

Predict

def predict_image(classifier, image):

    import numpy

    

    # Set the classifer model to evaluation mode

    classifier.eval()

    

    # Apply the same transformations as we did for the training images

    transformation = transforms.Compose([

        transforms.ToTensor(),

        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])

    ])


    # Preprocess the image

    image_tensor = transformation(image).float()


    # Add an extra batch dimension since pytorch treats all inputs as batches

    image_tensor = image_tensor.unsqueeze_(0)


    # Turn the input into a Variable

    input_features = Variable(image_tensor)


    # Predict the class of the image

    output = classifier(input_features)

    index = output.data.numpy().argmax()

    return index



# Function to create a random image (of a square, circle, or triangle)

def create_image (size, shape):

    from random import randint

    import numpy as np

    from PIL import Image, ImageDraw

    

    xy1 = randint(10,40)

    xy2 = randint(60,100)

    col = (randint(0,200), randint(0,200), randint(0,200))


    img = Image.new("RGB", size, (255, 255, 255))

    draw = ImageDraw.Draw(img)

    

    if shape == 'circle':

        draw.ellipse([(xy1,xy1), (xy2,xy2)], fill=col)

    elif shape == 'triangle':

        draw.polygon([(xy1,xy1), (xy2,xy2), (xy2,xy1)], fill=col)

    else: # square

        draw.rectangle([(xy1,xy1), (xy2,xy2)], fill=col)

    del draw

    

    return np.array(img)


# Save the model weights

model_file = 'models/shape_classifier.pt'

torch.save(model.state_dict(), model_file)

del model


# Create a random test image

classnames = os.listdir(os.path.join('data', 'shapes'))

classnames.sort()

shape = classnames[randint(0, len(classnames)-1)]

img = create_image ((128,128), shape)


# Display the image

plt.axis('off')

plt.imshow(img)


# Create a new model class and load weights

model = Net()

model.load_state_dict(torch.load(model_file))


# Call the predction function

index = predict_image(model, img)

print(classes[index])

Vision

Reshape

import torchvision.transforms as transforms

from PIL import Image


img = Image.open("1.png")

w, h = img.size[0], img.size[1]


factor = 4

transform = transforms.Resize((h // factor, w // factor), interpolation=3) # bicubic

lr_img = transform(img)

Utils

import torch

model = torch.load("model.pth",  map_location=torch.device("cpu"))