Chown and Müller (2018) consider a weighted empirical process of nonparametric regression residuals as the basis of a test for heteroscedasticity in the nonparametric regression model.
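In the notation of the code below, the statistic computed by CM_test can be sketched as
\[
T_n = \frac{1}{\sqrt{n}} \max_{1 \le j \le n} \Bigl| \sum_{i=1}^{j} w_{(i)} \Bigr|,
\]
where \(w_{(1)}, \dots, w_{(n)}\) are the standardized detection weights reordered so that \(w_{(i)}\) is attached to the \(i\)-th smallest residual; the null hypothesis of homoscedasticity is rejected when \(T_n\) exceeds a bootstrap critical value.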
R code:
################################################################################
## Proctest simulation with one predictor variable using quadratic weights. ##
################################################################################
## Required libraries
require(tcltk)
require(parallel)
## Set the correct working directory.
setwd("~/Documents/Published/proctest/simulation/")
## Custom functions
## The regression function.
reg = function(x) {
return( 2 * x + 3 * cos( pi * x ) )
}
## A simple quadratic scale function.
scl1 = function(x) {
return( 0.4 + 4 * x ^ 2 )
}
## A fan-type scale function.
scl2 = function(x) {
return( 2 * exp(x) - 1 / 2 )
}
## A simple local alternative scale function.
scl3 = function(x) {
return( 1 + 15 * x ^ 2 / sqrt( length(x) ) )
}
## A simple quadratic detection function.
detect = function(x) {
return( x ^ 2 )
}
## Computes the test statistic of Chown and Mueller (2018).
CM_test = function(w, es) {
## Order weights according to residuals.
w = w[ order(es) ]
## Return largest normalized cumulative sum of weights.
return( max( abs( cumsum(w) ) ) / sqrt( length(es) ) )
}
## Computes the cosine kernel function.
w_cos = function(u) {
return( pi * cos( pi * u / 2 ) * ( abs(u) <= 1 ) / 4 )
}
## Computes the psi vector for the local linear polynomial.
psi = function(u) {
return( cbind(1, u) )
}
## Computes the local linear estimate at x.
P1_fit_x = function(x, xdata, ydata, bwd) {
## Sample size.
nsamp = length(ydata)
## Localized covariates.
U = ( x - xdata ) / bwd
## Diagonal weight matrix.
W = diag( w_cos(U) )
## n by 2 matrix of polynomial terms for each normalized covariate.
Psi = psi(U)
## 2 by 2 Gram matrix for weighted P1 basis.
G = t(Psi) %*% W %*% Psi
## Weighted inner products between Y and local linear polynomial terms.
v = t(Psi) %*% W %*% matrix(ydata, nrow=nsamp)
## Apply regularization only to small samples.
if( nsamp <= 150 ) { G = G + diag(1e-5, 2) }
## Local polynomial coefficients.
betahat = qr.solve(G, v)
## Return the first component.
return( betahat[1] )
}
## Wrapper function to compute local linear estimates with many covariates.
P1_fit = function(xs, xdata, ydata, bwd) {
return( sapply(xs, P1_fit_x, xdata, ydata, bwd) )
}
## A function to compute a bootstrap version of the test statistic.
boot_TS = function(b, es, fits, xdata, bwd) {
## Sample size.
nsamp = length(es)
## Bootstrap indices.
boot_ind = sample(1 : nsamp, replace=TRUE)
## Bootstrap errors.
es_boot = es[boot_ind] + 1.06 * sd(es) * nsamp ^ ( -1 / 5 ) * rnorm(nsamp)
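## (Adding Gaussian noise scaled by 1.06 * sd(es) * n^(-1/5), the usual normal
## rule-of-thumb bandwidth, makes this a smooth bootstrap of the residuals.)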
## Bootstrap covariates.
xdata_boot = xdata[boot_ind]
## Bootstrap responses.
ydata_boot = fits[boot_ind] + es_boot
## Bootstrap fitted values.
fits_boot = P1_fit(xdata_boot, xdata_boot, ydata_boot, bwd)
## Bootstrap residuals.
res_boot = ydata_boot - fits_boot
## Bootstrap detection function.
dtf_boot = detect(xdata_boot)
## Bootstrap weights.
w_boot = ( dtf_boot - mean(dtf_boot) ) / sd(dtf_boot)
## Return the bootstrap test statistic.
return( CM_test(w_boot, res_boot) )
}
## Computes a suitable quantile for the test statistic using bootstrap.
boot_quantile = function(xdata, ydata, bwd) {
## Calculate fitted values.
fits = P1_fit(xdata, xdata, ydata, bwd)
## Calculate residuals.
res = ydata - fits
## Second conditional moment estimates.
m2_fits = P1_fit(xdata, xdata, ydata ^ 2, bwd)
## Variance estimate.
var_fits = abs( m2_fits - fits ^ 2 )
## Calculate the error scale estimate.
sighat = sqrt( mean(var_fits) )
## Form residuals conforming to H0.
res_H0 = ( res - mean(res) ) * sighat / sqrt(var_fits)
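## (Each centered residual is divided by its local scale estimate and multiplied
## by the pooled scale sighat, so the resampled errors are homoscedastic under H0.)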
## Estimate distribution of test statistics.
teststats = parSapply(compcl, 1 : 200, boot_TS, res_H0, fits, xdata, bwd)
## Return the 95th percentile of the test statistics calculated under H0.
return( sort(teststats)[191] )
}
## Simulation.
## Number of simulation runs.
runs = 1000
## Sample sizes.
samples = c(50, 100, 200, 1000)
## Test decisions.
decision_null = rep(NA, runs)
decision_alt1 = rep(NA, runs)
decision_alt2 = rep(NA, runs)
decision_alt3 = rep(NA, runs)
## Simulated powers.
power_null = rep(NA, length(samples))
power_alt1 = rep(NA, length(samples))
power_alt2 = rep(NA, length(samples))
power_alt3 = rep(NA, length(samples))
## Open a progress bar.
pb = tkProgressBar(title="Proctest simulation", min=0, max=runs)
## Auto-detect the number of CPU cores.
ncores = detectCores()
## Initiate cluster on this machine.
compcl = makeCluster(ncores, type="FORK")
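## (A FORK cluster is only available on Unix-alike systems; on Windows one would
## use a PSOCK cluster and export the helper functions to the workers.)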
## Simulation
for(n in samples) {
## Fill progress bar with current information.
setTkProgressBar(pb, 0, label=paste0("Sample size: ", n))
## Set initial time difference to zero.
avg_time_diff = 0
## Get index for n in samples.
n_ind = which( samples == n )
for(s in 1 : runs) {
## Start a computing timer.
init_time = proc.time()[3]
## Covariates.
X = runif(n, -1, 1)
## Responses generated through model equation.
Y_null = reg(X) + rnorm(n)
Y_alt1 = reg(X) + scl1(X) * rnorm(n)
Y_alt2 = reg(X) + scl2(X) * rnorm(n)
Y_alt3 = reg(X) + scl3(X) * rnorm(n)
## A suitable bandwidth for the local linear smoother.
bwd_reg = 1.25 * ( n * log(n) ) ^ ( -1 / 4 )
## Fitted values.
fits_null = P1_fit(X, X, Y_null, bwd_reg)
fits_alt1 = P1_fit(X, X, Y_alt1, bwd_reg)
fits_alt2 = P1_fit(X, X, Y_alt2, bwd_reg)
fits_alt3 = P1_fit(X, X, Y_alt3, bwd_reg)
## Residuals.
res_null = Y_null - fits_null
res_alt1 = Y_alt1 - fits_alt1
res_alt2 = Y_alt2 - fits_alt2
res_alt3 = Y_alt3 - fits_alt3
## Detection function.
dtf = detect(X)
## Detection weights.
W = ( dtf - mean(dtf) ) / sd(dtf)
## Calculate test statistics.
TS_null = CM_test(W, res_null)
TS_alt1 = CM_test(W, res_alt1)
TS_alt2 = CM_test(W, res_alt2)
TS_alt3 = CM_test(W, res_alt3)
## Find critical value for 5% level test by bootstrap.
crit_val_null = boot_quantile(X, Y_null, bwd_reg)
crit_val_alt1 = boot_quantile(X, Y_alt1, bwd_reg)
crit_val_alt2 = boot_quantile(X, Y_alt2, bwd_reg)
crit_val_alt3 = boot_quantile(X, Y_alt3, bwd_reg)
## Test decisions.
decision_null[s] = 1 * ( TS_null > crit_val_null )
decision_alt1[s] = 1 * ( TS_alt1 > crit_val_alt1 )
decision_alt2[s] = 1 * ( TS_alt2 > crit_val_alt2 )
decision_alt3[s] = 1 * ( TS_alt3 > crit_val_alt3 )
## Running mean of the per-run computing time, updated one run at a time.
avg_time_diff = ( ( s - 1 ) * avg_time_diff +
proc.time()[3] - init_time ) / s
## Estimated time to finish (in hours).
ETF = round(avg_time_diff * ( runs - s ) / 3600, digits=1)
## Updated progress bar information.
pb_msg = paste0("Sample size: ", n, " (", ETF, " hour(s) to go)")
## Update progress bar.
setTkProgressBar(pb, s, label=pb_msg)
}
## Calculate simulated power.
power_null[n_ind] = mean(decision_null)
power_alt1[n_ind] = mean(decision_alt1)
power_alt2[n_ind] = mean(decision_alt2)
power_alt3[n_ind] = mean(decision_alt3)
}
## Close the progress bar.
close(pb)
## Stop the computing cluster and free up system resources.
stopCluster(compcl)
## Simulation results.
## Collect results.
results = rbind(samples, power_null, power_alt1, power_alt2, power_alt3)
## Display results.
round(results, digits=3)
## Save results to a file.
save(results, file="proctest_results.RData")
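A minimal sketch of applying the test to one simulated data set, reusing the functions defined above (boot_quantile requires the FORK cluster compcl created in the simulation):
## Minimal usage sketch (assumes the functions above are sourced and compcl exists).
set.seed(1)
n = 100
X = runif(n, -1, 1)
Y = reg(X) + scl1(X) * rnorm(n)                       ## heteroscedastic data
bwd = 1.25 * ( n * log(n) ) ^ ( -1 / 4 )
res = Y - P1_fit(X, X, Y, bwd)                        ## local linear residuals
W = ( detect(X) - mean(detect(X)) ) / sd(detect(X))   ## standardized weights
TS = CM_test(W, res)                                  ## test statistic
crit = boot_quantile(X, Y, bwd)                       ## bootstrap 5% critical value
TS > crit                                             ## TRUE rejects homoscedasticity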
Python code:
###############################################################################
## Proctest simulation with one predictor variable using quadratic weights. ##
###############################################################################
## Required libraries.
import numpy as np
import scipy as sp
import scipy.linalg
from scipy.stats import uniform
from scipy.stats import norm
from progress.bar import Bar
from tabulate import tabulate
## Progress bar class.
class FancyBar(Bar):
suffix='%(percent)d%% - %(ETF).1fh to go.'
@property
def ETF(self):
return self.eta / 3600.0
## Custom functions.
## The regression function.
def reg(x):
return( 2.0 * x + 3.0 * np.cos( np.pi * x ) )
## A simple quadratic scale function.
def scl1(x):
return( 0.4 + 4 * x ** 2 )
## A fan-type scale function.
def scl2(x):
return( 2.0 * np.exp(x) - 1.0 / 2.0 )
## A simple local alternative scale function.
def scl3(x):
return( 1.0 + 15.0 * x ** 2 / np.sqrt(x.size) )
## A simple quadratic detection function.
def detect(x):
return( x ** 2 )
## Computes the test statistic of Chown and Mueller (2018).
def CM_test(w, es):
## Order weights according to residuals.
w = w[ np.argsort(es) ]
## Return the largest normalized cumulative sum of weights.
return( np.max( np.abs( np.cumsum(w) ) ) / np.sqrt(es.size) )
## The cosine kernel function.
def w_cos(u):
return( np.pi * np.cos( np.pi * u / 2.0 ) * ( np.abs(u) <= 1.0 ) / 4.0 )
## Computes the matrix of local linear terms.
def psi(u):
return( np.column_stack((np.ones(u.size), u)) )
## Computes the local linear estimate at x.
def P1_fit_x(x, xdata, ydata, bwd):
## Sample size.
nsamp = ydata.size
## Transform xdata to localized covariates.
U = ( x - xdata ) / bwd
## Diagonal weight matrix.
W = np.diag( w_cos(U) )
## n-by-2 matrix of polynomial terms.
Psi = psi(U)
## 2-by-2 Gram matrix for weighted P1 basis.
G = np.matmul(np.matmul(Psi.T, W), Psi)
## Inner products between ydata and P1.
v = np.matmul(np.matmul(Psi.T, W), ydata.reshape( (nsamp, 1) ))
## Apply regularization to small samples.
if nsamp <= 150:
G = G + np.diag( np.ones(2) / 1.0e5 )
## Local polynomial coefficients.
betahat = sp.linalg.solve(G, v, assume_a='pos')
## Return the first component.
return( betahat[0][0] )
## Wrapper function to compute local linear estimates with many covariates.
def P1_fit(xs, xdata, ydata, bwd):
## Estimates.
est = np.empty(xs.size)
## Loop computing predicted values over each element of xs.
for i in range(xs.size):
est[i] = P1_fit_x(xs[i], xdata, ydata, bwd)
## Return the predicted values.
return(est)
## A function to compute a bootstrap version of the test statistic.
def boot_TS(es, fits, xdata, bwd):
## Sample size.
nsamp = es.size
## Bootstrap indices.
boot_ind = np.random.choice(nsamp, nsamp)
## Bootstrap errors.
es_boot = ( es[boot_ind] + 1.06 * np.std(es) * nsamp ** ( -1.0 / 5.0 )
* norm.rvs(size=nsamp) )
## Bootstrap covariates.
xdata_boot = xdata[boot_ind]
## Bootstrap responses.
ydata_boot = fits[boot_ind] + es_boot
## Bootstrap fitted values.
fits_boot = P1_fit(xdata_boot, xdata_boot, ydata_boot, bwd)
## Bootstrap residuals.
res_boot = ydata_boot - fits_boot
## Bootstrap detection function.
dtf_boot = detect(xdata_boot)
## Bootstrap weights.
w_boot = ( dtf_boot - np.mean(dtf_boot) ) / np.std(dtf_boot)
## Return the bootstrap test statistic.
return( CM_test(w_boot, res_boot) )
## Computes a suitable quantile for the test statistic using bootstrap.
def boot_quantile(xdata, ydata, bwd):
## Calculate fitted values.
fits = P1_fit(xdata, xdata, ydata, bwd)
## Calculate residuals.
res = ydata - fits
## Second conditional moment estimates.
m2_fits = P1_fit(xdata, xdata, ydata ** 2.0, bwd)
## Variance estimates.
var_fits = np.abs( m2_fits - fits ** 2.0 )
## Estimated error scale.
sighat = np.sqrt( np.mean(var_fits) )
## Form residuals conforming to H0.
res_H0 = ( res - np.mean(res) ) * sighat / np.sqrt(var_fits)
## Storage for 200 bootstrap test statistics.
test_stats = np.empty(200)
## Calculate 200 bootstrap test statistic values.
for i in range(200):
test_stats[i] = boot_TS(res_H0, fits, xdata, bwd)
## Return the 95th percentile of test statistics calculated under H0.
return( np.sort(test_stats)[190] )
## Simulation.
## Number of simulation runs.
runs = 1000
## Sample sizes.
samples = np.array( [50, 100, 200, 1000] )
## Test decisions.
decision_null = np.empty(runs)
decision_alt1 = np.empty(runs)
decision_alt2 = np.empty(runs)
decision_alt3 = np.empty(runs)
## Simulated powers.
power_null = np.empty(samples.size)
power_alt1 = np.empty(samples.size)
power_alt2 = np.empty(samples.size)
power_alt3 = np.empty(samples.size)
## Simulation.
for n in samples:
## Get the index for n in samples.
n_ind = np.where( samples == n )[0][0]
## Open a progress bar.
bar = FancyBar(message='Sample size: ' + str(n), max=runs)
for s in range(runs):
## Generate covariates.
X = 2.0 * ( uniform.rvs(size=n) - 1.0 / 2.0 )
## Responses generated through the model equation.
Y_null = reg(X) + norm.rvs(size=n)
Y_alt1 = reg(X) + scl1(X) * norm.rvs(size=n)
Y_alt2 = reg(X) + scl2(X) * norm.rvs(size=n)
Y_alt3 = reg(X) + scl3(X) * norm.rvs(size=n)
## A suitable bandwidth for the local linear smoother.
bwd_reg = 1.25 * ( n * np.log(n) ) ** ( -1.0 / 4.0 )
## Fitted values.
fits_null = P1_fit(X, X, Y_null, bwd_reg)
fits_alt1 = P1_fit(X, X, Y_alt1, bwd_reg)
fits_alt2 = P1_fit(X, X, Y_alt2, bwd_reg)
fits_alt3 = P1_fit(X, X, Y_alt3, bwd_reg)
## Residuals.
res_null = Y_null - fits_null
res_alt1 = Y_alt1 - fits_alt1
res_alt2 = Y_alt2 - fits_alt2
res_alt3 = Y_alt3 - fits_alt3
## Detection function.
dtf = detect(X)
## Detection weights.
W = ( dtf - np.mean(dtf) ) / np.std(dtf)
## Test statistics.
TS_null = CM_test(W, res_null)
TS_alt1 = CM_test(W, res_alt1)
TS_alt2 = CM_test(W, res_alt2)
TS_alt3 = CM_test(W, res_alt3)
## Find critical value for 5% level test by bootstrap.
crit_val_null = boot_quantile(X, Y_null, bwd_reg)
crit_val_alt1 = boot_quantile(X, Y_alt1, bwd_reg)
crit_val_alt2 = boot_quantile(X, Y_alt2, bwd_reg)
crit_val_alt3 = boot_quantile(X, Y_alt3, bwd_reg)
## Test decisions.
decision_null[s] = 1 * ( TS_null > crit_val_null )
decision_alt1[s] = 1 * ( TS_alt1 > crit_val_alt1 )
decision_alt2[s] = 1 * ( TS_alt2 > crit_val_alt2 )
decision_alt3[s] = 1 * ( TS_alt3 > crit_val_alt3 )
bar.next()
## Calculate simulated power.
power_null[n_ind] = np.mean(decision_null)
power_alt1[n_ind] = np.mean(decision_alt1)
power_alt2[n_ind] = np.mean(decision_alt2)
power_alt3[n_ind] = np.mean(decision_alt3)
bar.finish()
## Simulation results.
## Collect results.
results = np.vstack(
(samples, power_null, power_alt1, power_alt2, power_alt3 ) )
## Display simulation results.
print( tabulate(results) )
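If desired, the results can also be written to a file, mirroring the save() call at the end of the R script (a sketch; the file name is arbitrary):
## Save the results array to a CSV file (rows: samples, power_null, power_alt1,
## power_alt2, power_alt3).
np.savetxt("proctest_results.csv", results, delimiter=",")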