Metaflow
https://outerbounds.com/sandbox/
https://docs.metaflow.org/getting-started/tutorials
https://github.com/Netflix/metaflow/blob/master/metaflow/tutorials/00-helloworld/helloworld.py
Hello world
file.py:
from metaflow import FlowSpec, conda_base, current, project, step
@project(name="hello_work")
@conda_base(libraries={"pandas": "2.0.3"}, python="3.10.11")
class HelloFlow(FlowSpec):
    """Linear three-step flow: ``start`` -> ``hello`` -> ``end``.

    The ``start`` step prints the attributes exposed by Metaflow's ``current``
    object for the running task; the remaining steps only print a message.
    """

    @step
    def start(self):
        """Print the ``current`` attributes of this task, then move to ``hello``."""
        print("HelloFlow is starting with following MetaFlow parameters:")
        # Each line has to be an f-string: without the ``f`` prefix the
        # placeholder text (e.g. "{current.namespace}") is printed verbatim
        # instead of the value.
        print(f"username: {current.username}")
        print(f"namespace: {current.namespace}")
        print(f"flow name: {current.flow_name}")
        print(f"run id: {current.run_id}")
        print(f"origin run id: {current.origin_run_id}")
        print(f"step name: {current.step_name}")
        print(f"task id: {current.task_id}")
        print(f"pathspec: {current.pathspec}")
        print(f"flow parameters: {current.parameter_names}")
        self.next(self.hello)

    @step
    def hello(self):
        """Print a greeting, then move to ``end``."""
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """Print the closing message; this is the last step of the flow."""
        print("HelloFlow is all done.")


if __name__ == "__main__":
    HelloFlow()
python file.py show
python file.py --environment conda show
python file.py run
Parallel tasks
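The start step launches tasks in parallel by passing each branch's step method to a single self.next call, e.g. self.next(self.eat, self.drink).
Finally, the branches converge on a join step, which takes an extra inputs argument giving access to the data from each incoming branch.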
import time
from metaflow import step, FlowSpec
# Seconds each branch sleeps to simulate work.
_BRANCH_PAUSE_SECONDS = 10


class BranchFlow(FlowSpec):
    """Flow that fans out from ``start`` into ``eat`` and ``drink``, then joins.

    Step order: ``start`` -> (``eat``, ``drink``) -> ``join`` -> ``end``.
    """

    @step
    def start(self):
        """Print a start message and hand off to both branches."""
        print("Starting 👋")
        self.next(self.eat, self.drink)

    @step
    def eat(self):
        """First branch: print a message, sleep, then go to ``join``."""
        print("Pausing to eat... 🍜")
        time.sleep(_BRANCH_PAUSE_SECONDS)
        self.next(self.join)

    @step
    def drink(self):
        """Second branch: print a message, sleep, then go to ``join``."""
        print("Pausing to drink... 🥤")
        time.sleep(_BRANCH_PAUSE_SECONDS)
        self.next(self.join)

    @step
    def join(self, inputs):
        """Join the two branches; ``inputs`` is accepted but not used here."""
        print("Joining 🖇️")
        self.next(self.end)

    @step
    def end(self):
        """Print the closing message; this is the last step of the flow."""
        print("Done! 🏁")


if __name__ == "__main__":
    BranchFlow()
Flow metadata
from metaflow import Flow, FlowSpec, step, current
class CurrentFlow(FlowSpec):
    """Two-step flow that prints ``current`` metadata and tags its own run."""

    @step
    def start(self):
        """Record the start time, print ``current`` attributes, and tag the run."""
        from datetime import datetime

        self.run_time = datetime.now()

        print(f"username: {current.username}")
        print(f"namespace: {current.namespace}")
        print(f"flow name: {current.flow_name}")
        print(f"run id: {current.run_id}")
        print(f"origin run id: {current.origin_run_id}")
        print(f"step name: {current.step_name}")
        print(f"task id: {current.task_id}")
        print(f"pathspec: {current.pathspec}")
        print(f"flow parameters: {current.parameter_names}")

        # Look up this run by flow name and run id, then tag it with the
        # recorded start time at minute precision.
        run_time_tag = f"run time: {self.run_time:%Y-%m-%dT%H:%M}"
        Flow(current.flow_name)[current.run_id].add_tag(run_time_tag)

        self.next(self.end)

    @step
    def end(self):
        """Print the ``current`` attributes that the messages say differ here."""
        print(f"end has a different step name: {current.step_name}")
        print(f"end has a different task id: {current.task_id}")
        print(f"end has a different pathspec: {current.pathspec}")


if __name__ == '__main__':
    CurrentFlow()
Parameters
from metaflow import FlowSpec, Parameter, step
class ParameterFlow(FlowSpec):
    """Two-step flow that declares an ``alpha`` parameter and prints it."""

    # Declared with a default of 0.01; the accompanying notes pass it on the
    # command line as ``run --alpha <value>``.
    alpha = Parameter('alpha', default=0.01, help='Learning rate')

    @step
    def start(self):
        """Print ``alpha`` in fixed-point notation, then move to ``end``."""
        print(f'alpha is {self.alpha:f}')
        self.next(self.end)

    @step
    def end(self):
        """Print ``alpha`` again from the final step."""
        print(f'alpha is still {self.alpha:f}')


if __name__ == '__main__':
    ParameterFlow()
python parameter_flow.py run --alpha 0.6
CLI
python mf.py --environment conda show