並列・並行
Multiprocessing (ProcessPoolExecutor)
CPU-bound work
Multithreading (ThreadPoolExecutor)
Fast I/O-bound work, limited number of connections
asyncio
Slow I/O-bound work, many connections
★Asyncioの使い方
例1
import asyncio
import aiohttp
def download_many(url_list):
    """Download every URL concurrently and return how many downloads finished.

    Each URL is handled by ``download_one()`` in sorted order. Per-URL failures
    are collected instead of raised, so every URL is counted whether or not
    its download succeeded. An empty ``url_list`` returns 0.
    """
    async def _download_all():
        # Create the coroutines inside the running loop and hand them to
        # gather(): asyncio.wait() rejects bare coroutines since Python 3.11
        # and raises ValueError for an empty collection.
        coros = [download_one(url) for url in sorted(url_list)]
        return await asyncio.gather(*coros, return_exceptions=True)

    # asyncio.run() uses a fresh loop and closes it afterwards, so calling
    # download_many() repeatedly works (closing the shared default loop did not).
    return len(asyncio.run(_download_all()))
async def download_one(url):
    """Download *url* via request() and post-process the result.

    Template: the ``...`` lines are placeholders for caller-specific handling
    of the (status, reason, body) tuple that request() returns.
    """
    status, reason, contents = await request(url)
    ...
    return ...
async def request(url, session=None):
    """GET *url* and return ``(status, reason, body_bytes)`` via fetch().

    Args:
        url: URL to request.
        session: optional shared ``aiohttp.ClientSession``. aiohttp recommends
            reusing one session (and its connection pool) for many requests;
            when omitted, a temporary session is opened and closed here,
            exactly as before.
    """
    if session is not None:
        return await fetch(session, url)
    async with aiohttp.ClientSession() as own_session:
        return await fetch(own_session, url)
async def fetch(session, url):
    """Send a GET for *url* on *session* and return response()'s tuple."""
    async with session.get(url) as resp:
        result = await response(resp)
    return result
async def response(resp):
    """Return the status code, reason phrase and full body of *resp*."""
    status_code = resp.status
    reason_phrase = resp.reason
    body = await resp.read()
    return status_code, reason_phrase, body
例2
# Module-level list that download_many() fills with scheduled tasks and the
# __main__ block passes to asyncio.gather().
tasks = []
# Placeholder: replace [...] with the URLs to download.
urls = [...]
async def download_one(url):
    """Open a short-lived session, GET *url*, and return the raw body bytes."""
    async with aiohttp.ClientSession() as client, client.get(url) as resp:
        payload = await resp.read()
    return payload
def download_many():
    """Schedule one download_one() task per URL in ``urls`` and record it in ``tasks``.

    The tasks are only scheduled here; the caller runs the loop to finish them.
    """
    tasks.extend(asyncio.ensure_future(download_one(u)) for u in urls)
if __name__ == '__main__':
    # download_many() calls asyncio.ensure_future() before any loop is running,
    # so install an explicit current loop first; letting get_event_loop()
    # create one implicitly is deprecated (and an error in recent Python).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        download_many()
        # asyncio.wait(tasks) would also work; gather() additionally returns
        # the bodies in the same order as ``tasks``.
        result = loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()
"too many file"エラー対策
例1
async def download_one(url, semaphore):
    """Download *url* while holding one slot of *semaphore*; return the body bytes.

    Capping how many downloads run at once is meant to keep the number of
    simultaneously open sockets below the OS file-descriptor limit.
    """
    async with semaphore:
        async with aiohttp.ClientSession() as client, client.get(url) as resp:
            body = await resp.read()
    return body
