去年5月證交所更換網頁, 舊版程式已經無法使用, 因此改寫部分程式碼。要注意的是上市部分現在改用西元年, 但是如果當日(例如星期日)沒有資料, 會傳回最近的資料, 因此存檔時務必注意日期是否正確, 不可在無開市日抓資料; 上櫃則不受影響, 無開市日期會傳回空資料。模組gstock.py與主程式改版如下:
gstock.py:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import csv import datetime as dt import pandas as pd import httplib2 from urllib.parse import urlencode import pymysql as mysql
# common variables
def twdate(date):
    """Format *date* as a Republic of China (Minguo) calendar string.

    The year is the Gregorian year minus 1911 (not padded); month and day
    are zero-padded to two digits, e.g. 2018-01-05 -> '107/01/05'.
    """
    roc_year = date.year - 1911
    pieces = [
        '{}'.format(roc_year),
        '{:02}'.format(date.month),
        '{:02}'.format(date.day),
    ]
    return '/'.join(pieces)
def addate(date):
    """Format *date* as a Gregorian 'YYYY/MM/DD' string.

    Month and day are zero-padded to two digits; the year is written as-is.
    """
    parts = (date.year, date.month, date.day)
    return '{}/{:02}/{:02}'.format(*parts)


# --------------------------------
# TWSE & OTC download functions
# --------------------------------
def downloadTWSE(date, firstID='1101', lastID='9958'):
    """Download the TWSE daily quote table for *date*.

    Args:
        date: ``datetime.date`` to request.
        firstID: stock ID marking the first row of the stock list
            (defaults to the ID the original code hard-coded).
        lastID: stock ID marking the last row of the stock list (inclusive).

    Returns:
        A list of 7-element rows built from source columns 0, 1, 5-8 and 2
        (the main script reads them as ID, name, open, high, low, close,
        volume).  Values are the raw CSV strings and may still contain
        thousands separators.  Returns ``[]`` for dates before 2007-07-02,
        on a non-200 HTTP response, or when the ID markers are not found.
    """
    if date < dt.date(2007, 7, 2):  # no data
        return []

    url = "http://www.twse.com.tw/exchangeReport/MI_INDEX"
    # NOTE(review): the date is sent as 'YYYY/MM/DD'.  The endpoint may expect
    # 'YYYYMMDD' -- confirm; a rejected date could explain why the server
    # answers with the most recent trading day instead of the requested one.
    values = {'response': 'csv',
              'date': addate(date),
              'type': 'ALL'}
    agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0'
    headers = {'Content-type': 'application/x-www-form-urlencoded',
               'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
               'User-Agent': agent}

    conn = httplib2.Http('.cache')
    resp, content = conn.request(url, 'POST', urlencode(values), headers)
    if resp.status != 200:
        print('TWSE request failed, HTTP status', resp.status)
        return []

    # 'ansi' is a Windows-only codec alias (the system code page) and raises
    # LookupError elsewhere.  cp950 (Big5) is the Traditional Chinese Windows
    # code page, so this keeps the intended decoding while staying portable.
    respStr = content.decode('cp950', errors='replace')
    srcTWSE = list(csv.reader(respStr.splitlines(), delimiter=','))
    if srcTWSE:
        # Echo the first CSV row so the operator can inspect what came back.
        print(srcTWSE[0])

    # search stock list
    firstIndex = 0
    lastIndex = 0
    for i, row in enumerate(srcTWSE):
        if len(row) <= 15:  # quote rows have 16 columns
            continue
        # IDs/names may be wrapped as ="...": strip the wrapper characters.
        row[0] = row[0].strip(' ="')
        row[1] = row[1].strip(' ="')
        if row[0] == firstID:  # 1st stock ID
            firstIndex = i
        elif row[0] == lastID:  # last stock ID
            lastIndex = i + 1
            break

    # Skip short (non-quote) rows inside the range so every result row has
    # exactly seven fields.
    return [row[:2] + row[5:9] + row[2:3]
            for row in srcTWSE[firstIndex:lastIndex]
            if len(row) > 15]
def downloadOTC(date):
    """Download the OTC (TPEx) daily quote table for *date*.

    Dates before 2007-07-02 return ``[]`` without any network access.
    Otherwise the first HTML table on the TPEx print page is parsed, its
    final row is dropped, and the rows from stock ID '1258' up to and
    including '9962' are returned as 7-element lists taken from source
    columns 0, 1, 4, 5, 6, 2 and 7 (the main script reads them as ID, name,
    open, high, low, close, volume -- presumably; verify against the page).
    """
    earliest = dt.date(2007, 7, 2)
    if date < earliest:  # no data
        return []

    url = 'http://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430_print.php?l=zh-tw&d=' + twdate(date) + '&se=EW'
    cells = pd.read_html(url)[0].values
    # Everything except the final row of the parsed table.
    srcOTC = cells[:cells.shape[0] - 1].tolist()

    # search stock list
    start = 0
    stop = 0
    for position, record in enumerate(srcOTC):
        code = record[0]
        if code == '1258':  # 1st stock ID
            start = position
            continue
        if code == '9962':  # last stock ID
            stop = position + 1
            break

    resultOTC = []
    for record in srcOTC[start:stop]:
        resultOTC.append(record[:2] + record[4:7] + record[2:3] + record[7:8])
    return resultOTC
