統計分析(一)

資料源

  • 範例資料
    • 讀取 CSV 檔案,運用 NumPy 計算平均值與標準差。
    • 調用 Python 元件繪圖範例
    • 調用 R 元件繪圖範例

程式碼

# -*- coding: utf-8 -*-
"""Load S&P 500 quotes from a CSV file and report statistics on them.

Run as a script with an optional CSV path argument (default ``SP500.csv``)
to execute the tasks switched on in ``taskControl``.

All output goes through single-argument ``print(...)`` calls, which behave
the same as a Python 2 print statement and as the Python 3 print function.
"""
import sys
import csv
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from scipy import stats

from Stat5 import Stat


class SP500:
    """Table of daily quotes with printing, plotting and hypothesis tests.

    After ``loadFile`` the attribute ``plotStockData`` is a 2-D NumPy array
    with one row per CSV record: column 0 holds ``datetime`` objects and
    columns 1-6 hold floats, in the order given by ``nameOfFields``.
    Methods taking ``cat`` expect a column index; index 0 (the date) is
    skipped by the statistical methods.
    """

    plotStockData = descriptiveStat = None
    # Literal translations: date, open, close, high, low, volume, after-hours.
    # NOTE(review): confirm what the last column means in the data source.
    nameOfFields = [u"日期", u"開盤", u"收盤", u"最高", u"最低",
                    u"成交量", u"盤後"]

    def __init__(self):
        self.descriptiveStat = Stat()

    def loadFile(self, csvFile, fieldHead=True):
        """Read ``csvFile`` into ``plotStockData``.

        Args:
            csvFile: path of a CSV file with seven columns; the first is a
                ``YYYY-MM-DD`` date and the rest parse as floats.
            fieldHead: when true, the first row is treated as a header and
                skipped without being parsed.

        Raises:
            ValueError: if a date or number cannot be parsed.
        """
        names = self.nameOfFields
        stockData = []
        # The context manager closes the file even if a row fails to parse.
        with open(csvFile, 'r') as csvF:
            reader = csv.DictReader(csvF, names)
            if fieldHead:
                next(reader, None)
            for rowDB in reader:
                record = [datetime.strptime(rowDB[names[0]], '%Y-%m-%d')]
                record.extend(float(rowDB[name]) for name in names[1:])
                stockData.append(record)
        self.plotStockData = np.array(stockData)

    def printData(self, fieldHead=True, fromRecNum=0, toRecNum=100):
        """Print rows ``fromRecNum:toRecNum``, optionally with a header line.

        Each header cell is followed by a tab. Each data line is the date, a
        tab, then the numeric fields with two decimals separated by spaces.
        """
        if fieldHead:
            print("".join("%s\t" % field for field in self.nameOfFields))
        for stockInfo in self.plotStockData[fromRecNum:toRecNum]:
            values = " ".join("%.2f" % value for value in stockInfo[1:])
            print("%s\t%s" % (stockInfo[0].date(), values))

    def getFieldData(self, cat):
        """Return column ``cat`` of the loaded table."""
        return self.plotStockData[:, cat]

    def _numericColumn(self, cat, fromRecNum, toRecNum):
        """Return rows ``fromRecNum:toRecNum`` of column ``cat`` as floats.

        The table has object dtype because column 0 holds datetimes; casting
        gives the numeric routines a plain float array to work on.
        """
        return self.plotStockData[fromRecNum:toRecNum, cat].astype(float)

    def getFieldStat(self, cat):
        """Return the descriptive statistics of column ``cat``.

        Returns ``None`` for ``cat <= 0``; otherwise whatever
        ``Stat.getStat`` returns for the column.
        """
        if cat <= 0:
            return None
        # Clear earlier results so the helper recomputes for this column.
        self.descriptiveStat.Reset()
        return self.descriptiveStat.getStat(self.getFieldData(cat))

    def PrintStat(self, cat):
        """Print the field name and its six statistics for ``cat > 0``."""
        if cat <= 0:
            return
        statInfo = self.getFieldStat(cat)
        nameOfCat = [u'總和', u'平均數', u'變異數', u'標準差', u'最大', u'最小']
        print(self.nameOfFields[cat])
        if statInfo is None:
            # No statistics were produced; nothing more to print.
            return
        for label, value in zip(nameOfCat, statInfo):
            print("\t%s=%.2f" % (label, value))

    def getFieldNames(self):
        """Return the list of column names."""
        return self.nameOfFields

    def PlotLines(self, fromRecNum=0, toRecNum=100):
        """Plot columns 1, 2, 3, 4 and 6 against the record index.

        The x axis is inverted after plotting, so record 0 is drawn on the
        right-hand side.
        """
        rcParams['font.family'] = 'NanumBarunGothic'
        plt.title(u"S&P500 指數線圖")
        plt.xlabel(u"日期")
        plt.ylabel(u"指數")
        # The x values are the same for every line, so build them once.
        recordCount = self.plotStockData[:, 0].size
        X = range(0, recordCount)[fromRecNum:toRecNum]
        for line in [1, 2, 3, 4, 6]:
            Y = self._numericColumn(line, fromRecNum, toRecNum)
            plt.plot(X, Y, label=self.nameOfFields[line])
        plt.legend(loc='lower left')
        plt.gca().invert_xaxis()
        plt.show()

    def Normal_Test(self, cat, fromRecNum=0, toRecNum=100):
        """Run ``scipy.stats.normaltest`` on a slice of column ``cat``.

        Returns:
            ``(statistic, p_value)``; ``(0, 0)`` when ``cat <= 0``.
        """
        z_value = p_value = 0
        if cat > 0:
            (z_value, p_value) = stats.normaltest(
                self._numericColumn(cat, fromRecNum, toRecNum))
        return (z_value, p_value)

    def Chi_Square_Test(self, cat, fromRecNum=0, toRecNum=100):
        """Run ``scipy.stats.chisquare`` on a slice of column ``cat``.

        Returns:
            ``(statistic, p_value)``; ``(0, 0)`` when ``cat <= 0``.
        """
        chi_square = p_value = 0
        if cat > 0:
            (chi_square, p_value) = stats.chisquare(
                self._numericColumn(cat, fromRecNum, toRecNum))
        return (chi_square, p_value)

    def ANOVA(self, fromRecNum=0, toRecNum=100, isNorm=True):
        """Compare columns 1, 2 and 6 over rows ``fromRecNum:toRecNum``.

        Args:
            isNorm: when true use ``scipy.stats.f_oneway``; otherwise use
                ``scipy.stats.kruskal``.

        Returns:
            The ``(statistic, p_value)`` result of the chosen test.
        """
        samples = [self._numericColumn(cat, fromRecNum, toRecNum)
                   for cat in (1, 2, 6)]
        test = stats.f_oneway if isNorm else stats.kruskal
        return test(*samples)


