Cassandra 與 CSV (運用 List)

建立表格

  1. CREATE TABLE stock_hist(stock_date TEXT, stock_id TEXT, stock_quotes LIST,
    PRIMARY KEY(stock_date, stock_id));
  2. INSERT INTO stock_hist(stock_date, stock_id, stock_quotes)
    VALUES ('2017-01-08', 'IBM', [50.0, 50.0, 55.0, 55.0]);
  3. SELECT stock_quotes FROM stock_hist WHERE stock_date='2017-01-08' AND stock_id='IBM';
     stock_quotes
    ------------------
     [50, 50, 55, 55]
    
  4. DELETE FROM stock_hist WHERE stock_date='2017-01-08' AND stock_id='IBM';

程式碼

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-

from cassandra.cluster import Cluster
import csv
import math
import numpy
import matplotlib.pyplot as plt
import matplotlib.font_manager as fontManager


class LoadRunner:
    session = None

    def __init__(self):
        pass

    def connectDB(self, nodes):
        try:
            cluster = Cluster(nodes)
            self.session = cluster.connect()
        except (RuntimeError, TypeError, NameError):
            pass

    def closeDB(self):
        if self.session is None:
            return

        try:
            self.session.cluster.shutdown()
        except (RuntimeError, TypeError, NameError):
            pass

    def runSQL(self, cql):
        if self.session is None:
            return

        try:
            self.session.execute(cql)
        except (RuntimeError, TypeError, NameError):
            print(cql)

    def runQuery(self, cql):
        if self.session is None:
            return

        resultSetDB = None

        try:
            resultSetDB = self.session.execute(cql)
        except (RuntimeError, TypeError, NameError):
            pass

        return resultSetDB

    def loadDataFromCSV(self, csvFile, stockID='IBM', showDetail=True):
        if self.session is None:
            return

        self.runSQL('TRUNCATE emprogria.stock_hist')
        with open(csvFile, 'r') as csvF:
    #       Date, Open, High, Low, Close, Volume, Adj Close

            fieldHead = True
            for rowDB in csv.DictReader(csvF, ["日期", "開盤", "最高", "最低", "收盤", "成交量", "盤後"]):
                if fieldHead:
                    fieldHead = False
                else:
                    if (rowDB is not None):
                        cql = "INSERT INTO emprogria.stock_hist "          \
                                "(stock_date, stock_id, stock_quotes) "    \
                                "VALUES('%s', '%s', [%.2f, %.2f, %.2f, %.2f, %.2f])" %       \
                                (rowDB['日期'], stockID, 
                                float(rowDB['開盤']), float(rowDB['收盤']), 
                                float(rowDB['最高']), float(rowDB['最低']), 
                                math.log(float(rowDB['成交量'])))

                        self.runSQL(cql)

                        if showDetail:
                            print(u"日期:%s\t%s\t盤後:%.2f" % (
                                float(rowDB['日期']), stockID, float(rowDB['盤後'])))

    def plotLine(self, stockID, X, Y):
        prop = fontManager.FontProperties(fname='/System/Library/Fonts/PingFang.ttc')
        plt.rcParams["font.family"] = prop.get_name()

        f, axarr = plt.subplots(3)

        axarr[0].set_title(u"%s 收盤指數\n差率 線圖" % stockID, fontproperties=prop)
        axarr[1].set_xlabel(u"日期序", fontproperties=prop)
        axarr[0].set_ylabel(u"收盤指數", fontproperties=prop)
        axarr[0].plot(X, Y, 'r', label=u'差率')
 
        sp = numpy.fft.fft(Y)
        freq = numpy.fft.fftfreq(Y.shape[-1])

        # 實部
        axarr[1].set_ylabel(u"收盤 實部指數", fontproperties=prop)
        axarr[1].get_xaxis().set_visible(False)

        sp.real[sp.real > 10] = 0
        axarr[1].plot(freq, sp.real, 'b', label=u'實部')

        # 虛部
        axarr[2].set_title(u"傅立葉 線圖", fontproperties=prop)
        axarr[2].set_xlabel(u"頻率", fontproperties=prop)
        axarr[2].set_ylabel(u"收盤 虛部指數", fontproperties=prop)
        axarr[2].plot(freq, sp.imag, 'g', label=u'虛部')

 
#       plt.show()
        plt.savefig('%s.png' % stockID)
        plt.close()


if __name__ == "__main__":
    loadRunner = LoadRunner()

    stockID = 'IBM'
    csvFile = '%s.csv' % stockID

    loadData, showDetail = (False, False)

    loadRunner.connectDB(['127.0.0.1'])

    if loadData:
       loadRunner.loadDataFromCSV(csvFile, stockID, showDetail)

    _stockDate = []
    _stockClose = []

    resultSetDB = loadRunner.runQuery("SELECT stock_date, stock_quotes " 
            "FROM emprogria.stock_hist WHERE stock_id='%s' ALLOW FILTERING" % stockID)

    if resultSetDB is not None:
        _resultSetDB = sorted(resultSetDB)

        for stockHist in _resultSetDB:
            stockQuotes = stockHist[1]

            _stockDate.append(stockHist[0])
            _stockClose.append(stockQuotes[1])

            if showDetail:
                print('%s' % stockHist[0], end='\t')

                for stockQuote in stockQuotes:
                    print('%.2f' % stockQuote, end='\t')

                print()

    loadRunner.closeDB()

    stockClose = numpy.array(_stockClose)
    stockClose_1 = numpy.diff(stockClose)

    stockMagnitude = stockClose_1.max() - stockClose_1.min()
    stockClose_2 = (stockClose_1 - stockClose_1.min()) / stockMagnitude
    _stockDate_2 = _stockDate[1:]

    loadRunner.plotLine(stockID, range(0, len(_stockDate_2)), stockClose_2)