空間スペクトル推定

Last update : 2018/08/23

空間スペクトル推定モジュール

spatialSpec.py

関数SpatilaCorrelation()では、マイクロホンアレイのデータから、空間相関行列を計算します。クラスSpatialSpectrumは、空間相関行列とアレイマニフォールド行列から、空間スペクトルを計算します。空間スペクトルを推定する手法は、(1) 部分空間法(MUSIC法, MU)、(2)最小分散法(MV)、(c)遅延和法(DS)の3つです。MUSIC法を使った場合は、計算の過程で、固有値や固有ベクトルを求めますが、これらは、空間スペクトルを算出したあとでも、再利用する場合があるので、あとから属性として取り出せるようになっています。空間スペクトルを推定する3つの方法は、次式で与えられます。M,Nは、それぞれ、センサ数、音源数を表します。a(θ)は、θ方向のアレイ・マニフォールドを表します。Rは空間相関行列を、eiRi番目の固有ベクトルを表します。

(1)

(2)

(3)

#

# Calculate spatial spectrum

#

# Ver 1.0, 2013/7/25 F.Asano

import numpy as np

import scipy.fftpack as ft

import scipy.linalg as alg

###########################################

# Function: SpatialCorrelation # 空間相関行列

# z: array observation data

# w: window function for STFT

# prm: parameters

###########################################

def SpatialCorrelation(z,w,prm):

NFFT = prm.NFFT

M = prm.NumberOfSensor

NFrame = prm.getNumberOfFrame()

R = np.zeros((NFFT/2+1,M,M),dtype='complex')

ZZ = np.zeros((M,NFFT),dtype='complex')

Z = np.zeros(M,dtype='complex')

zb = np.zeros(prm.FrameLength,dtype='float')

for n in range(NFrame):

t1 = prm.FrameShift*n

t2 = prm.FrameShift*n + prm.FrameLength

for m in range(M):

zb = z[m,t1:t2]*w

ZZ[m,:] = ft.fft(zb,NFFT)

for k in range(NFFT/2+1):

Z = ZZ[:,k]

R[k,:,:] = R[k,:,:] + np.outer(Z,np.conjugate(Z))

return R

##################################

# Class: SpatialSpectrum # 空間スペクトル

# RR: spatial correlation matrix

# A: array manifold matrix

# prm: Parameters

##################################

class SpatialSpectrum:

# Constructor

def __init__(self,RR,A,prm):

self.prm = prm

NFreq = prm.NFFT/2+1

M = prm.NumberOfSensor

Ns = prm.NumberOfSource

NAngle = prm.NumberOfAngle

[k1,k2] = prm.getFrequencyIndex()

# Frequency loop

P = np.zeros((NFreq,NAngle),dtype='float')

eigenValue = np.zeros((NFreq,M),dtype='float')

eigenVector = np.zeros((NFreq,M,M),dtype='complex')

for k in range(k1,k2):

R = RR[k,:,:]

# Eigenvalue decomposition # 固有値分解

if prm.method == 'MUSIC':

(s,E) = alg.eigh(R)

eigenValue[k,:] = s

eigenVector[k,:,:] = E

# Spatial spectrum

for n in range(NAngle):

a = A[k,n,:]

if prm.method == 'MUSIC': # MUSIC法

En = E[:,0:M-Ns] # eigenvalues are in ascending order

d = np.dot(np.conj(a),En) # don't use vdot!

P[k,n] = 1/np.real(np.dot(np.conj(d),d))

elif prm.method == 'MV': # 最小分散(MV)法

aR = np.dot(np.conj(a),alg.inv(R))

P[k,n] = 1/np.real(np.dot(aR,a))

elif prm.method == 'DS': # 遅延和(DS)法

aR = np.dot(np.conj(a),R)

P[k,n] = np.real(np.dot(aR,a))/float(M)

else:

print 'Wrong method %s' % prm.method

self.P = P

if prm.method == 'MUSIC':

self.eigenValue = eigenValue

self.eigenVector = eigenVector

# Average spatial spectrum # 周波数平均

def averageSpectrum(self,averageWeight):

M = self.prm.NumberOfSensor

Ns = self.prm.NumberOfSource

[k1,k2] = self.prm.getFrequencyIndex()

NAngle = self.prm.NumberOfAngle

NFreq = self.prm.NFFT/2+1

if averageWeight == 'EigenValue': # 周波数重み

w = np.zeros(NFreq,dtype='float')

for k in range(k1,k2):

s = self.eigenValue[k,M-Ns:M]

w[k] = np.sqrt(np.sum(np.real(s)))

elif averageWeight == 'Flat':

w = np.ones(NFreq,dtype='float')

else:

print 'Wrong average weight'

Pavg = np.zeros(NAngle,dtype='float')

for n in range(NAngle):