if __name__ == "__main__":
    csvFile = 'SP500.csv'
    if len(sys.argv) > 1:
        csvFile = sys.argv[1]

    # One switch per task below, in order: print table, dump columns,
    # dump statistics lists, print statistics, plot, normality test,
    # chi-square test, ANOVA.
    taskControl = [True, False, False, False, False, True, True, True]

    plotSP500 = SP500()
    plotSP500.loadFile(csvFile)

    if taskControl[0]:
        plotSP500.printData()

    if taskControl[1]:
        for i in range(1, 7):
            print(plotSP500.getFieldData(i))

    if taskControl[2]:
        for i in range(1, 7):
            print(plotSP500.getFieldStat(i))

    if taskControl[3]:
        for i in range(1, 7):
            plotSP500.PrintStat(i)

    if taskControl[4]:
        plotSP500.PlotLines()

    if taskControl[5]:
        for i in [1, 2, 6]:
            (z_value, p_value) = plotSP500.Normal_Test(i)
            print("%s\tZ-value=%.4f\tP-value=%.4f" %
                  (plotSP500.getFieldNames()[i], z_value, p_value))
        # NOTE(review): the blank line is assumed to follow the loop as a
        # section separator; confirm against the original layout.
        print("")

    if taskControl[6]:
        for i in [1, 2, 6]:
            (chi_square_value, p_value) = plotSP500.Chi_Square_Test(i)
            print("%s\tChi-value=%.4f\tP-value=%.4f" %
                  (plotSP500.getFieldNames()[i], chi_square_value, p_value))
        # NOTE(review): same placement assumption as above.
        print("")

    if taskControl[7]:
        (f_value, p_value) = plotSP500.ANOVA(0, 100, True)
        print("Normal\tP-value=%.4f\t" % (p_value))

