Prefect

Tutorial: https://docs.prefect.io/latest/tutorial/first-steps/ (note: you may need to use Docker Desktop).

test.py:

from prefect import flow


@flow(
    name="Test flow",
    description="An example flow",
    version="0.0.1",
    tags=["tutorial", "tag2"],
)
def my_favorite_function():
    """Print a question and return the answer 42."""
    answer = 42
    print("What is your favorite number?")
    return answer


# Run the flow once and show the value it returns.
result = my_favorite_function()
print(result)

python test.py

Use tasks for functions

test.py:

import requests

from prefect import flow, task


@task(name="Call API", description="GET request")
def call_api(url, timeout=10):
    """Send a GET request to *url* and return the decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may wait indefinitely on an unresponsive
            host, which would leave the task run hanging.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    response = requests.get(url, timeout=timeout)
    # Show the HTTP status so the run output reveals failed requests.
    print(response.status_code)
    return response.json()


@task(
    name="Parse fact",
    description="get fact from response",
    retries=2,
    retry_delay_seconds=60,
)
def parse_fact(response):
    """Print and return the value stored under the ``"fact"`` key.

    Args:
        response: Mapping (the decoded JSON) that contains a ``"fact"`` entry.

    Returns:
        The value of ``response["fact"]``.

    Raises:
        KeyError: If *response* has no ``"fact"`` key.
    """
    fact = response["fact"]
    print(fact)
    return fact


@flow(name="API flow", description="Get cat fact")
def api_flow(url):
    """Fetch JSON from *url* and return the fact it contains.

    Args:
        url: Address passed to ``call_api``.

    Returns:
        The fact text extracted by ``parse_fact``.
    """
    fact_json = call_api(url)
    fact_text = parse_fact(fact_json)
    return fact_text


api_flow("https://catfact.ninja/fact")

python test.py

Using common flows

test.py:

from prefect import flow


# Reusable flow meant to be called from other flows; `config` is accepted
# but not read by this example body.
@flow
def common_flow(config: dict):
    print("I am a subgraph that shows up in lots of places!")
    # fixed example value standing in for a real intermediate result
    return 42


# Parent flow: shows another flow function (common_flow) being called as one
# of its steps.
@flow
def main_flow():
    # ... work that happens before the shared step ...

    # call the other flow function like a normal function
    shared_result = common_flow(config={})

    # ... work that happens after the shared step ...


main_flow()

using retries

test.py

from prefect import flow, task



# With retries=2 this task is attempted up to three times (the first run plus
# two retries, 5 seconds apart) before the flow fails.
@task(
    name="Test task",
    description="An example task",
    tags=["tutorial", "tag2"],
    retries=2,
    retry_delay_seconds=5,
)
def failure():
    """Print a marker line, then always raise ``ValueError``."""
    print("running")
    error = ValueError("bad code")
    raise error



@flow(name="Test flow", description="An example flow", version="0.0.1")
def test_retries():
    """Call the always-failing ``failure`` task and return its result."""
    outcome = failure()
    return outcome


test_retries()

Caching

test.py:

from prefect import flow, task

from prefect.tasks import task_input_hash

from datetime import timedelta


# Cached task: the cache key is derived via task_input_hash and an entry
# expires after one minute.
@task(cache_expiration=timedelta(minutes=1), cache_key_fn=task_input_hash)
def hello_task(name_input):
    # stand-in for real work
    print(f"Saying hello {name_input}")
    greeting = "hello " + name_input
    return greeting


@flow
def hello_flow(name_input):
    # Wrap the cached task in a flow so that it executes inside a flow run.
    hello_task(name_input)


# Run the flow (rather than calling the task directly at module level) three
# times with the same input; hello_task is configured with a cache key and a
# cache expiration, so the repeated runs exercise the cache.
hello_flow("a")
hello_flow("a")
hello_flow("a")

python test.py

Task runners

test.py:

from prefect import flow, task

from prefect.task_runners import SequentialTaskRunner


# Task: add the input to itself.
@task
def first_task(num):
    doubled = num + num
    return doubled


# Task: multiply the input by itself.
@task
def second_task(num):
    squared = num * num
    return squared


# Flow that chains two submitted tasks, using SequentialTaskRunner.
@flow(task_runner=SequentialTaskRunner(), name="My Example Flow")
def my_flow(num):
    doubled_future = first_task.submit(num)
    # the value returned by the first submit is passed on as the argument
    squared_future = second_task.submit(doubled_future)

    added = doubled_future.result()
    squared = squared_future.result()
    print(f"add: {added}, square: {squared}")


my_flow(5)

Using Dask with prefect

https://prefecthq.github.io/prefect-dask/ 

https://docs.prefect.io/latest/guides/dask-ray-task-runners/ 

test.py

from typing import List

from pathlib import Path


import httpx

from prefect import flow, task

from prefect_dask import DaskTaskRunner


# Template for the image URL. It is filled in through str.format with
# `year` (zero-padded to 4 digits) and `month` (zero-padded to 2 digits).
URL_FORMAT = (

    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"

    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"

)


@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # Download the image for (year, month) and save it inside `directory`
    # as YYYYMM.<original extension>; returns the path that was written.
    # Raises httpx.HTTPStatusError when the server answers with a 4xx/5xx.
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)
    # Fail the task on an HTTP error status instead of silently saving the
    # error page's bytes under an image file name.
    resp.raise_for_status()

    # keep the remote file's suffix, replace its stem with YYYYMM
    # (Path.with_stem requires Python 3.9+)
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path


# Flow: submit one download_image task per month of `year`, using a
# DaskTaskRunner.
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # images are stored under ./data; the directory is created if missing
    target_dir = Path("data")
    target_dir.mkdir(exist_ok=True)

    # one submission per calendar month (1..12), collected in month order
    return [
        download_image.submit(year, month_number, target_dir)
        for month_number in range(1, 13)
    ]


if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)


prefect server

prefect server start

Then open http://127.0.0.1:4200/ in a browser.