hello pytorch

First PyTorch script, hooray!

The following pytorch script trains a convolutional neural network model for the emnist dataset.

The balanced dataset with 47 classes is used here. Input images are 28 x 28 in size and are single channel.

import torch

import torch.nn as nn

import torch.nn.functional as F

import torch.optim as optim

import random

import numpy as np

import gzip

from datetime import datetime

class Net(nn.Module):

    def __init__(self):

        super(Net, self).__init__()

        

        # first convolutional layer:

        #    1 input image channel, 10 output channels, 5x5 square convolution

        #    the 10 output channels are the input channels for the next layer

        #    so 10 kernels and 10 biases

        #    input image is 28 x 28 pixels, 5x5 conv outputs 24 x 24, 2x2 maxpool outputs 12 x 12

        # second convolutional layer:

        #    10 input channels (12 x 12 images), 20 output channels, 3x3 convolution

        #    so 20 kernels and 20 biases. Each kernel is applied on all the 10 inputs and results are summed up.

        #    input image is 12 x 12 pixels, 3x3 conv outputs 10 x 10, 2x2 maxpool outputs 5 x 5

        self.conv1 = nn.Conv2d(1, 10, 5)

        self.conv2 = nn.Conv2d(10, 20, 3)

        # first fully connected layer

        #    the previous convolutional layer's output channels fully connect to this hidden layer

        #    5 x 5 images from the second maxpool

        self.fc1 = nn.Linear(20 * 5 * 5, 200) 

        

        # second fully connected layer

        self.fc2 = nn.Linear(200, 100)

        

        # output layer

        self.fc3 = nn.Linear(100, 47)

        

    def forward(self, x):

        #the shape of x is (N, C, H, W)

        #where N is the batch size, there can be multiple training samples in x

        # C is the number of channels, there can be multiple channels per sample

        # H and W are the height and width of an image, e.g. 28 x 28

        # e.g. x = torch.tensor([[t] for t in test_data[:10]], dtype=torch.float32) gets the first 10 training samples

        # e.g. x = torch.tensor([[c1, c2] for c1,c2 in channel1[:10], channel2[:10]]) gets the first 10 training samples with 2 channels

        

        #first conv layer

        x = self.conv1(x) #first conv

        x = F.relu(x)     #activation.

        x = F.max_pool2d(x, (2, 2)) # 2x2 maxpool

        

        #second conv layer

        x = self.conv2(x) #first conv

        x = F.relu(x)     #

        x = F.max_pool2d(x, (2, 2)) # 2x2 maxpool

        

        #first fully connected layer

        num_features = x.shape[1] * x.shape[2] * x.shape[3]  #ie. C * H * W

        x = x.view(-1, num_features) #flatten x into (N, C * H * W)

        x = self.fc1(x)   #fist fc layer

        x = F.relu(x) 

        

        #second fully connected layer

        x = self.fc2(x)

        x = F.relu(x)

        

        #thrid layer, output

        x = self.fc3(x)

        

        return x

    def num_flat_features(self, x):

        size = x.size()[1:]  # all dimensions except the batch dimension

        num_features = 1

        for s in size:

            num_features *= s

        return num_features

print('========== start running main method ==========')

#load data from zip file

image_size = 28

f_tr = gzip.open("C:/Temp/emnist-balanced/emnist-balanced-train-images-idx3-ubyte.gz", 'rb')

f_tr.read(16)  #skip 16 bytes of non image info

data_train = f_tr.read()

f_tr.close()

f_tr_label = gzip.open("C:/Temp/emnist-balanced/emnist-balanced-train-labels-idx1-ubyte.gz", 'rb')

f_tr_label.read(8)

label_train = f_tr_label.read()

f_tr_label.close()

f_test = gzip.open("C:/Temp/emnist-balanced/emnist-balanced-test-images-idx3-ubyte.gz", 'rb')

f_test.read(16)  #skip 16 bytes of non image info

data_test = f_test.read()

f_test.close()