def showStock(stockID, stockName, Open, High, Low, Close,Volume):
    """Print a labelled preview (first eight entries) of each quote column."""
    head = slice(None, 8)
    labelled = (
        ('ID:', stockID),
        ('Name:', stockName),
        ('Open:', Open),
        ('High:', High),
        ('Low:', Low),
        ('Close:', Close),
        ('Volume:', Volume),
    )
    for label, column in labelled:
        print(label, column[head])
# -----------------
# financeDB class
# -----------------
class financeDB:
    """Small helper around a MySQL connection for storing daily stock quotes.

    Creating an instance only records connection settings; a connection is
    opened by ``connectMySQL`` / ``connectDB`` (or implicitly by the
    create*/save* methods, which also close it when they finish).
    """

    def __init__(self):
        # common variables
        self.user = 'root'
        self.host = 'localhost'
        self.passwd = ''
        self.dbName = 'finance'
        self.stockDataTableName = "StockData"  # stock data table name
        self.companyProfileTableName = "CompanyProfile"

    # ---------------------
    # MySQL functions
    # ---------------------
    def _connect(self, **extra):
        """Open a connection (stored in ``self.db`` / ``self.cursor``) and force UTF-8."""
        self.db = mysql.connect(host=self.host, user=self.user,
                                passwd=self.passwd, charset='utf8', **extra)
        self.cursor = self.db.cursor()
        self.cursor.execute("SET NAMES 'UTF8'")
        self.cursor.execute("SET CHARACTER_SET_RESULTS=UTF8")

    def connectMySQL(self):
        """Connect to the MySQL server without selecting a database."""
        self._connect()

    def connectDB(self):
        """Connect to the MySQL server and select ``self.dbName``."""
        self._connect(db=self.dbName)

    def runQuery(self, sql, params=None):
        """Execute *sql* on the open cursor and store all rows in ``self.results``.

        Args:
            sql: SQL statement, optionally containing ``%s`` placeholders.
            params: optional sequence of values bound to the placeholders.
        """
        self.cursor.execute(sql, params)
        self.results = self.cursor.fetchall()

    def createStockDatabase(self):
        """Create the ``self.dbName`` database if it does not exist yet."""
        strSQL = ("CREATE DATABASE IF NOT EXISTS `" + self.dbName
                  + "` CHARACTER SET utf8 COLLATE utf8_general_ci;")
        self.connectMySQL()
        try:
            self.runQuery(strSQL)
        finally:
            # Close the connection even when the statement fails.
            self.db.close()

    def createStockDataTable(self):
        """Create the daily quote table if it does not exist yet."""
        strSQL = ("CREATE TABLE IF NOT EXISTS `"
                  + self.stockDataTableName + "` ("
                  "`Date` DATE NOT NULL ,"
                  "`StockID` CHAR(32) NOT NULL ,"
                  "`Open` FLOAT NOT NULL ,"
                  "`High` FLOAT NOT NULL ,"
                  "`Low` FLOAT NOT NULL ,"
                  "`Close` FLOAT NOT NULL ,"
                  "`Volume` FLOAT NOT NULL,"
                  "INDEX (Date, StockID) "
                  ") ENGINE = MYISAM ;")
        self.connectDB()
        try:
            self.runQuery(strSQL)
        finally:
            self.db.close()

    @staticmethod
    def _stockRows(strDate, stockID, Open, High, Low, Close, Volume):
        """Build the parameter tuples for the quote INSERT.

        Rows whose price fields contain '--' (the bad-value marker the
        original code checked for on ``Open``) are skipped.  Numeric fields
        are converted with ``float``; anything else unparsable raises
        ``ValueError`` rather than being written silently.
        """
        rows = []
        for sid, o, h, l, c, v in zip(stockID, Open, High, Low, Close, Volume):
            if any('--' in str(price) for price in (o, h, l, c)):  # bad value
                continue
            rows.append((strDate, str(sid),
                         float(o), float(h), float(l), float(c), float(v)))
        return rows

    def saveStockData(self, date, stockID, Open, High, Low, Close, Volume):
        """Insert one day's quotes into the stock data table.

        Args:
            date: ``datetime.date`` the quotes belong to.
            stockID, Open, High, Low, Close, Volume: parallel sequences of
                strings (thousands separators already removed).

        Every stock is written, including the last one, which the original
        ``range(0, len(stockID)-1)`` loop skipped.
        """
        strDate = '{}/{:02}/{:02}'.format(date.year, date.month, date.day)
        rows = self._stockRows(strDate, stockID, Open, High, Low, Close, Volume)
        # Values are bound as parameters instead of being concatenated into
        # the SQL text, so odd characters in the data cannot break the query.
        strSQL = ("INSERT INTO `" + self.stockDataTableName + "`"
                  " (`Date`, `StockID`, `Open`, `High`, `Low`, `Close`, `Volume`)"
                  " VALUES (%s, %s, %s, %s, %s, %s, %s)")

        self.connectDB()
        try:
            if rows:
                self.cursor.executemany(strSQL, rows)
                # PyMySQL does not autocommit by default; commit so the rows
                # persist even if the table uses a transactional engine.
                self.db.commit()
        finally:
            self.db.close()

    def createStockProfileDB(self):
        """Create the company profile table if it does not exist yet.

        Fixes relative to the original: ``StockID`` is CHAR(32) (it was
        declared DATE), the index no longer references a non-existent
        ``Date`` column, and ``runQuery`` is called without passing ``self``
        twice (which raised TypeError).
        """
        strSQL = ("CREATE TABLE IF NOT EXISTS `"
                  + self.companyProfileTableName + "` ("
                  "`StockID` CHAR(32) NOT NULL ,"
                  "`CreateDate` DATE NOT NULL ,"
                  "`OnBoardDate` DATE NOT NULL ,"
                  "`Open` FLOAT NOT NULL ,"
                  "`High` FLOAT NOT NULL ,"
                  "`Low` FLOAT NOT NULL ,"
                  "`Close` FLOAT NOT NULL ,"
                  "`Volume` FLOAT NOT NULL,"
                  "INDEX (StockID) "
                  ") ENGINE = MYISAM ;")
        self.connectDB()
        try:
            self.runQuery(strSQL)
        finally:
            self.db.close()