async def download_many(limit=1000):
    """Download every URL in the module-level ``urls``, at most *limit* at a time.

    Args:
        limit: maximum number of concurrent downloads (default 1000, as before).
    """
    semaphore = asyncio.Semaphore(limit)
    to_get = [download_one(url, semaphore) for url in urls]
    # gather() accepts bare coroutines and an empty list; asyncio.wait() rejects
    # coroutines since Python 3.11 and raises ValueError when empty.
    # return_exceptions=True keeps one failed download from aborting the rest,
    # matching asyncio.wait(), which did not raise per-task errors either.
    await asyncio.gather(*to_get, return_exceptions=True)
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine and closes
    # the loop; calling get_event_loop() with no running loop is deprecated.
    asyncio.run(download_many())
例2
import asyncio
from aiohttp import ClientSession
async def fetch(url, session):
    """GET *url* on the caller's *session*, print its DATE/DELAY headers, return the body.

    The session is passed in rather than opened here, so all requests made
    through it share that session's connection pool.
    """
    async with session.get(url) as resp:
        hdrs = resp.headers
        delay = hdrs.get("DELAY")
        date = hdrs.get("DATE")
        message = "{}:{} with delay {}".format(date, resp.url, delay)
        print(message)
        body = await resp.read()
    return body
async def bound_fetch(sem, url, session):
    """Run fetch() while holding *sem* and return the response body.

    Returning the value lets asyncio.gather() in the caller collect the bodies;
    without the ``return`` every gathered result was None.
    """
    async with sem:
        return await fetch(url, session)
async def run(r):
    """Fetch every URL in ``urls`` over one shared session, at most 1000 at a time.

    NOTE(review): ``r`` is accepted but never used -- presumably it was meant
    to size the URL list; confirm with the caller.
    """
    urls = [...]
    sem = asyncio.Semaphore(1000)
    # One session for all requests, so connections come from a single pool.
    async with ClientSession() as session:
        pending = [asyncio.ensure_future(bound_fetch(sem, u, session)) for u in urls]
        await asyncio.gather(*pending)
number = 10000
# Create and own the loop explicitly instead of relying on get_event_loop()
# (deprecated without a running loop), and always close it when done.
loop = asyncio.new_event_loop()
try:
    future = loop.create_task(run(number))
    loop.run_until_complete(future)
finally:
    loop.close()
from multiprocessing import Process
# The __main__ guard is required with the "spawn" start method (the default on
# Windows and macOS): children re-import this module, and unguarded Process
# creation at import time would try to start processes again.
if __name__ == '__main__':
    jobs = []
    for row in rows:
        # Placeholder: replace process_row with the function to run per row.
        # args must be a tuple -- the trailing comma in (row,) makes it one.
        p = Process(target=process_row, args=(row,))
        jobs.append(p)
        p.start()
    # Wait until every job has finished.
    for job in jobs:
        job.join()
★MultiProcessの例
import multiprocessing
from functools import partial
from QuoraTextPreprocessing import preprocess
BUCKET_SIZE = 50_000  # number of rows each run_process() call handles
def run_process(df, start):
    """Apply ``preprocess`` to the "question" column of rows [start, start + BUCKET_SIZE).

    Returns the processed column for that slice, so ``Pool.map`` can collect
    it; previously the result was computed and discarded, and map() returned
    only Nones.
    """
    end = start + BUCKET_SIZE
    df = df[start:end]
    print(start, "to ", end)
    return df["question"].apply(preprocess)
# Guard required with the "spawn" start method: worker processes re-import
# this module and must not create another Pool while doing so.
if __name__ == '__main__':
    # Start offset of each chunk: 0, BUCKET_SIZE, 2 * BUCKET_SIZE, ...
    chunks = list(range(0, df.shape[0], BUCKET_SIZE))
    pool = multiprocessing.Pool()
    try:
        # partial() binds df so each worker call receives only its start
        # offset. NOTE: the bound df is pickled and sent with each task batch.
        func = partial(run_process, df)
        temp = pool.map(func, chunks)
    finally:
        pool.close()
        pool.join()
★MultiThreadの例
import requests
from concurrent import futures
def download_many(url_list):
    """Download the URLs (sorted) on a 10-thread pool and return how many were fetched.

    For CPU-bound work, futures.ProcessPoolExecutor can be swapped in; it
    offers the same map() interface.
    """
    ordered_urls = sorted(url_list)
    with futures.ThreadPoolExecutor(max_workers=10) as executor:
        count = sum(1 for _ in executor.map(download_one, ordered_urls))
    return count
