2020/12/23
いつものようにyoutubeを見ていたところ、とあるエンジニアさんのチャンネルでウェブスクレイピングというものを使って、全自動で
領収書をダウンロードし、スプレッドシートに記入しているのを見て、
「なんじゃこりゃ、スゲー」
と思いググったらseleniumというものにヒットした。
seleniumを使えば、インターネットの処理なんてあっという間に自動化できるとのことだったので、
2,3日ほど色々調べながらプログラムを作った。
今回作ったプログラムは、ネットの棋譜データベースに接続して対局のリンクを取得してくるものと
そのリンクを周回してkifファイルをダウンロードするというもの。
この際、「三間飛車」の戦型に限定してurlを周回、ダウンロードさせたのだが、
urlからすべて仕分けることができなかったので、クロール側で仕分けることにした。
しかし、クロールプログラムの1つ1つの動作が遅い(サイト表示までのラグが原因)ので
並行処理せざるを得なくなった。
これは詰んだかと思ったが、Qiitaのソースコードを丸パクしたらすんなりいった。
(今まで並行処理に苦しんできた時間はいったい…)
クラス側の方に主要な処理を描いて、並行処理で実行する関数はclass外に配置しました。
たぶん誤解かもしれませんがself.~~という関数をexecutorの引数には送れないみたいなことが書いてある記事を見つけたので
一応class外に書きました。実際class外の方が書きやすいのもありましたが。
Gitにも上げました
https://github.com/narikyow/Kifu_Crawl
以下ソースコードです
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
from concurrent import futures
class Crawler:
def __init__(self,address,bt_form):
self.address=address
self.kif_url_list=[]
self.output_kif=""
self.battle_form=bt_form
self.counter=0
self.options = Options()
self.options.add_argument("--headless")#これないとPCが落ちるかも
self.options.binary_location = "chromeのパス"
self.driver = webdriver.Chrome(chrome_options=self.options, executable_path="ドライバーのパス")
self.read()
def read(self):
with open(self.address,mode="r") as f:
self.kif_url_list=list(f)
self.counter=len(self.kif_url_list)
#urlをlistで格納
def get_kif(self,num):
i = num
if "https:" in self.kif_url_list[i].split("/"):
self.driver.get(self.kif_url_list[i])
time.sleep(2)
battle_form_elem_text=self.driver.find_element_by_xpath("/html/body/div[@class='container-fluid']/div[@id='app']/div/div[@class='row'][2]/div[@class='col-12 col-md-3 py-2'][1]/div/table[@class='table table-bordered table-hover table-sm']/tbody/tr[10]/td/a").text
teaiwari=self.driver.find_element_by_xpath("/html/body/div[@class='container-fluid']/div[@id='app']/div/div[@class='row'][2]/div[@class='col-12 col-md-3 py-2'][1]/div/table[@class='table table-bordered table-hover table-sm']/tbody/tr[6]/td").text
if self.battle_form in battle_form_elem_text and "平手" in teaiwari:
self.driver.find_element_by_id("kif-export").click()
time.sleep(1)
kif_elem=self.driver.find_element_by_class_name("control-group")
kif_text=kif_elem.find_element_by_tag_name("textarea").text
with open("".join(["Kif_DB_Files/",str(i+1),".kif"]),mode="w")as f:
f.write(kif_text)
print(kif_text)
print("".join([str(i+1)," / ",str(self.counter)," Finished"]))
self.driver.close()
def multi_Main(count,address,battle_form):
future_list=[]
with futures.ThreadPoolExecutor(max_workers=8) as executor:
#for i in range(count):
#send_list=[i,address,battle_form]
future=[executor.submit(multi_sub,[i,address,battle_form]) for i in range(count)]
future_list.append(future)
_=future.as_completed(fs=future_list)
def multi_sub(args):
Cr = Crawler(args[1],args[2])
Cr.get_kif(args[0])
if __name__ == "__main__":
address = "DataBaseLinks.txt"
#urlを格納したファイル名
battle_form="三間飛車"
#戦型名
Cr = Crawler(address,battle_form)
multi_Main(Cr.counter,address,battle_form)
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
class Browse:
def __init__(self,count):
self.options = Options()
self.options.binary_location = "chromeファイルのパス"
self.driver = webdriver.Chrome(chrome_options=self.options, executable_path="chromeドライバのパス")
self.text=""
self.next_url="None"
self.count=count
self.driver.get('https://shogidb2.com/search?q=%E4%B8%89%E9%96%93%E9%A3%9B%E8%BB%8A')
def get_url(self):
element = self.driver.find_element_by_class_name('list-group')
aTag = element.find_elements_by_tag_name("a")
print(aTag)
url_list= [i.get_attribute("href") for i in aTag]
print(url_list)
for i in range(len(url_list)):
#if "三間飛車" in battleform_list[i]:
self.text=("".join([self.text,url_list[i],"\n"]))
def change_page(self):
confirm=0
confirm_text=0
self.next_url="None"#初期化
next_element = self.driver.find_element_by_class_name('pagination')
next_aTag = next_element.find_elements_by_tag_name("a")
next_text = [i.text for i in next_aTag]
self.count=str(int(self.count)+1)
for i in range(len(next_text)):
if self.count == next_text[i]:
self.next_url= next_aTag[i].get_attribute("href")
confirm +=1
if int(self.count) >400:
confirm=0
break
if confirm == 0:
confirm_text=1
return confirm_text
self.driver.get(self.next_url)
time.sleep(3)
return confirm_text
def auto(self):
confirm_return=0
while confirm_return==0:
self.get_url()
print(self.text)
confirm_return = self.change_page()
with open("DataBaseLinks.txt",mode="w") as f:
f.write(self.text)
print_text="all done"
return print_text
if __name__=="__main__":
count_num="1"
browse=Browse(count_num)
time.sleep(3)
do = browse.auto()
print(do)
暇なときとか、やる気がでたら解説みたいなの作ろうと思います。