f_te_label = gzip.open("C:/Temp/emnist-balanced/emnist-balanced-test-labels-idx1-ubyte.gz", 'rb')

f_te_label.read(8)

label_test = f_te_label.read()

f_te_label.close()

image_pixels = 28 * 28

assert len(data_train) % image_pixels ==0

assert len(data_test) % image_pixels ==0

train_num_images = int(len(data_train) / image_pixels)

test_num_images = int(len(data_test) / image_pixels)

assert len(label_train) == train_num_images

assert len(label_test) == test_num_images

#convert to array from buffer bytes

train_array = np.frombuffer(data_train, dtype=np.uint8).astype(float)

test_array = np.frombuffer(data_test, dtype=np.uint8).astype(float)

#break the byte array into images

train_array = train_array.reshape(train_num_images, image_size, image_size)

test_array = test_array.reshape(test_num_images, image_size, image_size)

#rotate and flip the images to align correctly

#also normalize the data to [0,1]

train_data = [image.transpose()/np.max(image) for image in train_array]

test_data = [image.transpose()/np.max(image) for image in test_array]

             

#get the labels

#train_labels = [row_vector(y, 47) for y in np.frombuffer(label_train, dtype=np.uint8)]

#test_labels = [row_vector(y, 47) for y in np.frombuffer(label_test, dtype=np.uint8)]

#in pytorch use the label (index of the output vector)

train_labels = np.frombuffer(label_train, dtype=np.uint8)

test_labels = np.frombuffer(label_test, dtype=np.uint8) 

tr_data = list(zip(train_data, train_labels)) #in python3 needs to convert zip to list

te_data = (test_data, test_labels)

print('========== finish loading data ==========')

#tr_data = tr_data[:5000]

#te_data = te_data[:5000]

net = Net()

'''

cross entropy loss in pytorch

inputs have a shape of (N, C), N is the min batch size, C is the #classes

e.g. 10 samples, each sample's prediction is [0.5, 0.9, 0.1] (i.e. 3 classes)

Note the scores of all class don't necesarrily add up to 1, the loss function

here uses softmax: exp(score1)/sum(exp(score i)) to convert to probability

Cross entropy = -log(score) where score is the score for the correct class

the labels have a shape of (N), and each element is the index of the correct class

e.g. labels = [1, 0, 2] means first sample's class is 1, seconds is 0 and last is 2.

'''

loss_fn = nn.CrossEntropyLoss()

optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

epochs = 50  

mini_batch_size = 20  

for i in range(epochs):

    #re-shuffle the training data every epoch

    random.shuffle(tr_data)

    #divide the training data set into mini batches as per the mini batch size 

    mini_batches = [tr_data[j:j + mini_batch_size] for j in range(0, len(tr_data), mini_batch_size)]

    

    for mini_batch in mini_batches:

        inputs = torch.tensor([[x] for x, y in mini_batch], dtype=torch.float32, requires_grad=True) #the tensor is in shape (batch_zie, 1, 28, 28)

        labels = torch.tensor([y for x, y in mini_batch], dtype=torch.int64) #cross entropy requires int64, the tensor is in shape (batch_size,1)        

        # zero the parameter gradients

        optimizer.zero_grad()

        #feed in a mini_batch of training data

        outputs = net(inputs)

        #calculate the loss

        loss = loss_fn(outputs, labels)

        #calculate the gradient

        loss.backward()

        #adjust the weights

        optimizer.step()

    if te_data:

        inputs, labels = te_data

        inputs = torch.tensor([[x] for x in test_data], dtype=torch.float32)#the tensor is in shape (batch_szie, 1, 28, 28)

        labels = torch.from_numpy(labels).long() #ie. int64

        with torch.no_grad():

            outputs = net(inputs) #the output is in shape (batch_size, 47)

            max_values, max_indices = torch.max(outputs, 1) #max values on dim = 1

            precision = sum(int(m == n) for m, n in zip(max_indices, labels)) / float(len(labels))

            print("Epoch {0}: {1}     {2}".format(i, precision, datetime.now()))       

    else:

        print("Epoch {0} done".format(i))