Take a mono-channel, 16 kHz, 16-bit PCM audio signal as an example: its uncompressed bit rate is 16000 samples/s × 16 bits = 256000 bit/s. If the Opus audio codec is to compress the audio by a factor of 4, the compressed bit rate is 64000 bit/s. Note that the Opus audio codec places many constraints on parameter settings such as the frame size.
import subprocess as sp
import numpy as np
import opuslib
import opuslib.api.ctl
from opuslib.api import decoder as opus_decoder
from opuslib.api import encoder as opus_encoder
import os
if os.name == 'nt':
import matplotlib
# print(matplotlib.get_backend()) ### module://backend_interagg
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
sample_width = 2  # bytes per PCM sample (the stream is read as 16-bit 's16le' data)
class OpusCodec():
    """Opus encoder/decoder pair operating on fixed-size 16-bit PCM frames.

    The codec is configured for mono audio at 16 kHz with a target bitrate of
    64000 bit/s and VBR switched off. One frame holds ``frame_size`` samples
    per channel (320 samples = 20 ms at 16 kHz).
    """

    def __init__(self, *args, **kwargs):
        """Create the libopus encoder and decoder states.

        Positional and keyword arguments are accepted but not used.
        """
        self.frame_size = 320  # samples per channel per frame: 20 ms * 16 kHz
        self.channels = 1
        self.rate = 16000
        self.bitrate = 64000
        self.encoder = opus_encoder.create_state(
            self.rate, self.channels, opuslib.APPLICATION_AUDIO)
        opus_encoder.encoder_ctl(
            self.encoder, opuslib.api.ctl.set_bitrate, self.bitrate)
        # Disable variable bitrate so the encoder honours the fixed bitrate.
        opus_encoder.encoder_ctl(self.encoder, opuslib.api.ctl.set_vbr, 0)
        self.decoder = opus_decoder.create_state(self.rate, self.channels)

    def encode(self, data, **kwargs):
        """Encode one frame of 16-bit PCM bytes into an Opus packet.

        Args:
            data: Raw interleaved 16-bit PCM bytes for a single frame.
            **kwargs: Optional ``frame_size`` (samples per channel) overriding
                ``self.frame_size`` for this call.

        Returns:
            The encoded Opus packet returned by ``opuslib``.
        """
        frame_size = kwargs.get('frame_size')
        if frame_size is None:
            frame_size = self.frame_size
        # libopus reads frame_size * channels int16 samples from the buffer no
        # matter how long it is, so a short (e.g. final) chunk is zero-padded
        # with silence instead of letting the C call read past its end.
        required = frame_size * self.channels * 2
        if len(data) < required:
            data = data + b'\x00' * (required - len(data))
        # len(data) doubles as the upper bound for the encoded packet size.
        return opus_encoder.encode(self.encoder, data, frame_size, len(data))

    def decode(self, data, **kwargs):
        """Decode one Opus packet back into 16-bit PCM bytes.

        Args:
            data: A single encoded Opus packet.
            **kwargs: Optional ``frame_size`` (samples per channel) overriding
                ``self.frame_size`` for this call.

        Returns:
            The decoded PCM data returned by ``opuslib``.
        """
        frame_size = kwargs.get('frame_size')
        if frame_size is None:
            frame_size = self.frame_size
        # The literal 0 is the decode_fec flag (no forward error correction).
        return opus_decoder.decode(
            self.decoder, data, len(data), frame_size, 0, self.channels)
def pcmint2float(sig):
    """Scale an integer PCM array to float32 values in [-1.0, 1.0).

    The integer dtype of ``sig`` determines the scaling: the signal is
    re-centred on the midpoint of the dtype's range and divided by half of
    that range, so signed and unsigned PCM both map onto the same interval.

    Args:
        sig: NumPy array with an integer dtype.

    Returns:
        A float32 NumPy array of the same shape.
    """
    info = np.iinfo(sig.dtype)
    half_range = 2 ** (info.bits - 1)
    # Zero for signed dtypes, the mid-scale value for unsigned ones.
    midpoint = info.min + half_range
    return (sig.astype('float32') - midpoint) / half_range
def main():
    """Round-trip ``TestData/test.wav`` through the Opus codec and print sizes.

    ffmpeg converts the input file to raw 16-bit little-endian PCM at the
    codec's sample rate and channel count. The stream is split into
    frame-sized chunks, each chunk is encoded and decoded again, and the total
    byte lengths of the raw, encoded and decoded data are printed.

    A final chunk shorter than one frame is zero-padded to a full frame, so
    the reported raw length includes that padding.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    op = OpusCodec()
    frame_bytes = op.frame_size * op.channels * sample_width

    ##################################
    ### stream in the data with ffmpeg
    FFMPEG_BIN = "ffmpeg"
    command = [FFMPEG_BIN,
               '-i', "TestData/test.wav",
               '-f', 's16le',
               '-acodec', 'pcm_s16le',
               '-ar', str(op.rate),  # output sample rate = codec rate
               '-ac', str(op.channels),  # output channel count = codec channels
               '-']
    data = []
    # The context manager closes the pipe and waits for ffmpeg on exit.
    with sp.Popen(command, stdout=sp.PIPE, bufsize=10 ** 8) as pipe:
        for chunk in iter(lambda: pipe.stdout.read(frame_bytes), b''):
            if len(chunk) < frame_bytes:
                # The encoder always consumes a full frame; pad with silence.
                chunk = chunk.ljust(frame_bytes, b'\x00')
            data.append(chunk)
    if pipe.returncode != 0:
        raise sp.CalledProcessError(pipe.returncode, command)

    # ##################################
    # ### load in the data with librosa
    # import librosa
    # sig, sr = librosa.load("TestData/test.wav", sr=None)
    # data = []
    # for i in range(int(len(sig)/320)):  # every 20 ms
    #     da_float32 = sig[i*320:(i+1)*320]
    #     da_float16 = da_float32.astype(np.float16)
    #     da_int = da_float16 * (2 ** 15)
    #     da_int = da_int.astype(np.int16)
    #     da = da_int.tobytes()  ## identical to ffmpeg's byte data input
    #     data.append(da)

    ##################################
    ### encoding
    encdata = [op.encode(frame) for frame in data]

    ##################################
    ### decoding
    decdata = [op.decode(packet) for packet in encdata]

    print("DATA LENGTH :", len(b''.join(data)))
    print("ENCDATA LENGTH :", len(b''.join(encdata)))
    print("DECDATA LENGTH :", len(b''.join(decdata)))

    # # save the recovered signal to compare with the original one
    # import soundfile as sf
    # temp_audio_name = "recovered_signal.wav"
    # # write to wav
    # if os.path.isfile(temp_audio_name):
    #     os.remove(temp_audio_name)
    # sf.write(temp_audio_name, y_data_recover, op.rate)
# Run the encode/decode round trip only when this file is executed as a script.
if __name__ == '__main__':
    main()