|
主程式 :
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import gstock from gstock import financeDB import datetime as dt from datetime import timedelta import numpy as np import numpy.core.defchararray as npstr import sys
#main

# The original script had the OTC save call commented out; keep it disabled
# by default and say so explicitly instead of printing "Save Complete".
SAVE_OTC = False


def parseQuotes(rows):
    """Split downloaded quote rows into per-column string arrays.

    Args:
        rows: list of 7-element rows as returned by ``gstock.downloadTWSE``
            or ``gstock.downloadOTC``.

    Returns:
        ``None`` when *rows* is empty, otherwise a tuple
        ``(stockID, stockName, Open, High, Low, Close, Volume)`` of numpy
        arrays; commas (thousands separators) are removed from the five
        numeric columns.
    """
    result = np.array(rows)
    if len(result) == 0:
        return None
    numeric = [np.char.replace(result[:, col], ',', '') for col in range(2, 7)]
    return (result[:, 0], result[:, 1]) + tuple(numeric)


def saveQuotes(db, label, date, quotes):
    """Write *quotes* for *date* through ``db.saveStockData``; exit with status 1 on failure."""
    stockID, _stockName, Open, High, Low, Close, Volume = quotes
    try:
        print("Save " + label)
        db.saveStockData(date, stockID, Open, High, Low, Close, Volume)
        print("Save Complete")
    except Exception as err:
        # Report the real cause instead of swallowing it with a bare except.
        print('MySQL DB Error:', err)
        sys.exit(1)


def main():
    """Download today's TWSE and OTC quotes and store them in the database."""
    # connect DB
    try:
        print("Connect DB")
        db = financeDB()
        # First run only: call db.createStockDatabase() and
        # db.createStockDataTable() here to create the schema.
        print("DB Connected")
    except Exception as err:
        print('MySQL DB Error:', err)
        sys.exit(1)

    # download date (subtract a timedelta here to fetch an earlier day)
    downloadDate = dt.date.today()
    print("Download date : ", downloadDate)
    if downloadDate.weekday() >= 5:
        # Per the note accompanying this script, TWSE answers with the most
        # recent data on days without trading, which would then be stored
        # under the wrong date.
        print("Warning: download date is a weekend; TWSE may return the latest trading day's data")

    #---------------
    # download TWSE
    #---------------
    TWSE_quotes = parseQuotes(gstock.downloadTWSE(downloadDate))
    if TWSE_quotes is not None:
        print('TWSE count=', len(TWSE_quotes[0]))
        # save TWSE to DB
        saveQuotes(db, "TWSE", downloadDate, TWSE_quotes)
    else:
        print("TWSE no data")

    #---------------
    # download OTC
    #---------------
    OTC_quotes = parseQuotes(gstock.downloadOTC(downloadDate))
    if OTC_quotes is not None:
        print('OTC count=', len(OTC_quotes[0]))
        # save OTC to DB
        if SAVE_OTC:
            saveQuotes(db, "OTC", downloadDate, OTC_quotes)
        else:
            print("Save OTC skipped (SAVE_OTC is False)")
    else:
        print("OTC no data")


if __name__ == "__main__":
    main()
留言列表