#@title Main Code - Run this to train the network
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import torch.optim as optim
import cv2
from google.colab.patches import cv2_imshow
import mediapipe as mp
import matplotlib.pyplot as plt
import matplotlib as mpl
from scipy.io import wavfile
import os
import math
import joblib
import librosa
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
vid_dir = input('path of video directory - ')  # all videos should be the same duration
aud_dir = input('path of audio directory - ')  # each audio clip should match its video's duration and name, and be in WAV format
BatchSize = int(input('Batch size - '))
hidden_dim = 40
n_layers = 60  # number of stacked RNN layers; the research paper suggests 60
length = 150  # number of frames per clip; corresponds to the video duration. For longer clips you can use a bigger value
n_epochs = int(input('Epochs - '))
lr = float(input('Learning Rate - '))  # e.g. 0.001
vid_ids = [files for files in os.walk(vid_dir)]
vid_ids = [filenames[:-4] for filenames in vid_ids[0][2]]  # strip the .mp4 extension from each filename
def get_unique(c):
    # collect the unique landmark indices that appear in a set of FaceMesh connections
    templist = list(c)
    tempset = set()
    for t in templist:
        tempset.add(t[0])
        tempset.add(t[1])
    return list(tempset)
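# Illustrative example only (indices are made up; the real ones come from MediaPipe's connection sets):
# get_unique({(0, 1), (1, 2)}) -> [0, 1, 2]   (order is not guaranteed, since a set is unordered)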
def midpoint(point1, point2, point3, point4, img):
    # pixel-space centroid of four face-oval landmarks, used as the face centre
    x = int((point1.x * img.shape[1] + point2.x * img.shape[1] + point3.x * img.shape[1] + point4.x * img.shape[1]) / 4)
    y = int((point1.y * img.shape[0] + point2.y * img.shape[0] + point3.y * img.shape[0] + point4.y * img.shape[0]) / 4)
    return x, y

def angleTheta(x, y):
    # in-plane angle of a reference landmark relative to the face centre
    return math.atan2(y, x)

def Facetransform(x0, y0, theta):
    # rotate (x0, y0) about the origin so that the direction theta is mapped onto the +y axis,
    # i.e. undo the head roll measured by angleTheta
    x1 = int(x0 * math.sin(theta) - y0 * math.cos(theta))
    y1 = int(x0 * math.cos(theta) + y0 * math.sin(theta))
    return x1, y1
mpDraw = mp.solutions.drawing_utils
mpFaceMesh = mp.solutions.face_mesh
connection_lips = get_unique(mpFaceMesh.FACEMESH_LIPS)
connection_face = get_unique( mpFaceMesh.FACEMESH_FACE_OVAL )
facepoints = [connection_face[3],connection_face[10],connection_face[20],connection_face[24] ]
X_data = []
Y_data = []
Z_data = []
melvector = torch.empty(0,length,128)
for files in vid_ids:
    video = vid_dir + '/' + files + '.mp4'
    audio = aud_dir + '/' + files + '.wav'
    cap = cv2.VideoCapture(video)   # reading the video
    aud, Fs = librosa.load(audio)   # reading the audio
    hop_length = int(len(aud)/length + 1)
    S = librosa.feature.melspectrogram(y=aud, sr=Fs, n_fft=2048, hop_length=hop_length, n_mels=128, fmax=8000)
    # finding the fps
    (major_ver, minor_ver, subminor_ver) = (cv2.__version__).split('.')
    if int(major_ver) < 3:
        fps = cap.get(cv2.cv.CV_CAP_PROP_FPS)
        #print("Frames per second using video.get(cv2.cv.CV_CAP_PROP_FPS): {0}".format(fps))
    else:
        fps = cap.get(cv2.CAP_PROP_FPS)
        #print("Frames per second using video.get(cv2.CAP_PROP_FPS): {0}".format(fps))
    #length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # creating face points
    with mpFaceMesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5) as faceMesh:
        while cap.isOpened():
            success, img = cap.read()
            if success == False:
                break
            results = faceMesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            if results.multi_face_landmarks:
                tempx = []
                tempy = []
                tempz = []
                for face_landmark in results.multi_face_landmarks:
                    lms = face_landmark.landmark
                    # face centre and head-roll angle, used to rotation-normalise the lip points
                    x1, y1 = midpoint(lms[facepoints[0]], lms[facepoints[1]], lms[facepoints[2]], lms[facepoints[3]], img)
                    theta = angleTheta(int(lms[facepoints[1]].x * img.shape[1] - x1), int(lms[facepoints[1]].y * img.shape[0] - y1))
                    d = {}
                    for index in connection_lips:
                        #print(index, lms[index].x)
                        x = int(lms[index].x * img.shape[1] - x1)
                        y = int(lms[index].y * img.shape[0] - y1)
                        x, y = Facetransform(x, y, theta)
                        x = int(x + x1)
                        y = int(y + y1)
                        tempx.append(x)
                        tempy.append(y)
                        tempz.append(lms[index].z)
                        # d[index] = (x, y)
                        # cv2.circle(img, (x, y), 2, (255, 0, 0), -1)
                        # print(math.tan(theta))
                        # cv2.circle(img, (x1, y1), 2, (0, 255, 0), -1)
            else:
                # no face detected in this frame: pad the 40 lip points with zeros
                tempx = []
                tempy = []
                tempz = []
                for i in range(40):
                    tempx.append(0)
                    tempy.append(0)
                    tempz.append(0)
            X_data.append(tempx)
            Y_data.append(tempy)
            Z_data.append(tempz)
    cap.release()
    # creating the audio features
    S = torch.tensor(S.T[:length]).view(-1, length, 128)
    melvector = torch.cat((melvector, S), dim=0)
#print(np.array(audio.T[0]).shape)
#print(np.array(X_data).shape)
time_per_frame_in_video = 1/fps  # seconds, e.g. 1/30
time_per_frame_in_audio = time_per_frame_in_video * Fs  # audio samples per video frame
sequences = time_per_frame_in_audio
input_size = int(len(aud)/length)  # samples per frame slot (not used by the model, which takes 128 mel bins)
X_data = torch.tensor(X_data)
Y_data = torch.tensor(Y_data)
Z_data = torch.tensor(Z_data)
# X/Y/Z_data hold one row per frame, so keep only whole batches of BatchSize clips of `length` frames each
n_keep = int(X_data.size(dim=0)/(BatchSize*length)) * BatchSize * length
X_data = X_data[:n_keep].view(-1, BatchSize, length, 40)
Y_data = Y_data[:n_keep].view(-1, BatchSize, length, 40)
Z_data = Z_data[:n_keep].view(-1, BatchSize, length, 40)
aud = melvector[:int(melvector.size(dim=0)/BatchSize) * BatchSize].view(-1, BatchSize, length, 128)
number_of_batches = aud.size(dim = 0)
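# Optional sanity check (a sketch, not part of the original pipeline); uncomment before training:
# print(aud.shape)     # expected (number_of_batches, BatchSize, length, 128) mel features
# print(X_data.shape)  # expected (number_of_batches, BatchSize, length, 40) lip x-coordinates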
class Model(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(Model, self).__init__()
        # Defining some parameters
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # Defining the layers
        # RNN layer
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        # Fully connected layers, one per coordinate (x, y, z)
        self.fcx = nn.Linear(hidden_dim, output_size)
        self.fcy = nn.Linear(hidden_dim, output_size)
        self.fcz = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        batch_size = x.size(0)
        # Initializing hidden state for first input using method defined below
        hidden = self.init_hidden(batch_size)
        # Passing in the input and hidden state into the model and obtaining outputs
        out, hidden = self.rnn(x, hidden)
        # With batch_first=True the RNN output feeds the fully connected layers directly
        #out = out.contiguous().view(-1, self.hidden_dim)
        outX = self.fcx(out)
        outY = self.fcy(out)
        outZ = self.fcz(out)
        return outX, outY, outZ

    def init_hidden(self, batch_size):
        # This method generates the first hidden state of zeros which we'll use in the forward pass
        # We'll send the tensor holding the hidden state to the device we specified earlier as well
        hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
        return hidden
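# Minimal smoke test of the class (a sketch under the assumptions above, not part of the training flow):
# a dummy batch of shape (2, length, 128) should map to three tensors of shape (2, length, 40).
# _m = Model(128, 40, hidden_dim*3, n_layers).to(device)
# _outX, _outY, _outZ = _m(torch.zeros(2, length, 128).to(device))
# print(_outX.shape, _outY.shape, _outZ.shape)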
model = Model(128, 40, hidden_dim*3, n_layers).to(device)  # 128 mel bins in, 40 lip points out per coordinate
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_vector = []
for epoch in range(n_epochs):  # full passes over the data
    # model.train()
    for data in range(number_of_batches):  # `data` indexes one batch
        model.zero_grad()  # reset the gradients before the loss calculation, as on every step
        outX, outY, outZ = model(aud[data].to(device).float())  # feed one batch of mel features (BatchSize, length, 128)
        lossX = loss_function(outX, X_data[data].to(device).float())
        lossY = loss_function(outY, Y_data[data].to(device).float())
        lossZ = loss_function(outZ, Z_data[data].to(device).float())
        loss = lossX + lossY + lossZ
        loss.backward()  # backpropagate this loss through the network's parameters
        optimizer.step()  # update the weights to account for the loss/gradients
    print(epoch, loss)
    loss_vector.append(loss)
#@title Plot loss
Lv = []
for i in range(len(loss_vector)):
    Lv.append(loss_vector[i].item())
print(Lv)
plt.plot(Lv)
plt.xlabel("iteration")
plt.ylabel("loss")
plt.show()
#@title Save the model (This is optional, If you think you got good results you can save the model for later use)
model.eval()
location = input('Enter the location you would like to save this model/ the model name - ')
#Enter the location you would like to save this model ---> the folder the model will be saved in
#the model name ----> give it any name you like
joblib.dump(model, location+'/trained.pkl')
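# To reuse the saved model later (a sketch; joblib pickles the class by reference, so the same
# Model class must be defined in the session where you load it):
# model = joblib.load(location + '/trained.pkl')
# model.eval()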
#@title You can feed test audio here after training
model.eval()
test = input('audio file path - ')
test, Fs = librosa.load(test)
hop_length = int(len(test)/length + 1)  # hop length computed from the test clip, not the training audio
S = librosa.feature.melspectrogram(y=test, sr=Fs, n_fft=2048, hop_length=hop_length, n_mels=128, fmax=8000)
S = torch.tensor(S.T[:length]).view(-1, length, 128).to(device)
Lips_X, Lips_Y, Lips_Z = model(S)
print(Lips_X[0][1])  # x-coordinates of the 40 lip points for one of the `length` frames
print(Lips_Y.shape)
print(Lips_Z.shape)
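#@title Visualise a predicted frame (optional sketch, not part of the original notebook)
# Scatter the 40 predicted lip points for one frame; .detach().cpu() is used because the
# predictions may live on the GPU and still carry gradients. `frame` is an arbitrary choice.
frame = 0
x_pts = Lips_X[0][frame].detach().cpu().numpy()
y_pts = Lips_Y[0][frame].detach().cpu().numpy()
plt.scatter(x_pts, -y_pts)  # flip y because image coordinates grow downwards
plt.title('Predicted lip landmarks, frame {}'.format(frame))
plt.show()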