def download_one(url, timeout=None):
    """GET *url* with requests and return ``(status_code, reason, content)``.

    requests.Response exposes the HTTP status as ``status_code``; it has no
    ``status`` attribute (that is aiohttp's name), so reading ``resp.status``
    raised AttributeError.

    Args:
        url: URL to download.
        timeout: optional timeout in seconds forwarded to ``requests.get``;
            None (the default) waits indefinitely, as before.
    """
    resp = requests.get(url, timeout=timeout)
    return resp.status_code, resp.reason, resp.content
★CPUの数
from multiprocessing import Pool
import multiprocessing as multi
# Guard required with the "spawn" start method: children re-import this module.
if __name__ == '__main__':
    # One worker per CPU. (Pool() with no argument also uses os.cpu_count().)
    p = Pool(multi.cpu_count())
    try:
        # map() accepts any iterable, so there is no need to build a list first.
        results = p.map(process, range(1000))
    finally:
        p.close()
        p.join()  # wait for the workers to exit after close()
★aiohttp + aiofiles
http://aiohttp.readthedocs.io/en/stable/
https://github.com/Tinche/aiofiles
例1
async def download(imageurl, title):
    """Save *imageurl* into the directory *title* (created if missing).

    Best-effort: any error is printed with its traceback and swallowed, so one
    failed image does not stop the caller.
    """
    headers = {...}
    try:
        async with aiohttp.ClientSession() as session:
            # ssl=False replaces the deprecated verify_ssl=False. It disables
            # certificate verification -- only use it for hosts you trust.
            async with session.get(imageurl, ssl=False, headers=headers) as response:
                # Don't save an HTTP error page under the image's file name.
                response.raise_for_status()
                # One call that tolerates an existing directory and also
                # creates missing parents (os.mkdir fails on nested paths).
                os.makedirs(title, exist_ok=True)
                # NOTE: basename() keeps any query string present in the URL.
                filename = os.path.basename(imageurl)
                async with aiofiles.open(os.path.join(title, filename), 'wb') as fd:
                    async for chunk in response.content.iter_chunked(1024):
                        await fd.write(chunk)
    except Exception:
        traceback.print_exc()
例2
async def download(symbol):
    """Fetch the query result for *symbol* and save it as ./data/<symbol>.json."""
    payload = {...}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com/query?', params=payload) as response:
            data = await response.text()
    # Write with an explicit UTF-8 encoding: the platform default (e.g. cp932 on
    # Japanese Windows) may not be able to encode every character in the text.
    async with aiofiles.open(f'./data/{symbol}.json', 'w', encoding='utf-8') as outfile:
        await outfile.write(data)
例3
import aiofiles
import aiohttp
import asyncio
import async_timeout
import os
async def download_coroutine(session, url):
    """Stream *url* into a local file named after the URL's last path segment.

    The request and the body download together must finish within 10 seconds.
    """
    # async-timeout >= 4.0 only supports the asynchronous form (`async with`).
    async with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            async with aiofiles.open(filename, 'wb') as fd:
                async for chunk in response.content.iter_chunked(1024):
                    await fd.write(chunk)
    # Leaving `async with session.get(...)` already releases the connection,
    # so no explicit response.release() is needed; the result is None as before.
async def main(loop=None):
    """Download each URL in ``urls`` one after another over a single shared session.

    Args:
        loop: accepted only for backward compatibility and ignored. Passing
            ``loop=`` to ``aiohttp.ClientSession`` is deprecated; the session
            uses the running loop.
    """
    urls = [...]
    async with aiohttp.ClientSession() as session:
        for url in urls:
            await download_coroutine(session, url)
if __name__ == '__main__':
    # Create the loop explicitly (implicit creation via get_event_loop() is
    # deprecated) and close it when finished so its resources are released.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()