import torch
from torch.optim.lr_scheduler import LinearLR, ConstantLR, ExponentialLR
from torch.optim.lr_scheduler import SequentialLR # ChainedScheduler
def lr_policy(optimiser):
    """Warm up linearly, hold constant, then decay exponentially."""
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimiser, 0.96, last_epoch=-1)
    # linear warmup from 5% of the base lr up to the full lr over the first 100 steps
    scheduler_warmup = LinearLR(optimiser, start_factor=0.05, end_factor=1.0, total_iters=100)
    # hold the lr constant for the next 100 steps
    scheduler_const = ConstantLR(optimiser, factor=1.0, total_iters=100)
    # then decay the lr by 1% per step
    scheduler_exdecay = ExponentialLR(optimiser, gamma=0.99)
    # SequentialLR switches from one scheduler to the next at the given milestones (steps 100 and 200)
    scheduler = SequentialLR(optimiser, schedulers=[scheduler_warmup, scheduler_const, scheduler_exdecay], milestones=[100, 200])
    # scheduler = ChainedScheduler([scheduler_warmup, scheduler_const, scheduler_exdecay])  # multiplicative composition, see the sketch after this script
    return scheduler
if __name__ == '__main__':
    # visualise the LR policy
    from torch.utils.tensorboard import SummaryWriter

    # a single dummy parameter stands in for the model parameters
    model = [torch.nn.Parameter(torch.randn(2, 2, requires_grad=True))]
    optimiser = torch.optim.Adam(model, 1e-3)
    scheduler = lr_policy(optimiser)

    # Writer will output to ./runs/ directory by default
    writer = SummaryWriter(log_dir='logs/net')
    for epoch in range(500):
        # read the current learning rate, then step the scheduler
        # if get_last_lr() raises an error, follow the fix here:
        # https://github.com/pytorch/pytorch/pull/69112/files/7a61cb796cd16c4f38fdb4ee7ce3843c48572f49#diff-036a7470d5307f13c9a6a51c3a65dd014f00ca02f476c545488cd856bea9bcf2
        # my_lr = scheduler.get_last_lr()
        my_lr = optimiser.param_groups[0]['lr']
        scheduler.step()
        writer.add_scalar("lr", my_lr, epoch)
    writer.close()
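For comparison, the ChainedScheduler mentioned in the commented-out line above composes schedulers multiplicatively: a single call to step() steps all of them, so their scaling factors multiply at every iteration instead of one scheduler taking over at each milestone. A minimal sketch, assuming the same optimiser as above (the helper name lr_policy_chained is only for illustration):

from torch.optim.lr_scheduler import ChainedScheduler

def lr_policy_chained(optimiser):
    # both schedulers are stepped every iteration; during the first 100 steps the
    # warmup ramp and the exponential decay act together, afterwards the LinearLR
    # factor stays at 1.0 and only the decay keeps shrinking the lr
    scheduler_warmup = LinearLR(optimiser, start_factor=0.05, end_factor=1.0, total_iters=100)
    scheduler_exdecay = ExponentialLR(optimiser, gamma=0.99)
    return ChainedScheduler([scheduler_warmup, scheduler_exdecay])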
Another, even more flexible way to schedule the lr is to wrap the optimiser in a small class and define the scaling of the initial lr in _get_lr_scale. The following example uses the lr schedule from "Attention Is All You Need" (linear warmup followed by inverse-square-root decay), with an extra annealing stage to further suppress the lr late in training.
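Written out on its own, the scale that _get_lr_scale below computes is min(step^-0.5, step * warmup^-1.5), multiplied by anneal_rate once for every anneal milestone already passed; the lr at each step is init_lr times this scale. A minimal sketch of that rule (the function name transformer_lr_scale is only for illustration):

def transformer_lr_scale(step, n_warmup_steps=4000, anneal_steps=(300000, 400000, 500000), anneal_rate=0.3):
    # linear warmup for the first n_warmup_steps, then ~1/sqrt(step) decay (step must be >= 1)
    scale = min(step ** -0.5, step * n_warmup_steps ** -1.5)
    # every anneal milestone already passed shrinks the lr by a further factor of anneal_rate
    for s in anneal_steps:
        if step > s:
            scale *= anneal_rate
    return scale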
import numpy as np

class ScheduledOptimSimple:
    """ A simple wrapper class for learning rate scheduling """

    def __init__(self, model, current_step):
        # `model` is expected to be an iterable of parameters (e.g. model.parameters())
        self._optimizer = torch.optim.Adam(
            model,
            betas=[0.9, 0.98],
            eps=1e-9,
            weight_decay=0.0,
        )
        self.n_warmup_steps = 4000
        self.anneal_steps = [300000, 400000, 500000]
        self.anneal_rate = 0.3
        self.current_step = current_step
        self.init_lr = 0.01  # np.power(256, -0.5)

    def step_and_update_lr(self):
        self._update_learning_rate()
        self._optimizer.step()

    def zero_grad(self):
        self._optimizer.zero_grad()

    def load_state_dict(self, path):
        self._optimizer.load_state_dict(path)

    def _get_lr_scale(self):
        # warmup: the scale grows linearly with the step; afterwards it decays as 1/sqrt(step)
        scale = np.min(
            [
                np.power(self.current_step, -0.5),
                np.power(self.n_warmup_steps, -1.5) * self.current_step,
            ]
        )
        # apply the annealing factor once for every milestone already passed
        for s in self.anneal_steps:
            if self.current_step > s:
                scale = scale * self.anneal_rate
        return scale

    def _update_learning_rate(self):
        """ Learning rate scheduling per step """
        self.current_step += 1
        lr = self.init_lr * self._get_lr_scale()
        for param_group in self._optimizer.param_groups:
            param_group["lr"] = lr
if __name__ == '__main__':
    # visualise the LR policy
    from torch.utils.tensorboard import SummaryWriter

    # a single dummy parameter stands in for the model parameters
    model = [torch.nn.Parameter(torch.randn(2, 2, requires_grad=True))]
    optimizer = ScheduledOptimSimple(model, 0)

    # Writer will output to ./runs/ directory by default
    writer = SummaryWriter(log_dir='../temp')
    for epoch in range(20000):
        # total_loss = .....
        # total_loss.backward()
        optimizer.step_and_update_lr()
        optimizer.zero_grad()
        my_lr = optimizer._optimizer.param_groups[0]['lr']
        # log every step; raise the modulus to subsample the curve
        if epoch % 1 == 0:
            writer.add_scalar("lr", my_lr, epoch)
    writer.close()