Parallelism and Concurrency

Which tool fits which workload (a minimal sketch follows this list):

Multiprocessing (ProcessPoolExecutor): CPU bound
Multithreading (ThreadPoolExecutor): fast I/O bound, limited number of connections
Asyncio: slow I/O bound, many connections
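
As a minimal sketch of the first two rows, concurrent.futures exposes the same Executor API for both pools (fib and fetch here are hypothetical stand-ins for CPU-bound and I/O-bound work):

import concurrent.futures
import urllib.request

def fib(n):
    # CPU bound: bypasses the GIL by running in separate processes
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def fetch(url):
    # I/O bound: threads suffice because the GIL is released while waiting
    with urllib.request.urlopen(url) as resp:
        return resp.status

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(list(executor.map(fib, [28, 29, 30])))
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(fetch, ['https://example.com'] * 3)))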

★How to use asyncio

Example 1

import asyncio
import aiohttp

def download_many(url_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(url) for url in sorted(url_list)]
    # note: passing bare coroutines to asyncio.wait() was removed in Python 3.11;
    # on newer versions wrap each coroutine with asyncio.ensure_future() first
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)

async def download_one(url):
    status, reason, contents = await request(url)
    ...
    return ...

async def request(url):
    async with aiohttp.ClientSession() as session:
        return await fetch(session, url)

async def fetch(session, url):
    async with session.get(url) as resp:
        # return await resp.read()
        return await response(resp)

async def response(resp):
    return resp.status, resp.reason, await resp.read()
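
For reference, a minimal sketch of the same flow in modern style (Python 3.7+): asyncio.run replaces the manual loop handling, one shared ClientSession reuses the connection pool, and asyncio.gather collects the results:

import asyncio
import aiohttp

async def download_one(session, url):
    async with session.get(url) as resp:
        return resp.status, resp.reason, await resp.read()

async def download_many(url_list):
    # a single session shares one connection pool across all requests
    async with aiohttp.ClientSession() as session:
        to_do = [download_one(session, url) for url in sorted(url_list)]
        res = await asyncio.gather(*to_do)
    return len(res)

# asyncio.run() creates, runs, and closes the event loop for us
# asyncio.run(download_many(['https://example.com']))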

Example 2

tasks = []
urls = [...]

async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # response = await response.read()
            return await response.read()

def download_many():
    for url in urls:
        task = asyncio.ensure_future(download_one(url))
        tasks.append(task)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    download_many()
    # loop.run_until_complete(asyncio.wait(tasks))
    result = loop.run_until_complete(asyncio.gather(*tasks))

"too many file"エラー対策

Example 1

async def download_one(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.read()

async def download_many():
    semaphore = asyncio.Semaphore(1000)  # cap the number of concurrent downloads at 1000
    to_get = [download_one(url, semaphore) for url in urls]
    await asyncio.wait(to_get)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_many())
    loop.close()

Example 2

import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    # a shared connection pool cannot be reused across different servers,
    # so don't open a fresh session per request here:
    # async with ClientSession() as session:
    async with session.get(url) as response:
        delay = response.headers.get("DELAY")
        date = response.headers.get("DATE")
        print("{}:{} with delay {}".format(date, response.url, delay))
        return await response.read()

async def bound_fetch(sem, url, session):
    async with sem:
        await fetch(url, session)

async def run(r):
    urls = [...]
    tasks = []
    sem = asyncio.Semaphore(1000)
    # share one session (one connection pool) when hitting the same server
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(bound_fetch(sem, url, session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

number = 10000
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
# loop.close()
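
Besides a semaphore, the connection pool itself can cap concurrency; a minimal sketch, assuming the limit is tuned to stay under the OS file-descriptor limit:

import asyncio
import aiohttp

async def download_all(urls):
    # TCPConnector(limit=...) caps simultaneous connections (the default is 100);
    # limit_per_host would additionally restrict connections per target host
    connector = aiohttp.TCPConnector(limit=1000)
    async with aiohttp.ClientSession(connector=connector) as session:
        async def one(url):
            async with session.get(url) as resp:
                return await resp.read()
        return await asyncio.gather(*(one(u) for u in urls))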

★Multiprocessing examples

Example 1 (Process)

from multiprocessing import Process

jobs = []
for row in rows:
    p = Process(target=<method name>, args=(row,))  # the trailing ',' is required: args must be a tuple
    jobs.append(p)
    p.start()

# wait until all jobs have finished
[job.join() for job in jobs]

Example 2 (Pool)

import multiprocessing
from functools import partial

from QuoraTextPreprocessing import preprocess

BUCKET_SIZE = 50000

def run_process(df, start):
    # process one bucket of rows
    df = df[start:start + BUCKET_SIZE]
    print(start, "to", start + BUCKET_SIZE)
    return df["question"].apply(preprocess)  # return the result so pool.map() can collect it

# df is assumed to be a pandas DataFrame loaded elsewhere
chunks = [x for x in range(0, df.shape[0], BUCKET_SIZE)]
pool = multiprocessing.Pool()
func = partial(run_process, df)  # bind df; pool.map() supplies each start offset
temp = pool.map(func, chunks)
pool.close()
pool.join()

★Multithreading example

import requests
from concurrent import futures

def download_many(url_list):
    # swap in futures.ProcessPoolExecutor for CPU-bound work
    with futures.ThreadPoolExecutor(max_workers=10) as executor:
        res = executor.map(download_one, sorted(url_list))
    return len(list(res))

def download_one(url):
    resp = requests.get(url)
    # requests exposes status_code (not status, as in aiohttp)
    return resp.status_code, resp.reason, resp.content

★Number of CPUs

from multiprocessing import Pool
import multiprocessing as multi

# process is assumed to be a worker function defined elsewhere
p = Pool(multi.cpu_count())  # one worker per CPU core
p.map(process, list(range(1000)))
p.close()

★aiohttp + aiofiles

http://aiohttp.readthedocs.io/en/stable/

https://github.com/Tinche/aiofiles

Example 1

import os
import traceback

import aiofiles
import aiohttp

async def download(imageurl, title):
    headers = {...}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(imageurl, verify_ssl=False, headers=headers) as response:
                if not os.path.exists(title):
                    os.mkdir(title)
                filename = os.path.basename(imageurl)
                async with aiofiles.open(os.path.join(title, filename), 'wb') as fd:
                    # stream the body to disk in 1 KB chunks
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        await fd.write(chunk)
    except Exception:
        traceback.print_exc()

Example 2

async def download(symbol):
    payload = {...}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com/query?', params=payload) as response:
            data = await response.text()
            async with aiofiles.open(f'./data/{symbol}.json', 'w') as outfile:
                await outfile.write(data)

Example 3

import asyncio
import os

import aiofiles
import aiohttp
import async_timeout

async def download_coroutine(session, url):
    # abort if the whole download takes longer than 10 seconds
    async with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            async with aiofiles.open(filename, 'wb') as fd:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    await fd.write(chunk)
            return await response.release()

async def main(loop):
    urls = [...]
    async with aiohttp.ClientSession(loop=loop) as session:
        for url in urls:
            # downloads run one after another; use asyncio.gather for concurrency
            await download_coroutine(session, url)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))