空間スペクトル推定
Last update : 2018/08/23
空間スペクトル推定モジュール
spatialSpec.py
関数SpatilaCorrelation()では、マイクロホンアレイのデータから、空間相関行列を計算します。クラスSpatialSpectrumは、空間相関行列とアレイマニフォールド行列から、空間スペクトルを計算します。空間スペクトルを推定する手法は、(1) 部分空間法(MUSIC法, MU)、(2)最小分散法(MV)、(c)遅延和法(DS)の3つです。MUSIC法を使った場合は、計算の過程で、固有値や固有ベクトルを求めますが、これらは、空間スペクトルを算出したあとでも、再利用する場合があるので、あとから属性として取り出せるようになっています。空間スペクトルを推定する3つの方法は、次式で与えられます。M,Nは、それぞれ、センサ数、音源数を表します。a(θ)は、θ方向のアレイ・マニフォールドを表します。Rは空間相関行列を、eiはRのi番目の固有ベクトルを表します。
(1)
(2)
(3)
#
# Calculate spatial spectrum
#
# Ver 1.0, 2013/7/25 F.Asano
import numpy as np
import scipy.fftpack as ft
import scipy.linalg as alg
###########################################
# Function: SpatialCorrelation # 空間相関行列
# z: array observation data
# w: window function for STFT
# prm: parameters
###########################################
def SpatialCorrelation(z,w,prm):
NFFT = prm.NFFT
M = prm.NumberOfSensor
NFrame = prm.getNumberOfFrame()
R = np.zeros((NFFT/2+1,M,M),dtype='complex')
ZZ = np.zeros((M,NFFT),dtype='complex')
Z = np.zeros(M,dtype='complex')
zb = np.zeros(prm.FrameLength,dtype='float')
for n in range(NFrame):
t1 = prm.FrameShift*n
t2 = prm.FrameShift*n + prm.FrameLength
for m in range(M):
zb = z[m,t1:t2]*w
ZZ[m,:] = ft.fft(zb,NFFT)
for k in range(NFFT/2+1):
Z = ZZ[:,k]
R[k,:,:] = R[k,:,:] + np.outer(Z,np.conjugate(Z))
return R
##################################
# Class: SpatialSpectrum # 空間スペクトル
# RR: spatial correlation matrix
# A: array manifold matrix
# prm: Parameters
##################################
class SpatialSpectrum:
# Constructor
def __init__(self,RR,A,prm):
self.prm = prm
NFreq = prm.NFFT/2+1
M = prm.NumberOfSensor
Ns = prm.NumberOfSource
NAngle = prm.NumberOfAngle
[k1,k2] = prm.getFrequencyIndex()
# Frequency loop
P = np.zeros((NFreq,NAngle),dtype='float')
eigenValue = np.zeros((NFreq,M),dtype='float')
eigenVector = np.zeros((NFreq,M,M),dtype='complex')
for k in range(k1,k2):
R = RR[k,:,:]
# Eigenvalue decomposition # 固有値分解
if prm.method == 'MUSIC':
(s,E) = alg.eigh(R)
eigenValue[k,:] = s
eigenVector[k,:,:] = E
# Spatial spectrum
for n in range(NAngle):
a = A[k,n,:]
if prm.method == 'MUSIC': # MUSIC法
En = E[:,0:M-Ns] # eigenvalues are in ascending order
d = np.dot(np.conj(a),En) # don't use vdot!
P[k,n] = 1/np.real(np.dot(np.conj(d),d))
elif prm.method == 'MV': # 最小分散(MV)法
aR = np.dot(np.conj(a),alg.inv(R))
P[k,n] = 1/np.real(np.dot(aR,a))
elif prm.method == 'DS': # 遅延和(DS)法
aR = np.dot(np.conj(a),R)
P[k,n] = np.real(np.dot(aR,a))/float(M)
else:
print 'Wrong method %s' % prm.method
self.P = P
if prm.method == 'MUSIC':
self.eigenValue = eigenValue
self.eigenVector = eigenVector
# Average spatial spectrum # 周波数平均
def averageSpectrum(self,averageWeight):
M = self.prm.NumberOfSensor
Ns = self.prm.NumberOfSource
[k1,k2] = self.prm.getFrequencyIndex()
NAngle = self.prm.NumberOfAngle
NFreq = self.prm.NFFT/2+1
if averageWeight == 'EigenValue': # 周波数重み
w = np.zeros(NFreq,dtype='float')
for k in range(k1,k2):
s = self.eigenValue[k,M-Ns:M]
w[k] = np.sqrt(np.sum(np.real(s)))
elif averageWeight == 'Flat':
w = np.ones(NFreq,dtype='float')
else:
print 'Wrong average weight'
Pavg = np.zeros(NAngle,dtype='float')
for n in range(NAngle):
Pavg[n] = np.sum(w[k1:k2]*self.P[k1:k2,n])
return Pavg
###########################
# Class: Param # 分析に用いるパラメタ
###########################
class Param:
method = 'MUSIC'
FrequencyRange = [0.,0.]
SamplingFrequency = 16000.0
NumberOfSource = 0
NumberOfSensor = 0
FrameLength = 0
FrameShift = 0
BlockLength = 0
NFFT = 0
def getFrequencyIndex(self):
index = np.zeros(2,dtype='int')
FRange = self.FrequencyRange
Fs = self.SamplingFrequency
NFFT = float(self.NFFT)
for m in range(len(FRange)):
index[m] = int(np.floor( FRange[m]/Fs*NFFT ))
index[1] += 1 # for the use in range()
return index
def getNumberOfFrame(self):
BLen = float(self.BlockLength)
FLen = float(self.FrameLength)
FShift = float(self.FrameShift)
NFrame = int(np.floor( (BLen-(FLen-FShift))/FShift ))
return NFrame
サンプルプログラム
空間スペクトルを推定する手順は、次のようになっています。
Kinectによって収録したデータを読み込む。
アレイマニフォールド行列のデータを計算する
空間相関行列を計算する
空間スペクトル(MUSIC,MV,DS)を計算する
# 空間スペクトル
# Ver. 1.1 2018/2/6
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as sig
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
import sys
import sys
sys.path.append('../lib')
import kinectSensor as kin
import spatialSpec as ss
# Directories & Files
fileNumber = 1
if fileNumber == 1:
observationFile = '../Data/Kinect/Speech0_Noise-45.wav' # 音源数=2のデータ
blockTarget = 28
elif fileNumber == 2:
observationFile = '../Data/Kinect/kinectMoving.wav' # 移動音源のデータ
blockTarget = 20
print( 'File Name: ',observationFile )
# Parameters
direction = np.arange(-90.0, 91.0, 2.0, dtype='float') # AMVを計算する方向
SoundVelocity = 340.0 # 音速
prm = ss.Param()
prm.FrequencyRange = [800.,3000.] # 周波数範囲
prm.SamplingFrequency = 16000.0 # サンプリング周波数
prm.NumberOfSource = 1 # MUSIC法における音源数
prm.NumberOfSensor = 4 # マイクロホン数
prm.FrameLength = 512 # フレーム長(FFTを計算する単位)
prm.FrameShift = 128 # フレームシフト
prm.BlockLength = 4000 # ブロック長(空間スペクトルを計算する単位)
prm.method = 'MUSIC' # MUSIC, MV, or DS # 音源定位の方法
prm.NFFT = prm.FrameLength # FFTの点数
dataType = 'int' # wavファイルのデータ形式
averageWeight = 'Flat' # 周波数平均に用いる重み 'Flat' or 'EigenValue'
if prm.method != 'MUSIC' and averageWeight == 'EigenValue':
print( "Wrong weighting parameter" )
sys.exit()
GraphDirectory = 'Graph'
flagFigSave = False
# 観測値の読み込み
z, Fs = kin.multiWavRead(observationFile,dataType) # Kinectデータの読み込み
(M,dataLength) = z.shape
fig = plt.figure() # 入力波形の表示
plt.plot(z[0,:])
if flagFigSave:
plt.savefig('Graph/input.png',format='png')
print( 'Sampling Frequency: %d' % Fs )
# アレイ・マニフォールド
A = kin.getAMatrix(direction,kin.sensorLocation,Fs,prm.NFFT,SoundVelocity)
# パラメタのチェック
(NFreq,NAngle,NSensor) = A.shape
prm.NumberOfAngle = NAngle
if prm.FrameLength != (NFreq-1)*2:
print( 'Wrong frame length' )
sys.exit()
if prm.NumberOfSensor != NSensor or prm.NumberOfSensor != M:
print( 'Wrong number of sensor' )
sys.exit()
NBlock = int(dataLength/prm.BlockLength)
print( 'NBlock:',NBlock )
# FFTに用いる窓関数
w = sig.hamming(prm.FrameLength)
# targetブロックの空間スペクトル
t1 = prm.BlockLength*blockTarget # ブロックの始点
t2 = prm.BlockLength*(blockTarget+1) # ブロックの終点
R = ss.SpatialCorrelation(z[:,t1:t2],w,prm) # 空間相関行列
spectrum = ss.SpatialSpectrum(R,A,prm)
Pavg = spectrum.averageSpectrum(averageWeight) # 空間スペクトルの周波数平均
s = 10.0*np.log10(Pavg) # スペクトルのdB表示
angle = np.float64(direction)
fig = plt.figure()
plt.plot(angle,s)
plt.xlabel('Direction [deg]')
plt.ylabel('Spectrum [dB]')
index = sig.argrelmax(s) # スペクトルのピーク
for d in angle[index]:
print( 'Angle estimate:',d )
gmin,gmax = plt.ylim()
plt.vlines(d,gmin,gmax,colors='r',linestyles='--')
if flagFigSave:
plt.savefig('Graph/spectrum.png',format='png')
# 空間スペクトログラム
S = np.zeros((len(direction),NBlock))
for block in range(NBlock):
t1 = prm.BlockLength*block
t2 = prm.BlockLength*block + prm.BlockLength
R = ss.SpatialCorrelation(z[:,t1:t2],w,prm)
spectrum = ss.SpatialSpectrum(R,A,prm)
Pavg = spectrum.averageSpectrum(averageWeight)
S[:,block] = 10.0*np.log10(Pavg)
if block == blockTarget:
P = spectrum.P
fig = plt.figure()
tm = np.float64(range(NBlock))*(float(prm.BlockLength)/float(prm.SamplingFrequency))
GX,GY = np.meshgrid(tm,direction)
plt.pcolor(GX,GY,S)
plt.ylabel('Direction [deg]')
plt.xlabel('Time [s]')
blockTime = (blockTarget-1)*(float(prm.BlockLength)/float(prm.SamplingFrequency))
plt.vlines(blockTime,min(direction), max(direction), colors='b',linestyles='--')
if flagFigSave:
plt.savefig('Graph/spectrogram.png',format='png')
# targetブロックの周波数-空間スペクトル
fig = plt.figure()
[k1,k2] = prm.getFrequencyIndex()
freq = np.float64(np.arange(k1,k2))/float(prm.NFFT)*prm.SamplingFrequency
GX,GY = np.meshgrid(direction,freq)
plt.pcolor(GX,GY,P[k1:k2,:])
plt.xlabel('Direction [deg]')
plt.ylabel('Frequency [Hz]')
plt.xlim(min(direction),max(direction))
plt.ylim(min(freq),max(freq))
if flagFigSave:
plt.savefig('Graph/freq-spectrum.png',format='png')
fig = plt.figure() # 3次元表示
ax = fig.gca(projection='3d')
ax.plot_surface(GX, GY, P[k1:k2,:], rstride=1, cstride=1, cmap=cm.coolwarm,
linewidth=0, antialiased=False)
ax.set_xlabel('Direction [deg]')
ax.set_ylabel('Frequency [Hz]')
ax.set_zlabel('Spectrum [dB]')
if flagFigSave:
plt.savefig('Graph/freq-spectrum3D.png',format='png')
# 単一周波数の空間スペクトル
freq = 1000 # Hz
freqIndex = int( (freq/Fs)*prm.NFFT )
print('Frequency=%.1f' % (Fs/prm.NFFT*freqIndex) )
fig = plt.figure()
plt.plot(direction,P[freqIndex,:])
plt.xlabel('Direction [deg]')
plt.ylabel('Spectrum [dB]')
plt.title( 'Frequency=%.1f index=%d' % ((Fs/prm.NFFT*freqIndex), freqIndex) )
if flagFigSave:
plt.savefig('Graph/single-spectrum.png',format='png')
# 固有値分布
if prm.method == 'MUSIC':
plt.figure()
plt.plot( np.arange(M),10*np.log10(np.real(spectrum.eigenValue[freqIndex])),'ob-' )
plt.xlabel('Eigenvalue number')
plt.ylabel('Eigenvalue [dB]')
plt.xticks(np.arange(M))
if flagFigSave:
plt.savefig('Graph/eigenvalues.png',format='png')
plt.show()
図1は観測波形、図2はこれに対応した空間スペクトログラムです。この観測波形では、時刻0 [s]から-45°に設置した雑音源(白色雑音)が、時刻4 [s]から 0 °に設置した信号源(音声、人)が、それぞれ鳴り始めます。図3は図2の青点線の時刻における平均空間スペクトルです。この図から、上述の音源の位置が推定されているのがわかります。図4は、図3の平均処理を行う前の、周波数ごとの空間スペクトルです。図5は1000Hzにおける空間スペクトルです。これらの図から、それぞれの音源は、周波数ごとに見ると、スパースであることがわかります。このため、このサンプルプログラムでは、音源数 prm.NumberOfSource = 1 としています。図6は、MUSIC法において算出する固有値分布を示しています(1000Hz)。
図1:観測波形
図2:空間スペクトログラム
図3:平均空間スペクトル
図4(a):空間-周波数スペクトル
図4(b):(a)の3次元表示
図5:単一周波数(1000Hz)の空間スペクトル
図6:単一周波数(1000Hz)の固有値分布
参考文献
浅野 太著、“音のアレイ信号処理”、コロナ社 (2011)