Cassandra 與 CSV (運用 Map)

資料表格回首頁

CREATE TABLE smarthome.stockmarket2 ( marketname text, stockdate int, price10 int, stockprice map<int, float>, PRIMARY KEY ((marketname, stockdate), price10) ) WITH CLUSTERING ORDER BY (price10 ASC) AND bloom_filter_fp_chance = 0.01 AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}' AND comment = '' AND compaction = {'min_threshold': '4',

'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',

'max_threshold': '32'} AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} AND dclocal_read_repair_chance = 0.1 AND default_time_to_live = 0 AND gc_grace_seconds = 864000 AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 AND speculative_retry = '99.0PERCENTILE';

程式碼

# -*- coding: utf-8 -*-
from datetime import datetime
import csv
import math

try:
    from cassandra.cluster import Cluster
except ImportError:
    # Keep the statistics helpers importable without the driver;
    # connectDB() reports the missing dependency when it is actually needed.
    Cluster = None

# Key of the adjusted-close ('盤後') entry inside the StockPrice map
# (keys 0..5 are written in CSV column order by loadDataFromCSV).
_ADJ_CLOSE_KEY = 5


def _priceBucket(price):
    """Return the Price10 clustering bucket: floor(log10(price) * 100)."""
    return math.floor(math.log10(price) * 100)


def _sortByPrice10(rows, descending):
    """Return rows sorted by their second field (Price10); ties keep order."""
    return sorted(rows, key=lambda row: row[1], reverse=descending)


def _selectBand(rows, valveBound):
    """Keep rows whose adjusted close lies inside [valveBound[0], valveBound[1]].

    Input rows are (stockDate, price10, stockPriceMap); output rows are
    (stockDate, price10, afterClosePrice).
    """
    selected = []
    for (stockDate, price10, stockPrice) in rows:
        afterClosePrice = stockPrice[_ADJ_CLOSE_KEY]
        if valveBound[0] <= afterClosePrice <= valveBound[1]:
            selected.append((stockDate, price10, afterClosePrice))
    return selected


def _printBandRows(rows):
    """Print (stockDate, price10, afterClosePrice) rows, tab separated."""
    for (stockDate, price10, afterClosePrice) in rows:
        print("%d\t%d\t%10.2f" % (stockDate, price10, afterClosePrice))