程式庫

# -*- coding: utf-8 -*-
"""Descriptive statistics helper built on NumPy."""
import numpy as np


class Stat:
    """Compute and remember six descriptive statistics of a sequence.

    Results are kept in ``descStat`` in the order
    ``[sum, mean, variance, standard deviation, max, min]``; entries are
    ``None`` until computed. Variance and standard deviation come from
    ``ndarray.var`` / ``ndarray.std`` with their default arguments.

    Every method recomputes from the data it is given, so calling one with
    new data never returns results left over from an earlier call.
    """

    def __init__(self):
        # Per-instance storage: a class-level list would be shared (and
        # overwritten) by every Stat object.
        self.descStat = [None] * 6

    def Sum(self, x):
        """Store the max, min and sum of ``x``; return the sum.

        Raises:
            ValueError: if ``x`` is empty (max/min of an empty array).
        """
        X = np.asarray(x)
        self.descStat[4] = X.max()
        self.descStat[5] = X.min()
        self.descStat[0] = X.sum()
        return self.descStat[0]

    def Mean(self, x):
        """Store and return the mean of ``x`` (``0.0`` when ``x`` is empty).

        For non-empty input the sum, max and min are refreshed as well.
        """
        X = np.asarray(x)
        ret = 0.0
        if len(x) > 0:
            self.Sum(x)
            ret = X.mean()
        self.descStat[1] = ret
        return ret

    def Var(self, x):
        """Store and return the variance of ``x``.

        Inputs with fewer than two elements yield ``0.0`` and leave the
        other statistics untouched; otherwise the mean, sum, max and min are
        refreshed as well.
        """
        X = np.asarray(x)
        ret = 0.0
        if len(x) > 1:
            self.Mean(x)
            ret = X.var()
        self.descStat[2] = ret
        return ret

    def StdDev(self, x):
        """Store and return the standard deviation of ``x``.

        Inputs with fewer than two elements yield ``0.0`` (matching ``Var``)
        instead of failing; otherwise all other statistics are refreshed.
        """
        X = np.asarray(x)
        ret = 0.0
        if len(x) > 1:
            self.Var(x)
            ret = X.std()
        self.descStat[3] = ret
        return ret

    def getStat(self, x):
        """Compute all six statistics of ``x``.

        Returns:
            A new list ``[sum, mean, variance, std-dev, max, min]``, or
            ``None`` when ``x`` has fewer than two elements. The list is a
            copy, so a later ``Reset`` or recomputation does not alter it.
        """
        if len(x) > 1:
            self.StdDev(x)
            return list(self.descStat)
        return None

    def PrintStat(self, header):
        """Print ``header`` followed by one line per stored statistic.

        Raises:
            TypeError: if a statistic has not been computed yet (is ``None``).
        """
        nameOfCat = [u'總和', u'平均數', u'變異數', u'標準差', u'最大', u'最小']
        print(header)
        # Iterate over the statistics themselves, not the header's length.
        for name, value in zip(nameOfCat, self.descStat):
            print("\t%s=%.2f" % (name, value))

    def Reset(self):
        """Forget every stored statistic."""
        self.descStat[:] = [None] * len(self.descStat)