Pavg[n] = np.sum(w[k1:k2]*self.P[k1:k2,n])

return Pavg

###########################

# Class: Param # 分析に用いるパラメタ

###########################

class Param:

method = 'MUSIC'

FrequencyRange = [0.,0.]

SamplingFrequency = 16000.0

NumberOfSource = 0

NumberOfSensor = 0

FrameLength = 0

FrameShift = 0

BlockLength = 0

NFFT = 0

def getFrequencyIndex(self):

index = np.zeros(2,dtype='int')

FRange = self.FrequencyRange

Fs = self.SamplingFrequency

NFFT = float(self.NFFT)

for m in range(len(FRange)):

index[m] = int(np.floor( FRange[m]/Fs*NFFT ))

index[1] += 1 # for the use in range()

return index

def getNumberOfFrame(self):

BLen = float(self.BlockLength)

FLen = float(self.FrameLength)

FShift = float(self.FrameShift)

NFrame = int(np.floor( (BLen-(FLen-FShift))/FShift ))

return NFrame

サンプルプログラム

空間スペクトルを推定する手順は、次のようになっています。

    • Kinectによって収録したデータを読み込む。

    • アレイマニフォールド行列のデータを計算する

    • 空間相関行列を計算する

    • 空間スペクトル(MUSIC,MV,DS)を計算する

# 空間スペクトル

# Ver. 1.1 2018/2/6

import numpy as np

import matplotlib.pyplot as plt

import scipy.signal as sig

from matplotlib import cm

from mpl_toolkits.mplot3d import Axes3D

import sys

import sys

sys.path.append('../lib')

import kinectSensor as kin

import spatialSpec as ss

# Directories & Files

fileNumber = 1

if fileNumber == 1:

observationFile = '../Data/Kinect/Speech0_Noise-45.wav' # 音源数=2のデータ

blockTarget = 28

elif fileNumber == 2:

observationFile = '../Data/Kinect/kinectMoving.wav' # 移動音源のデータ

blockTarget = 20

print( 'File Name: ',observationFile )

# Parameters

direction = np.arange(-90.0, 91.0, 2.0, dtype='float') # AMVを計算する方向

SoundVelocity = 340.0 # 音速

prm = ss.Param()

prm.FrequencyRange = [800.,3000.] # 周波数範囲

prm.SamplingFrequency = 16000.0 # サンプリング周波数

prm.NumberOfSource = 1 # MUSIC法における音源数

prm.NumberOfSensor = 4 # マイクロホン数

prm.FrameLength = 512 # フレーム長(FFTを計算する単位)

prm.FrameShift = 128 # フレームシフト

prm.BlockLength = 4000 # ブロック長(空間スペクトルを計算する単位)

prm.method = 'MUSIC' # MUSIC, MV, or DS # 音源定位の方法

prm.NFFT = prm.FrameLength # FFTの点数

dataType = 'int' # wavファイルのデータ形式

averageWeight = 'Flat' # 周波数平均に用いる重み 'Flat' or 'EigenValue'

if prm.method != 'MUSIC' and averageWeight == 'EigenValue':

print( "Wrong weighting parameter" )

sys.exit()

GraphDirectory = 'Graph'

flagFigSave = False

# 観測値の読み込み

z, Fs = kin.multiWavRead(observationFile,dataType) # Kinectデータの読み込み

(M,dataLength) = z.shape

fig = plt.figure() # 入力波形の表示

plt.plot(z[0,:])

if flagFigSave:

plt.savefig('Graph/input.png',format='png')

print( 'Sampling Frequency: %d' % Fs )

# アレイ・マニフォールド

A = kin.getAMatrix(direction,kin.sensorLocation,Fs,prm.NFFT,SoundVelocity)

# パラメタのチェック

(NFreq,NAngle,NSensor) = A.shape

prm.NumberOfAngle = NAngle

if prm.FrameLength != (NFreq-1)*2:

print( 'Wrong frame length' )

sys.exit()

if prm.NumberOfSensor != NSensor or prm.NumberOfSensor != M:

print( 'Wrong number of sensor' )

sys.exit()

NBlock = int(dataLength/prm.BlockLength)

print( 'NBlock:',NBlock )

# FFTに用いる窓関数

w = sig.hamming(prm.FrameLength)

# targetブロックの空間スペクトル

t1 = prm.BlockLength*blockTarget # ブロックの始点

t2 = prm.BlockLength*(blockTarget+1) # ブロックの終点

R = ss.SpatialCorrelation(z[:,t1:t2],w,prm) # 空間相関行列

spectrum = ss.SpatialSpectrum(R,A,prm)

Pavg = spectrum.averageSpectrum(averageWeight) # 空間スペクトルの周波数平均

s = 10.0*np.log10(Pavg) # スペクトルのdB表示

angle = np.float64(direction)

fig = plt.figure()

plt.plot(angle,s)

