Metaflow

https://outerbounds.com/sandbox/ 

https://docs.metaflow.org/getting-started/tutorials

https://github.com/Netflix/metaflow/blob/master/metaflow/tutorials/00-helloworld/helloworld.py 

Hello world

file.py:

from metaflow import FlowSpec, conda_base, current, project, step


@project(name="hello_work")
@conda_base(libraries={"pandas": "2.0.3"}, python="3.10.11")
class HelloFlow(FlowSpec):
    """Linear hello-world flow: ``start`` -> ``hello`` -> ``end``.

    The ``start`` step prints the run metadata exposed by Metaflow's
    ``current`` object. The class is decorated with ``@project`` and
    ``@conda_base``, which pins pandas 2.0.3 on Python 3.10.11.
    """

    @step
    def start(self):
        """Print the metadata of the running task, then move on to ``hello``."""
        print("HelloFlow is starting with following MetaFlow parameters:")
        # Every line below must be an f-string; without the ``f`` prefix the
        # text "{current.…}" is printed literally instead of the value.
        print(f"username: {current.username}")
        print(f"namespace: {current.namespace}")
        print(f"flow name: {current.flow_name}")
        print(f"run id: {current.run_id}")
        print(f"origin run id: {current.origin_run_id}")
        print(f"step name: {current.step_name}")
        print(f"task id: {current.task_id}")
        print(f"pathspec: {current.pathspec}")
        print(f"flow parameters: {current.parameter_names}")
        self.next(self.hello)

    @step
    def hello(self):
        """Print a greeting, then move on to ``end``."""
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """Final step: announce that the flow has finished."""
        print("HelloFlow is all done.")



# Script entry point: instantiating the flow class is what lets the commands
# shown below (`python file.py show`, `python file.py run`) operate on it.
if __name__ == "__main__":

    HelloFlow()

 

python file.py show

python file.py --environment conda show

python file.py run

 Parallel tasks

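The start step fans out into parallel branches by passing each branch step (a method decorated with @step) to self.next.

Lastly, the branches converge in a join step, which takes an extra argument, inputs, in addition to self.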

import time

from metaflow import step, FlowSpec



class BranchFlow(FlowSpec):

    """Branching flow: ``start`` fans out to ``eat`` and ``drink``, which meet in ``join``.

    Step order is defined solely by the ``self.next(...)`` call that ends each
    step: start -> (eat, drink) -> join -> end.
    """

    @step

    def start(self):

        """Announce the start and fan out into the two branches."""

        print("Starting 👋")

        # Passing two steps to self.next creates two branches.
        self.next(self.eat, self.drink)


    @step

    def eat(self):

        """First branch: print a message, pause, then continue to ``join``."""

        print("Pausing to eat... 🍜")

        # 10-second pause standing in for real work.
        time.sleep(10)

        self.next(self.join)


    @step

    def drink(self):

        """Second branch: print a message, pause, then continue to ``join``."""

        print("Pausing to drink... 🥤")

        # 10-second pause standing in for real work.
        time.sleep(10)

        self.next(self.join)


    @step

    def join(self, inputs):

        """Converge the ``eat`` and ``drink`` branches.

        ``inputs`` is the extra argument a join step receives; it is not
        used here because neither branch stores any data on ``self``.
        """

        print("Joining 🖇️")

        self.next(self.end)


    @step

    def end(self):

        """Final step: report completion."""

        print("Done! 🏁")



# Script entry point: instantiate the flow only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":

    BranchFlow()

Flow metadata

from metaflow import Flow, FlowSpec, step, current


class CurrentFlow(FlowSpec):
    """Two-step flow that reports the metadata available through ``current``.

    ``start`` prints the identifiers of the running task and tags the run
    with its start time; ``end`` prints the identifiers that differ between
    steps.
    """

    @step
    def start(self):
        """Record the start time, print task metadata and tag the run."""
        from datetime import datetime

        # Stored on self, so it is kept as part of the flow's state.
        self.run_time = datetime.now()

        print(f"username: {current.username}")
        print(f"namespace: {current.namespace}")
        print(f"flow name: {current.flow_name}")
        print(f"run id: {current.run_id}")
        print(f"origin run id: {current.origin_run_id}")
        print(f"step name: {current.step_name}")
        print(f"task id: {current.task_id}")
        print(f"pathspec: {current.pathspec}")
        print(f"flow parameters: {current.parameter_names!s}")

        # Look up this very run through the client API and attach a
        # minute-resolution timestamp tag to it.
        time_tag = f"run time: {self.run_time:%Y-%m-%dT%H:%M}"
        this_run = Flow(current.flow_name)[current.run_id]
        this_run.add_tag(time_tag)

        self.next(self.end)

    @step
    def end(self):
        """Print the step name, task id and pathspec as seen from this step."""
        print(f"end has a different step name: {current.step_name}")
        print(f"end has a different task id: {current.task_id}")
        print(f"end has a different pathspec: {current.pathspec}")


# Script entry point: instantiate the flow only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':

    CurrentFlow()

 Parameters

from metaflow import FlowSpec, Parameter, step


class ParameterFlow(FlowSpec):
    """Two-step flow showing that a ``Parameter`` is readable from every step."""

    # Exposed on the command line as ``--alpha``; falls back to 0.01.
    alpha = Parameter(
        name="alpha",
        help="Learning rate",
        default=0.01,
    )

    @step
    def start(self):
        """Print the parameter value, then continue to ``end``."""
        print(f"alpha is {self.alpha:f}")
        self.next(self.end)

    @step
    def end(self):
        """Print the parameter value again."""
        print(f"alpha is still {self.alpha:f}")


# Script entry point: instantiating the flow class is what lets the command
# shown below (`python parameter_flow.py run --alpha 0.6`) operate on it.
if __name__ == '__main__':

    ParameterFlow()


python parameter_flow.py run --alpha 0.6

CLI

python mf.py --environment conda show