import torch
import torch.nn as nn
import torch.optim as optim
import math
class GPTDecoder(nn.Module):
    def __init__(self, input_dim, model_dim, nhead, num_layers):
        super().__init__()
        self.model_dim = model_dim
        # Embedding layer
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,  # Size of the vocabulary
            embedding_dim=model_dim    # Dimension of each embedding vector
        )
        # Transformer decoder layer (batch_first so inputs are (batch, seq, dim))
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=model_dim,
            nhead=nhead,
            dim_feedforward=128,
            dropout=0.1,
            activation='relu',
            batch_first=True
        )
        self.transformer_decoder = nn.TransformerDecoder(
            decoder_layer,
            num_layers=num_layers
        )
        # Linear layer to project the decoder output to vocabulary size
        self.fc_out = nn.Linear(model_dim, input_dim)

    def forward(self, tgt):
        # Embedding and scaling
        tgt = self.embedding(tgt) * math.sqrt(self.model_dim)
        # Look-ahead mask over the sequence dimension (dim 1 with batch_first)
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        # Decoder-only setup: the sequence doubles as memory, so the cross-attention
        # needs the same causal mask to avoid attending to future tokens
        output = self.transformer_decoder(tgt, tgt, tgt_mask=tgt_mask, memory_mask=tgt_mask)
        output = self.fc_out(output)
        return output
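
# Note: this minimal decoder omits positional encodings, so the attention layers
# see tokens as an unordered set. A full GPT-style model adds position information
# right after the embedding; below is a sketch of the standard sinusoidal variant
# (the SinusoidalPositionalEncoding module is illustrative and is not wired into
# GPTDecoder above).
class SinusoidalPositionalEncoding(nn.Module):
    def __init__(self, model_dim, max_len=512):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_dim, 2) * (-math.log(10000.0) / model_dim))
        pe = torch.zeros(max_len, model_dim)
        pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions
        pe[:, 1::2] = torch.cos(position * div_term)  # odd dimensions
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq, model_dim); add the encoding for each position
        return x + self.pe[:x.size(1)]
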
# Parameters
input_dim = 10 # Size of vocabulary
model_dim = 32 # Embedding dimension
nhead = 4 # Number of attention heads
num_layers = 2 # Number of transformer decoder layers
seq_len = 5 # Length of the sequence
batch_size = 3 # Batch size
# Initialize the model, optimizer, and loss function
model = GPTDecoder(input_dim, model_dim, nhead, num_layers)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()  # expects raw logits; applies log-softmax internally
# Generate random sequences for demo
def generate_random_sequence(batch_size, seq_len, input_dim):
    return torch.randint(0, input_dim, (batch_size, seq_len))
# Example dataset
tgt = generate_random_sequence(batch_size, seq_len, input_dim)
# Shift target sequence by 1 for training
tgt_input = tgt[:, :-1] # Input to the model (exclude the last token)
tgt_target = tgt[:, 1:] # Target for the model (shifted sequence)
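# For example, with seq_len = 5 a sampled row [7, 2, 9, 4, 1] becomes
# input [7, 2, 9, 4] and target [2, 9, 4, 1]: at every position the model
# learns to predict the next token in the sequence.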
# Training loop
num_epochs = 5 # Number of training epochs
for epoch in range(num_epochs):
    model.train()
    # Forward pass through the model
    output = model(tgt_input)
    # Calculate loss: flatten logits to (batch*seq, vocab) and targets to (batch*seq,)
    loss = criterion(output.reshape(-1, input_dim), tgt_target.reshape(-1))
    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # Print loss for this epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
# Print the final output
print("Output shape:", output.shape)
print("Output:", output)