plt.xlabel('Direction [deg]')

plt.ylabel('Spectrum [dB]')

index = sig.argrelmax(s) # スペクトルのピーク

for d in angle[index]:

print( 'Angle estimate:',d )

gmin,gmax = plt.ylim()

plt.vlines(d,gmin,gmax,colors='r',linestyles='--')

if flagFigSave:

plt.savefig('Graph/spectrum.png',format='png')

# 空間スペクトログラム

S = np.zeros((len(direction),NBlock))

for block in range(NBlock):

t1 = prm.BlockLength*block

t2 = prm.BlockLength*block + prm.BlockLength

R = ss.SpatialCorrelation(z[:,t1:t2],w,prm)

spectrum = ss.SpatialSpectrum(R,A,prm)

Pavg = spectrum.averageSpectrum(averageWeight)

S[:,block] = 10.0*np.log10(Pavg)

if block == blockTarget:

P = spectrum.P

fig = plt.figure()

tm = np.float64(range(NBlock))*(float(prm.BlockLength)/float(prm.SamplingFrequency))

GX,GY = np.meshgrid(tm,direction)

plt.pcolor(GX,GY,S)

plt.ylabel('Direction [deg]')

plt.xlabel('Time [s]')

blockTime = (blockTarget-1)*(float(prm.BlockLength)/float(prm.SamplingFrequency))

plt.vlines(blockTime,min(direction), max(direction), colors='b',linestyles='--')

if flagFigSave:

plt.savefig('Graph/spectrogram.png',format='png')

# targetブロックの周波数-空間スペクトル

fig = plt.figure()

[k1,k2] = prm.getFrequencyIndex()

freq = np.float64(np.arange(k1,k2))/float(prm.NFFT)*prm.SamplingFrequency

GX,GY = np.meshgrid(direction,freq)

plt.pcolor(GX,GY,P[k1:k2,:])

plt.xlabel('Direction [deg]')

plt.ylabel('Frequency [Hz]')

plt.xlim(min(direction),max(direction))

plt.ylim(min(freq),max(freq))

if flagFigSave:

plt.savefig('Graph/freq-spectrum.png',format='png')

fig = plt.figure() # 3次元表示

ax = fig.gca(projection='3d')

ax.plot_surface(GX, GY, P[k1:k2,:], rstride=1, cstride=1, cmap=cm.coolwarm,

linewidth=0, antialiased=False)

ax.set_xlabel('Direction [deg]')

ax.set_ylabel('Frequency [Hz]')

ax.set_zlabel('Spectrum [dB]')

if flagFigSave:

plt.savefig('Graph/freq-spectrum3D.png',format='png')

# 単一周波数の空間スペクトル

freq = 1000 # Hz

freqIndex = int( (freq/Fs)*prm.NFFT )

print('Frequency=%.1f' % (Fs/prm.NFFT*freqIndex) )

fig = plt.figure()

plt.plot(direction,P[freqIndex,:])

plt.xlabel('Direction [deg]')

plt.ylabel('Spectrum [dB]')

plt.title( 'Frequency=%.1f index=%d' % ((Fs/prm.NFFT*freqIndex), freqIndex) )

if flagFigSave:

plt.savefig('Graph/single-spectrum.png',format='png')

# 固有値分布

if prm.method == 'MUSIC':

plt.figure()

plt.plot( np.arange(M),10*np.log10(np.real(spectrum.eigenValue[freqIndex])),'ob-' )

plt.xlabel('Eigenvalue number')

plt.ylabel('Eigenvalue [dB]')

plt.xticks(np.arange(M))

if flagFigSave:

plt.savefig('Graph/eigenvalues.png',format='png')

plt.show()

図1は観測波形、図2はこれに対応した空間スペクトログラムです。この観測波形では、時刻0 [s]から-45°に設置した雑音源(白色雑音)が、時刻4 [s]から 0 °に設置した信号源(音声、人)が、それぞれ鳴り始めます。図3は図2の青点線の時刻における平均空間スペクトルです。この図から、上述の音源の位置が推定されているのがわかります。図4は、図3の平均処理を行う前の、周波数ごとの空間スペクトルです。図5は1000Hzにおける空間スペクトルです。これらの図から、それぞれの音源は、周波数ごとに見ると、スパースであることがわかります。このため、このサンプルプログラムでは、音源数 prm.NumberOfSource = 1 としています。図6は、MUSIC法において算出する固有値分布を示しています(1000Hz)。

図1:観測波形

図2:空間スペクトログラム

図3:平均空間スペクトル

図4(a):空間-周波数スペクトル

図4(b):(a)の3次元表示

図5:単一周波数(1000Hz)の空間スペクトル

図6:単一周波数(1000Hz)の固有値分布

参考文献

浅野 太著、“音のアレイ信号処理”、コロナ社 (2011)