A Python module/script can be executed (note the -m interface option) with:
$ python3 mymodule.py
$ python3 -m mymodule
$ chmod +x mymodule.py
$ ./mymodule.py
If the source file needs to be executed directly, NOT as a python3 command-line argument, it should look like this (see /usr/local/bin/pip3 for a real example). The shebang on the first line is NOT needed when executing it as python3 mymodule.py.
#!/usr/local/opt/python@3.9/bin/python3.9
# -*- coding: utf-8 -*-
import re
import sys
from pip._internal.cli.main import main
if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())
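As a minimal sketch of a module that works both when imported and when executed directly (the function greet() is hypothetical, added only for illustration):

```python
# mymodule.py -- minimal sketch; greet() is a hypothetical example function.

def greet(name: str) -> str:
    """Return a greeting string."""
    return f"hello, {name}"

if __name__ == "__main__":
    # This branch runs only when executed as a script:
    #   python3 mymodule.py  /  python3 -m mymodule  /  ./mymodule.py
    # and NOT when the module is imported.
    print(greet("world"))
```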
Also, pydoc is a useful module. Try this:
python3 -m pydoc ./mymodule_or_script.py
python3 -m pydoc -w ./mymodule_or_script.py
Install specific package versions (including pip itself) using pip:
$ /testingvenv/bin/python3.9 -m pip install pip==21.1.3
$ python3.9 -m pip install --upgrade typesystem==0.2.5
Inside a Dockerfile:
RUN apt update \
    && DEBIAN_FRONTEND=noninteractive apt install -y --no-install-recommends \
       software-properties-common \
    && add-apt-repository -y ppa:deadsnakes/ppa \
    && apt update \
    && DEBIAN_FRONTEND=noninteractive apt install -y --no-install-recommends \
       python3.9-venv \
    && DEBIAN_FRONTEND=noninteractive apt install -y --no-install-recommends \
       python3-pip build-essential \
    && apt clean \
    && rm -rf /var/lib/apt/lists/*
RUN python3.9 -m venv /venv
ENV PATH=/venv/bin:$PATH
RUN python3.9 -m pip install "pip==21.1.3"
The output of the two commands below is the same:
$ pip3 list
$ python3 -m pip list
$ pip3 list > pupu
$ python3 -m pip list > pupu2
$ diff <(pip3 list) <(python3 -m pip list)
Install (inside a venv) a specific version of pip:
$ /testingvenv/bin/python3.9 -m pip install pip==21.1.3
RUN python3.9 -m pip install "pip==21.1.3" # in Dockerfile
To create requirements.txt so that the Python environment can be regenerated later:
$ pip3 freeze > requirements.txt
$ diff <(pip3 freeze) requirements.txt
Install autopep8 (to be called from py-autopep8 emacs lisp):
$ pip3 install autopep8
The above will install autopep8==1.6.0 and pycodestyle==2.8.0, but there is no need to add these two modules to requirements.txt.
The standard/default tool is venv. NVIDIA NGC uses virtualenv. What about pipenv? pipenv seems fine for creating an environment for an application (NOT for a library).
For now, use venv, because it is easy to use and already installed by default (Python >= 3.3):
"Why I quit Pipenv and started using venv" (uedder)
Create venv: $ python3 -m venv ~/pyvenvs/myproj
Activate: $ source ~/pyvenvs/myproj/bin/activate
Install package etc: $ pip list; pip install numpy==1.21.0
Create requirements.txt in order to duplicate virtual environment: $ pip freeze > requirements.txt
Deactivate: $ deactivate
Regenerate environment:
$ python -m venv ~/pyvenvs/herproject
$ source ~/pyvenvs/herproject/bin/activate; echo $PATH; printenv PATH
$ ~/pyvenvs/herproject/bin/python3.9 -m pip install pip==21.1.3
$ python -m pip install -r requirements.txt
pyenv is for switching between many Python versions.
Run cd and activate automatically together? See uedder's page.
$ python3 -m pip install pytest OR pip3 install pytest
Implementation to be tested: myapi.py:
import responder

api = responder.API()

@api.route("/")
def hello_world(req, resp):
    resp.text = "hello, world!"

@api.route("/{greeting}")
async def greet_world(req, resp, greeting):
    resp.text = f"{greeting}, world!"

if __name__ == "__main__":
    api.run()
Tester program test_myapi.py:
import pytest
import time
import myapi as service

@pytest.fixture(scope="module", autouse=True)
def testapi():
    time.sleep(1)
    print('\nfixture_setup')
    # return service.api
    yield service.api
    time.sleep(1)
    print('\nfixture_teardown')

def test_hello_world(testapi):
    r = testapi.requests.get("/")
    assert r.text == "hello, world!"

@pytest.mark.parametrize(
    "test_input, expected_output", [
        ("testA", "testA, world!"),
        ("testB", "testB, world!"),
    ]
)
def test_greeting(testapi, test_input, expected_output):
    r = testapi.requests.get(f"/{test_input}")
    assert r.text == expected_output
Execute with:
$ pytest --durations=0 -vv -s --disable-warnings -k test_greeting # -s will print the 'print()', -k test_greeting will test test_greeting only
For example, in the imported library, at module top level (just before the code begins), write:
logger = getLogger('aaa.bbb')
Then, in the importing main application, likewise at module top level, write:
logger = getLogger('aaa')
With this, every setting on the logger 'aaa' (for example a handler added to it) also propagates to the logger 'aaa.bbb' in the imported library. Note also that loggers returned by getLogger('name') for the same 'name' are IDENTICAL (the same object).
See: https://qiita.com/kitsuyui/items/5a7484a09eeacb564649
See: https://zenn.dev/tkm/articles/python-logging-basic
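A minimal runnable sketch of the propagation described above (the names 'aaa' / 'aaa.bbb' come from the text; the handler setup is illustrative):

```python
import logging

lib_logger = logging.getLogger('aaa.bbb')   # in the imported library
app_logger = logging.getLogger('aaa')       # in the importing application

# Loggers returned for the same name are the IDENTICAL object.
assert logging.getLogger('aaa') is app_logger

# A handler added to 'aaa' also receives records from 'aaa.bbb',
# because child loggers propagate records to their ancestors by default.
app_logger.addHandler(logging.StreamHandler())
app_logger.setLevel(logging.DEBUG)
lib_logger.debug('from the library')  # emitted via the handler on 'aaa'

assert lib_logger.parent is app_logger
assert lib_logger.propagate is True
```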
It seems that using only the root logger (getLogger()) will NOT change the log level of the logger inside the library. (The catch: logger configuration MUST be done at module level, NOT inside a function.) So, use the code below:
import asyncio
import ffmpeg
import logging
import time
from asynccpu import ProcessTaskPoolExecutor
from asyncffmpeg import FFmpegCoroutineFactory, StreamSpec

# log_format = '%(asctime)s:%(name)s:%(levelname)s:'\
#     '%(filename)s:%(lineno)s:%(message)s'
log_format = '%(asctime)s:%(name)s:%(levelname)s:'\
    '%(pathname)s:%(funcName)s:%(lineno)s:%(message)s'

# Set the root logger configuration with logging.basicConfig
logging.basicConfig(filename='./logs/asyncffmpeg.log',
                    encoding='utf-8',
                    filemode='a',
                    datefmt='%Y/%m/%d %H:%M:%S',
                    format=log_format,
                    level=logging.DEBUG)
# logger = logging.getLogger()  # logger is the root logger
# logger.setLevel(logging.DEBUG)
# However, just use logging.debug, NOT logger.debug,
# because logging.debug already logs to the root logger.

async def create_stream_spec_conv() -> StreamSpec:
    logging.error('create_stream_spec_conv')
    stream = ffmpeg.input("ghgh.mov")
    return ffmpeg.output(stream, "aaa.mp4").overwrite_output()

async def main() -> None:
    ffmpeg_coroutine = FFmpegCoroutineFactory.create()
    with ProcessTaskPoolExecutor(max_workers=3,
                                 cancel_tasks_when_shutdown=True) as executor:
        awaitables = [
            executor.create_process_task(ffmpeg_coroutine.execute,
                                         create_stream_spec_conv),
        ]
        await asyncio.gather(*awaitables)
    logging.info('reach here1')  # log info to root logger

if __name__ == "__main__":
    logging.info('Starting test!!!!')  # log info to root logger
    asyncio.run(main())
    logging.info('reach here4')  # log info to root logger
The same configuration with explicit handlers on the root logger:
import asyncio
import ffmpeg
import logging
import time
from asynccpu import ProcessTaskPoolExecutor
from asyncffmpeg import FFmpegCoroutineFactory, StreamSpec

# Logger configuration MUST be put before any defs.
LOGLEVEL = logging.DEBUG
logger = logging.getLogger()  # logger is the root logger
logger.setLevel(LOGLEVEL)
log_format = logging.Formatter(
    # fmt='%(asctime)s:%(name)s:%(levelname)s:%(message)s',
    # fmt='%(asctime)s:%(name)s:%(levelname)s:'
    #     '%(filename)s:%(lineno)s:%(message)s',
    fmt='%(asctime)s:%(name)s:%(levelname)s:'
        '%(pathname)s:%(funcName)s:%(lineno)s:%(message)s',
    datefmt='%Y/%m/%d %H:%M:%S'
)
sh = logging.StreamHandler()  # stderr
sh.setLevel(LOGLEVEL)
sh.setFormatter(log_format)
logger.addHandler(sh)
fh = logging.FileHandler('./logs/asyncffmpeg.log',
                         mode='a', encoding='utf-8')
fh.setLevel(LOGLEVEL)
fh.setFormatter(log_format)
logger.addHandler(fh)

async def create_stream_spec_conv() -> StreamSpec:
    logger.error('create_stream_spec_conv')
    stream = ffmpeg.input("ghgh.mov")
    return ffmpeg.output(stream, "aaa.mp4").overwrite_output()

async def main() -> None:
    ffmpeg_coroutine = FFmpegCoroutineFactory.create()
    with ProcessTaskPoolExecutor(max_workers=3,
                                 cancel_tasks_when_shutdown=True) as executor:
        awaitables = [
            executor.create_process_task(ffmpeg_coroutine.execute,
                                         create_stream_spec_conv),
        ]
        await asyncio.gather(*awaitables)
    logger.info('reach here1')  # log info to root logger

if __name__ == "__main__":
    logger.info('Starting test!!!!')
    asyncio.run(main())
    logger.info('reach here4')
Use asyncio get_event_loop + run_in_executor for blocking processing (e.g. requests), semaphores, etc.
(get_nowait) a-non-blocking-read-on-a-subprocess-pipe-in-python
asyncio.create_subprocess_shell: executing subprocess asynchronously (1)
asyncio.create_subprocess_shell: executing subprocess asynchronously (2)
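A minimal sketch of asyncio.create_subprocess_shell, as referenced above (the command 'echo hello' is just a placeholder):

```python
import asyncio

async def run(cmd: str) -> str:
    # Start the shell command without blocking the event loop.
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() awaits process exit and reads the pipes asynchronously.
    stdout, _stderr = await proc.communicate()
    return stdout.decode().strip()

out = asyncio.run(run('echo hello'))
print(out)  # hello
```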
A coroutine is a special function which can suspend its execution while waiting for something.
async def coro(...):
    print('a')
    r = await func('b')
    print(r)
A coroutine is defined as above with the keyword async def.
When coro has execution control, it BLOCKS and executes print('a'). When it arrives at await, it releases execution control while waiting for the result of func('b'); at the await it behaves as NON-BLOCKING.
The next time it gets execution control, after the result of func('b') has arrived, it BLOCKS again and executes print(r).
So, if there is NO await keyword in an async def coroutine, it BLOCKS for its whole execution, just like an ordinary function.
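A runnable sketch of this behaviour (worker() and the delays are illustrative): two coroutines overlap their waiting time at await, so the total elapsed time is about the longest single delay, not the sum.

```python
import asyncio
import time

async def worker(name: str, delay: float) -> str:
    print(name, 'start')        # BLOCKS while it holds execution control
    await asyncio.sleep(delay)  # releases control here (non-blocking wait)
    print(name, 'end')          # resumes after the sleep result arrives
    return name

async def main() -> list:
    # gather returns results in the order of its arguments.
    return await asyncio.gather(worker('a', 0.2), worker('b', 0.1))

t0 = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - t0
print(results)  # ['a', 'b']
print(elapsed)  # roughly 0.2, not 0.3
```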
Load a 5-column tab-separated text file. Take the 2nd and 4th columns (usecols=[1,3], zero-based). Then plot.
import numpy as np
xy10 = np.loadtxt("report10pct.txt", delimiter="\t", usecols=[1,3])
xy20 = np.loadtxt("report20pct.txt", delimiter="\t", usecols=[1,3])
xy10.shape
xy20.shape
xy = np.concatenate((xy10, xy20), axis=0)
xy.shape
xy = np.delete(xy, np.where(xy[:,0]>1000), axis=0)
xy.shape
xy = np.delete(xy, np.where(xy[:,1]<0), axis=0)
xy.shape
import matplotlib.pyplot as pp
pp.scatter(xy[:,0], xy[:,1], s=1)
pp.show()
Use the above data.
xdata = xy[:,0]
ydata = xy[:,1]
c, residuals, rank, singular_values, rcond = np.polyfit(xdata, ydata, 1, full=True) # c[0]*x + c[1]
p1 = np.poly1d(c) # polynomial with the coefficient c
c2 = np.polyfit(xdata, ydata, 2) # c2[0]*x**2 + c2[1]*x + c2[2]
p2 = np.poly1d(c2) # polynomial with the coefficient c2
x = np.linspace(min(xdata), max(xdata), 1000)
pp.plot(xdata, ydata, '.', x, p1(x), '-', x, p2(x), '-')
pp.savefig('figure.png')
pp.show()
OR, plot like below:
import matplotlib.pyplot as pp
fig, ax = pp.subplots()
ax.set_title('input-length vs response-time')
ax.set_xlabel('input-length (x)')
ax.set_ylabel('response-time in sec')
ax.set_aspect('equal')
ax.scatter(xdata, ydata, s=1, c='C0', label='data')
ax.plot(x, p1(x), c='C1', label=str(p1))
ax.plot(x, p2(x), c='C2', label=str(p2))
ax.grid()
ax.legend()
fig.savefig('figure.png')
# -*- coding: utf-8 -*-
"""
Stress testing DNN TTS
PYTHONASYNCIODEBUG=1 python3 ttsstress.py
Usage from external network (ttsserver-myco.py requires --proxyurl):
0. Start ttsserver-myco service:
`python3 ttsserver-myco.py --proxyurl http://localhost:23128`
1. `source ~/pyvenvs/movcrt/bin/activate`
2. Start movie_creator service:
`python3 movie_creator.py --ctts http://localhost:12345`
3. Shutdown with C-c then `deactivate`
To show this docstring/help:
python3 -m pydoc ./ttsstress.py
"""
import asyncio
import concurrent.futures
import logging
import time
import requests
import responder
logging.basicConfig(level=logging.DEBUG)
api = responder.API()
# (1) https://qiita.com/ragnar1904/items/85b81febefd3f3f2899a
BASE_URL = "https://jsonplaceholder.typicode.com/"
def calc_time(fn):
    """Decorator for timing fn execution time"""
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        res = fn(*args, **kwargs)
        end = time.perf_counter()
        print(f"[{fn.__name__}{args}] elapsed time: {end - start}")
        return res
    return wrapper
def get_sync(path: str) -> dict:
    print(f"/{path} request")
    res = requests.get(BASE_URL + path)
    print(f"/{path} request done")
    return res.json()

@calc_time
def main_sync():
    data_ls = []
    paths = [
        "posts",
        "comments",
        "albums",
        "photos",
        "todos",
        "users",
    ]
    for path in paths:
        data_ls.append(get_sync(path))
    return data_ls
async def get_async(path: str) -> dict:
    print(f"/{path} async request")
    url = BASE_URL + path
    loop = asyncio.get_event_loop()
    # Run in the event loop's default executor
    res = await loop.run_in_executor(None, requests.get, url)
    print(f"/{path} async request done")
    return res.json()

@calc_time
def main_async():
    # Get the event loop
    loop = asyncio.get_event_loop()
    # Gather the async tasks into a single Future object
    tasks = asyncio.gather(
        get_async("posts"),
        get_async("comments"),
        get_async("albums"),
        get_async("photos"),
        get_async("todos"),
        get_async("users"),
    )
    # Run asynchronously until all of them finish
    results = loop.run_until_complete(tasks)
    return results
# (2) https://pod.hatenablog.com/entry/2018/09/22/215030
def req(i):
    """NON-coroutine, ordinary function to simulate a blocking function"""
    print(i, "start")
    time.sleep(0.5)  # BLOCKING function which simulates requests.get()
    # await asyncio.sleep(0.5)  # coroutine, NON-blocking asyncio.sleep()
    print(i, "end")
    return i

async def run_sync():
    async def run_req(i):
        return req(i)
    tasks = [run_req(i) for i in range(3)]
    res = await asyncio.gather(*tasks)
    return res

@calc_time
def run_sync2():
    async def run_req(i):
        return req(i)
    tasks = [run_req(i) for i in range(3)]
    gathered = asyncio.gather(*tasks)
    results = asyncio.get_event_loop().run_until_complete(gathered)
    return results

@calc_time
def run_async():
    async def run_req(loop, i):  # This is the coroutine
        res = await loop.run_in_executor(None, req, i)  # None: default thread pool
        return res
    loop = asyncio.get_event_loop()
    tasks = [run_req(loop, i) for i in range(3)]
    gathered = asyncio.gather(*tasks)
    results = loop.run_until_complete(gathered)
    return results
@calc_time
def run_semaphore():
    sem = asyncio.Semaphore(5)  # if set to 1, then same as blocking!
    # Limit simultaneous requests to 5.

    async def run_req(loop, i):
        async with sem:
            res = await loop.run_in_executor(None, req, i)
            return res
    loop = asyncio.get_event_loop()
    tasks = [run_req(loop, i) for i in range(20)]
    gathered = asyncio.gather(*tasks)
    results = loop.run_until_complete(gathered)
    return results

def req_excp(i):
    print(i, "start")
    time.sleep(0.5)
    if i == 3:
        raise Exception("hmm")
    print(i, "end")
    return i

@calc_time
def run_excp():
    async def run_req(loop, i):
        res = await loop.run_in_executor(None, req_excp, i)
        return res
    loop = asyncio.get_event_loop()
    tasks = [run_req(loop, i) for i in range(12)]
    # gathered = asyncio.gather(*tasks)  # Stops at the first exception in a coroutine.
    gathered = asyncio.gather(*tasks, return_exceptions=True)
    results = loop.run_until_complete(gathered)
    return results
# (3) https://qiita.com/everylittle/items/57da997d9e0507050085
async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    await asyncio.sleep(2)
    print('func2() finished')

async def func2b():
    print('func2b() started')
    await asyncio.sleep(1)
    print('func2b() finished')

@api.background.task
def process_data():
    """Just sleeps for five seconds, as a demo."""
    print('process_data started')
    time.sleep(5)
    print('process_data finished')

async def main_create_task():
    print('main_create_task started')
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task2
    print("task2 is finished!!!")
    await task1
    process_data()
    task2b = asyncio.create_task(func2b())
    await task2b
    # Below will be printed BEFORE the background task process_data()
    # prints 'process_data finished' and finishes.
    print('main_create_task finished')
    # await task2b
# await task2b
@api.background.task
def process_data2():
    time.sleep(3)
    print('process_data2 finished')
    return 2

async def main_back():
    future2 = process_data2()
    print('before future2.result()')
    task1 = asyncio.create_task(func1())
    print('task1')
    await task1
    # If future2.result() is needed, execution PAUSES in future2.result()!
    print(future2.result())
    # Below will be printed AFTER the background task process_data2() finished.
    print('after future2.result()')

def func3():
    """Normal function, NOT a coroutine"""
    print('func3() started')
    s = sum(i for i in range(10**7))
    print('func3() finished')
    return s

def func4():
    """Normal function, NOT a coroutine"""
    print('func4() started')
    s = sum(i*i for i in range(10**7))
    print('func4() finished')
    return s
async def main_pool():
    # get_running_loop() is preferred over get_event_loop() inside coroutines.
    # Inside a NON-coroutine (normal function, not async def), use get_event_loop.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task3 = loop.run_in_executor(pool, func3)
        task4 = loop.run_in_executor(pool, func4)
        # result3, result4 = await asyncio.gather(task3, task4)
        # return [result3, result4]
        return await asyncio.gather(task3, task4)
if __name__ == '__main__':
    # print('Start stress testing')
    # print('Finish stress testing')
    # (1) https://qiita.com/ragnar1904/items/85b81febefd3f3f2899a
    # print(main_sync())
    # print(main_async())
    # main_sync()
    # main_async()
    # (2) https://pod.hatenablog.com/entry/2018/09/22/215030
    # print(asyncio.get_event_loop().run_until_complete(run_sync()))
    # print(run_sync2())
    # print(run_async())
    # print(run_semaphore())
    # print(run_excp())
    # (3) https://qiita.com/everylittle/items/57da997d9e0507050085
    # asyncio.run(main_create_task())
    # asyncio.run(main_back())
    print(asyncio.run(main_pool()))