class LoadRunner:
    """Load S&P500 quotes from CSV into Cassandra and run timed queries."""

    startTime = None  # set by start(), read by stop()
    session = None    # driver session, set by connectDB()

    def __init__(self):
        pass

    def start(self):
        """Start the stopwatch."""
        self.startTime = datetime.utcnow()

    def stop(self):
        """Return the time elapsed since start() as a timedelta."""
        return datetime.utcnow() - self.startTime

    def connectDB(self, nodes):
        """Connect to the cluster reachable through the given contact points."""
        if Cluster is None:
            raise ImportError("cassandra-driver is required to connect")
        self.session = Cluster(nodes).connect()

    def runSQL(self, cql):
        """Execute a CQL statement and discard its result."""
        self.session.execute(cql)

    def runQuery(self, cql):
        """Execute a CQL query and return the driver's result set."""
        return self.session.execute(cql)

    def closeDB(self):
        """Shut down the cluster connection behind the session."""
        self.session.cluster.shutdown()

    def createDB(self):
        """Drop and recreate the SmartHome.StockMarket2 table."""
        self.runSQL("DROP TABLE IF EXISTS SmartHome.StockMarket2")
        cql = ("CREATE TABLE IF NOT EXISTS SmartHome.StockMarket2 "
               "(MarketName text, StockDate int, "
               "StockPrice map<int, float>, "
               "Price10 int, "
               "PRIMARY KEY((MarketName, StockDate), "
               "Price10))")
        self.runSQL(cql)

    def loadDataFromCSV(self, csvFile, showDetail):
        """Truncate the table and insert one row per CSV data line.

        The first CSV line is treated as a header and skipped. Columns are
        read as: date, open, high, low, close, volume, adjusted close.
        """
        self.runSQL('TRUNCATE SmartHome.StockMarket2')
        # 'with' closes the file even if an insert fails part-way through.
        with open(csvFile, 'r') as csvF:
            # Date, Open, High, Low, Close, Volume, Adj Close
            reader = csv.DictReader(
                csvF, ["日期", "開盤", "最高", "最低", "收盤", "成交量", "盤後"])
            next(reader, None)  # skip the header line
            for rowDB in reader:
                price10 = _priceBucket(float(rowDB['盤後']))
                # NOTE(review): values are interpolated straight into the
                # CQL text; fine for a trusted local CSV, not for user input.
                cql = ("INSERT INTO SmartHome.StockMarket2 "
                       "(MarketName, StockDate, "
                       "StockPrice, Price10) "
                       "VALUES('S&P500', %s, "
                       "{0:%s, 1:%s, 2:%s, 3:%s, 4:%s, 5:%s}, "
                       "%d)") % (
                    rowDB['日期'], rowDB['開盤'], rowDB['最高'],
                    rowDB['最低'], rowDB['收盤'], rowDB['成交量'],
                    rowDB['盤後'], price10)
                self.runSQL(cql)
                if showDetail:
                    print("%s\t%d\t%s" %
                          (rowDB['日期'], price10, rowDB['盤後']))

    def queryStockMarket(self, sortReversed, showDetail):
        """Return every row as (StockDate, Price10, StockPrice).

        sortReversed < 0 sorts by Price10 descending, > 0 ascending, and 0
        keeps the order returned by the database.
        """
        cql = ("SELECT StockDate, Price10, StockPrice "
               "FROM SmartHome.StockMarket2")
        # Materialise so the result can be measured with len() and iterated
        # again by the caller after the detail loop below.
        rows = list(self.runQuery(cql))
        if sortReversed != 0:
            rows = _sortByPrice10(rows, sortReversed < 0)
        if showDetail:
            for (stockDate, price10, stockPrice) in rows:
                print("%d\t%d\t%10.2f" %
                      (stockDate, price10, stockPrice[_ADJ_CLOSE_KEY]))
        return rows

    def queryStockMarketAfterClosePrice(self, valveBound, sortReversed,
                                        showDetail):
        """Return rows whose adjusted close is inside valveBound.

        Uses a single range query over the Price10 buckets that cover
        [valveBound[0], valveBound[1]], then filters exactly on the price.
        Rows are (stockDate, price10, afterClosePrice), sorted by Price10
        (descending when sortReversed < 0, ascending otherwise).
        """
        vBoundries = [_priceBucket(valveBound[0]),
                      _priceBucket(valveBound[1])]
        if vBoundries[0] == vBoundries[1]:
            cql = ("SELECT StockDate, Price10, StockPrice "
                   "FROM SmartHome.StockMarket2 "
                   "WHERE Price10 = %d "
                   "ALLOW FILTERING") % (vBoundries[0])
        else:
            cql = ("SELECT StockDate, Price10, StockPrice "
                   "FROM SmartHome.StockMarket2 "
                   "WHERE Price10 >= %d AND Price10 <= %d "
                   "ALLOW FILTERING") % (vBoundries[0], vBoundries[1])
        matched = _selectBand(self.runQuery(cql), valveBound)
        sortedResultSetDB = _sortByPrice10(matched, sortReversed < 0)
        if showDetail:
            _printBandRows(sortedResultSetDB)
        return sortedResultSetDB

    def queryStockMarketAfterClosePrice2(self, valveBound, sortReversed,
                                         showDetail):
        """Same result as queryStockMarketAfterClosePrice, one query per bucket.

        Issues an equality query for every Price10 bucket between the bucket
        of valveBound[0] and the bucket of valveBound[1], inclusive.
        """
        vBoundries = [_priceBucket(valveBound[0]),
                      _priceBucket(valveBound[1])]
        matched = []
        # Iterate over buckets, not raw prices: the upper limit must be the
        # bucket of the upper price bound.
        for price10Level in range(int(vBoundries[0]), int(vBoundries[1]) + 1):
            cql = ("SELECT StockDate, Price10, StockPrice "
                   "FROM SmartHome.StockMarket2 "
                   "WHERE Price10 = %d "
                   "ALLOW FILTERING") % (price10Level)
            matched.extend(_selectBand(self.runQuery(cql), valveBound))
        sortedResultSetDB = _sortByPrice10(matched, sortReversed < 0)
        if showDetail:
            _printBandRows(sortedResultSetDB)
        return sortedResultSetDB

    def doMean(self, resultSetDB):
        """Return the mean adjusted close of the rows, or 0.0 when empty."""
        if len(resultSetDB) == 0:
            return 0.0
        sumF = 0.0
        for (stockDate, price10, stockPrice) in resultSetDB:
            sumF += stockPrice[_ADJ_CLOSE_KEY]
        return sumF / len(resultSetDB)

    def doStdDev(self, resultSetDB, meanValue):
        """Return the sample standard deviation of the adjusted close.

        Uses the n - 1 denominator; returns 0.0 for fewer than two rows,
        where the sample deviation is undefined.
        """
        if len(resultSetDB) < 2:
            return 0.0
        sumF = 0.0
        for (stockDate, price10, stockPrice) in resultSetDB:
            sumF += math.pow(stockPrice[_ADJ_CLOSE_KEY] - meanValue, 2)
        return math.sqrt(sumF / (len(resultSetDB) - 1))


