Prefect
https://docs.prefect.io/latest/tutorial/first-steps/ — you may need to start Docker Desktop before using it.
test.py:
from prefect import flow, tags


# NOTE: `tags=` is an option of @task, not of @flow; passing it to the flow
# decorator raises TypeError. Flow-run tags are applied at call time with the
# `tags` context manager instead.
@flow(name="Test flow", description="An example flow", version="0.0.1")
def my_favorite_function():
    """Minimal first flow: print a question and return 42."""
    print("What is your favorite number?")
    return 42


with tags("tutorial", "tag2"):
    print(my_favorite_function())
python test.py
Use tasks for functions
test.py:
import requests
from prefect import flow, task
@task(name="Call API", description="GET request")
def call_api(url, timeout=10.0):
    """GET ``url`` and return the decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds to wait for the server; without a timeout
            ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        The parsed JSON payload.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, so
            the failure surfaces here instead of in the next task.
    """
    response = requests.get(url, timeout=timeout)
    print(response.status_code)
    response.raise_for_status()
    return response.json()
@task(
    name="Parse fact",
    description="get fact from response",
    retries=2,
    retry_delay_seconds=60,
)
def parse_fact(response):
    """Print and return the ``"fact"`` field of a cat-fact API payload.

    If this raises (e.g. ``KeyError`` when ``"fact"`` is missing), Prefect
    retries it up to 2 more times, waiting 60 seconds between attempts.
    """
    fact = response["fact"]
    print(fact)
    return fact
@flow(name="API flow", description="Get cat fact")
def api_flow(url):
    """Fetch a cat fact from ``url`` and return its text."""
    fact_json = call_api(url)
    fact_text = parse_fact(fact_json)
    return fact_text


api_flow("https://catfact.ninja/fact")
python test.py
Using common flows (subflows: calling one flow from inside another)
test.py:
from prefect import flow
@flow
def common_flow(config: dict):
    """Reusable flow intended to be called from other flows.

    ``config`` is accepted but not used in this example; the flow prints a
    message and always returns 42.
    """
    print("I am a subgraph that shows up in lots of places!")
    return 42
@flow
def main_flow():
    """Parent flow that calls ``common_flow`` from inside its body."""
    # ... work before the nested flow call ...
    shared_result = common_flow(config={})  # nested @flow call (a subflow)
    # ... work after, e.g. using `shared_result` ...


main_flow()
Using retries
test.py:
from prefect import flow, task
# With retries=2 this task is attempted 3 times in total (the first run plus
# 2 retries, 5 seconds apart) before the failure reaches the flow.
@task(
    name="Test task",
    description="An example task",
    tags=["tutorial", "tag2"],
    retries=2,
    retry_delay_seconds=5,
)
def failure():
    """Always fail: print "running", then raise ValueError("bad code")."""
    print("running")
    raise ValueError("bad code")
@flow(name="Test flow", description="An example flow", version="0.0.1")
def test_retries():
    """Run the always-failing ``failure`` task and return its outcome."""
    outcome = failure()
    return outcome


test_retries()
Caching
test.py:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=1),
)
def hello_task(name_input):
    """Print a greeting and return ``"hello " + name_input``.

    The cache key is a hash of the task's inputs and expires after one
    minute, so a repeat call with the same ``name_input`` inside that window
    can reuse the cached result instead of re-running the body.
    """
    # Stand-in for real work.
    print(f"Saying hello {name_input}")
    greeting = "hello " + name_input
    return greeting
@flow
def hello_flow(name_input):
    """Run the cached ``hello_task`` once for ``name_input``."""
    hello_task(name_input)


# Run the *flow* (not the task directly) three times with the same input.
# The first run executes hello_task; later runs within the 1-minute
# cache_expiration reuse the cached result, so "Saying hello a" should be
# printed only once. Previously hello_flow was never invoked.
hello_flow("a")
hello_flow("a")
hello_flow("a")
python test.py
Task runners
test.py:
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@task
def first_task(num):
    """Return ``num + num``."""
    doubled = num + num
    return doubled
@task
def second_task(num):
    """Return ``num * num``."""
    squared = num * num
    return squared
@flow(
    name="My Example Flow",
    task_runner=SequentialTaskRunner(),
)
def my_flow(num):
    """Submit ``first_task`` then ``second_task`` on its output, and print both.

    ``.submit()`` returns a future; the first future is passed straight into
    the second task, and ``.result()`` waits for each value before printing.
    """
    plus_future = first_task.submit(num)
    square_future = second_task.submit(plus_future)
    added = plus_future.result()
    squared = square_future.result()
    print(f"add: {added}, square: {squared}")


my_flow(5)
Using Dask with Prefect (requires the prefect-dask package: pip install prefect-dask)
https://prefecthq.github.io/prefect-dask/
https://docs.prefect.io/latest/guides/dask-ray-task-runners/
test.py:
from typing import List
from pathlib import Path
import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner
URL_FORMAT = (
"https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
"{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)
@task
def download_image(year: int, month: int, directory: Path) -> Path:
    """Download one month's plume image and save it as ``directory/YYYYMM.png``.

    Args:
        year: Year used to build the archive URL and file name.
        month: Month number (1-12).
        directory: Destination directory; it must already exist.

    Returns:
        Path of the written image file.

    Raises:
        httpx.HTTPStatusError: if the final response is not 2xx, instead of
            silently saving an error page under a ``.png`` name.
    """
    url = URL_FORMAT.format(year=year, month=month)
    # httpx does not follow redirects by default; follow them so a moved
    # resource is fetched rather than its redirect body being saved.
    resp = httpx.get(url, follow_redirects=True)
    resp.raise_for_status()
    # Keep the remote file's suffix (.png) but replace its stem with YYYYMM.
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    """Download the 12 monthly plume images for ``year`` into ``./data``.

    Downloads are submitted to the flow's Dask task runner so they can run
    concurrently.

    Returns:
        Paths of the saved images, ordered January through December.
    """
    directory = Path("data")
    directory.mkdir(exist_ok=True)
    # submit() returns futures immediately, letting all months download at once.
    futures = [
        download_image.submit(year, month, directory) for month in range(1, 13)
    ]
    # Resolve the futures so the return value actually is List[Path]; a failed
    # download raises here and fails the flow.
    return [future.result() for future in futures]
if __name__ == "__main__":
    # Only run the flow when executed as a script, not when imported.
    target_year = 2022
    download_nino_34_plumes_from_year(target_year)
Prefect server (web UI)
prefect server start
then open http://127.0.0.1:4200/ in a browser.