def _reportBand(loadRunner, bandQuery, showDetail):
    """Time a full scan plus a mean +/- stddev/2 band query and print stats."""
    loadRunner.start()
    resultSetDB = loadRunner.queryStockMarket(-1, False)
    totalCount = len(resultSetDB)
    meanF = loadRunner.doMean(resultSetDB)
    stdF = loadRunner.doStdDev(resultSetDB, meanF)
    if totalCount > 0:
        valveBound = [meanF - stdF / 2, meanF + stdF / 2]
        bandCount = len(bandQuery(valveBound, -1, showDetail))
        portionOfRecords = float(bandCount) / float(totalCount)
    else:
        # Empty table: nothing to bucket and no ratio to compute.
        bandCount = 0
        portionOfRecords = 0.0
    print("Count=%d/%d (%.2f%%)\tMean=%.2f\tStdDev=%.2f" %
          (bandCount, totalCount, portionOfRecords * 100, meanF, stdF))
    print(loadRunner.stop())


if __name__ == "__main__":
    loadRunner = LoadRunner()
    loadRunner.connectDB(['127.0.0.1'])
    # One switch per task: 0 = skip, 1 = run, 2 = run with detail output.
    runTask = [0, 0, 0, 2, 0]
    try:
        if runTask[0] > 0:
            loadRunner.createDB()
        if runTask[1] > 0:
            loadRunner.start()
            csvFile = '/home/big-data/Cassandra-Trainning/data/SP500.csv'
            loadRunner.loadDataFromCSV(csvFile, runTask[1] > 1)
            print(loadRunner.stop())
        if runTask[2] > 0:
            _reportBand(loadRunner,
                        loadRunner.queryStockMarketAfterClosePrice,
                        runTask[2] > 1)
        if runTask[3] > 0:
            _reportBand(loadRunner,
                        loadRunner.queryStockMarketAfterClosePrice2,
                        runTask[3] > 1)
        if runTask[4] > 0:
            loadRunner.start()
            resultSetDB = loadRunner.queryStockMarket(-1, runTask[4] > 1)
            print(loadRunner.stop())
    finally:
        # Release the connection even when a task fails.
        loadRunner.closeDB()

程式輸出

20150206 331 2055.47
20150126 331 2057.09
20150309 331 2079.43
20150327 331 2061.02
20150325 331 2061.05
20150317 331 2074.28
20150123 331 2051.82
20150122 331 2063.15
20150211 331 2068.53
20150203 331 2050.03
20150210 331 2068.59
20150312 331 2065.95
20150402 331 2066.96
20150102 331 2058.20
20150331 331 2067.89
20150326 331 2056.15
20150108 331 2062.14
20150406 331 2080.62
20150205 331 2062.52
20150313 331 2053.40
20150306 331 2071.26
20150401 331 2059.69
Count=22/64 (34.38%) Mean=2063.94 StdDev=33.96
0:00:07.855657