Dagster
Install and run dagster
Create a dagster home folder
Open command line
Switch to the home folder
Create a python virtual environment
python -m venv env
This creates an 'env' folder under the home folder
Then activate the virtual environment
source /home/dagster_home/env/bin/activate (on Linux; on Windows run env\Scripts\activate)
Install dagster and dagit
pip install dagster dagit
From now on, activate the virtual environment each time before working.
e.g. E:\dagster>env\Scripts\activate
Create a new project
dagster project scaffold --name my-dagster-project
The command dagster project scaffold generates a folder structure with a single Dagster repository and other boilerplate files such as workspace.yaml. This helps you to quickly start with an empty project with everything set up.
Switch to the project folder
Then run the user interface with the dagit command. It requires a project (with a workspace.yaml) to start the web UI.
dagit
This starts a web interface service at 127.0.0.1:3000
Set the DAGSTER_HOME environment variable to point to the project folder, e.g. my-dagster-project.
DAGSTER_HOME
Enable the Dagster scheduler by running the command below.
dagster-daemon run
If the environment variable is not set, this will fail. Start a daemon process in the same folder as your workspace.yaml file, but in a different shell or terminal.
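For example, the environment variable and daemon startup might look like this (paths are illustrative; adjust to your own setup):

```shell
REM Windows example (adjust the path to your project folder)
set DAGSTER_HOME=E:\dagster\my-dagster-project

REM Linux/macOS equivalent:
REM export DAGSTER_HOME=/home/dagster/my-dagster-project

REM then, from the folder containing workspace.yaml:
dagster-daemon run
```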
Programming concepts in Dagster
This section covers a quick way to create a scheduled job that runs Python scripts (ops).
Op
An "Op" is the core unit of computation in Dagster.
Multiple Ops can be connected by specifying the inputs/outputs of the ops.
Use the @op decorator to write an Op:
@op
def load_data():
    return ''

@op
def process_data(df):
    pass
Job
A job is for executing Ops.
The core of a job is a graph of Ops connected via data dependencies.
Ops are linked together by defining the dependencies between their inputs and outputs.
For example, the load_data op runs first, and its output is fed into process_data, so there is a dependency between the load_data and process_data ops.
@job
def do_something():
    df = load_data()
    process_data(df)
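Conceptually, the @job decorator does not run the ops when the body executes; it records the calls into a dependency graph that an executor later walks in dependency order. A minimal plain-Python sketch of that idea (illustrative only, not Dagster's actual implementation):

```python
# Sketch: calling an "op" inside a job body builds a graph node instead of
# running the function; an executor later runs nodes in dependency order.

class Node:
    def __init__(self, fn, deps):
        self.fn = fn
        self.deps = deps  # upstream Nodes whose outputs feed this op

def op(fn):
    # Calling the decorated op returns a Node rather than executing fn.
    def build(*deps):
        return Node(fn, deps)
    return build

def run(node):
    # Execute upstream nodes first, then feed their outputs into this op.
    inputs = [run(dep) for dep in node.deps]
    return node.fn(*inputs)

@op
def load_data():
    return [1, 2, 3]

@op
def process_data(df):
    return [x * 10 for x in df]

# Mirrors the do_something job above: process_data depends on load_data.
graph = process_data(load_data())
result = run(graph)
print(result)  # [10, 20, 30]
```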
Graph
Note a job can contain Ops directly without defining a graph of Ops.
However, it may be good to define a graph of Ops first, and then instantiate the graph as different jobs depending on context/parameters.
@graph
def my_graph():
    df = load_data()
    process_data(df)
Note this is the same as the job definition above.
To run the graph as a job:
my_config = {"param": "hello"}

@job(config=my_config)
def run_a_graph():
    my_graph()
The good thing is that you can set a config/context for the job so it runs the graph accordingly.
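The "one graph, several jobs" idea can be sketched in plain Python as a factory that binds a config dict to the same pipeline body (names here are illustrative, not Dagster APIs):

```python
# Plain-Python sketch: one pipeline function, instantiated as different
# "jobs" by binding different config dicts (illustrative, not Dagster API).

def my_graph(config):
    greeting = config["param"]
    return f"{greeting}, processed"

def make_job(config):
    # Bind the config so the same graph body can run in different contexts.
    def job():
        return my_graph(config)
    return job

dev_job = make_job({"param": "hello"})
prod_job = make_job({"param": "bonjour"})

print(dev_job())   # hello, processed
print(prod_job())  # bonjour, processed
```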
Schedule
To run a job on a schedule, the schedule needs to be specified within the script as well.
It uses a cron expression to specify the time. This can be updated later through the UI too.
job_schedule = ScheduleDefinition(job=do_something, cron_schedule="0 0 * * *")
Dagster uses five-field cron expressions to specify schedules, as shown below.
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │ 7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * <command to execute>
A * matches every value of that field (every minute/hour/day/...).
A number restricts the schedule to that specific minute/hour/day/...
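The field-matching rules above can be sketched in plain Python; this toy matcher handles only "*" and plain numbers (real cron also supports ranges, steps, and lists):

```python
# Sketch: evaluate a five-field cron expression against a point in time,
# supporting only "*" and plain numbers as described above.
from datetime import datetime

def cron_matches(expr, when):
    fields = expr.split()  # minute, hour, day-of-month, month, day-of-week
    # Python's weekday() is Mon=0..Sun=6; cron uses Sun=0..Sat=6.
    cron_dow = (when.weekday() + 1) % 7
    actual = [when.minute, when.hour, when.day, when.month, cron_dow]
    return all(f == "*" or int(f) == v for f, v in zip(fields, actual))

# "0 0 * * *" (the schedule above) fires at midnight every day:
print(cron_matches("0 0 * * *", datetime(2023, 5, 1, 0, 0)))   # True
print(cron_matches("0 0 * * *", datetime(2023, 5, 1, 9, 30)))  # False
```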
Sensor
A job can be triggered not only by a schedule but also by an external condition.
A sensor checks whether the external condition is met, e.g. a temperature is higher than a threshold, and then runs the job.
@sensor(job=do_something)
def job_sensor():
    external_condition = True  # do the check here
    if external_condition:
        yield RunRequest(run_key=None, run_config={})
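Conceptually, the daemon polls the sensor function on an interval, and each yielded run request launches the attached job. A plain-Python sketch of one polling tick (the temperature reading and dict shape are illustrative, not Dagster's API):

```python
# Plain-Python sketch of the sensor idea: the daemon repeatedly calls the
# sensor function; each yielded "run request" triggers a job run.

def temperature_sensor(read_temperature, threshold=30.0):
    temp = read_temperature()  # the external condition to check
    if temp > threshold:
        yield {"run_key": None, "run_config": {"temp": temp}}

# Simulate one daemon tick with a fake reading:
requests = list(temperature_sensor(lambda: 35.0))
print(requests)  # [{'run_key': None, 'run_config': {'temp': 35.0}}]
print(list(temperature_sensor(lambda: 20.0)))  # []
```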
Repository
This is optional. It simply lists the jobs, schedules, sensors, assets, etc. so they can be viewed as one repository.
@repository
def my_repository():
    return [
        job_1,
        job_schedule,
        job_sensor,
    ]
There is also an important concept, the Asset, which is for persisted storage of data. Once materialized, is the persisted data stored locally? Need to check the docs.
Run jobs
In Dagster, the ops, jobs, schedules, etc. need to be written in a Python script, which is then loaded and run. There are three ways to run one-off executions:
Dagit, the Dagster CLI, or Python APIs.
To run the job on a schedule, make sure dagster-daemon is running.
Option 1.
In the Python script, set the schedule status to running by default:
my_running_schedule = ScheduleDefinition(
job=my_job, cron_schedule="0 9 * * *", default_status=DefaultScheduleStatus.RUNNING
)
When dagit runs, the schedule starts automatically.
Once started, even if the dagit command is closed, the schedule keeps running through the daemon.
Option 2.
From the dagit web UI, manually enable the schedule.
Option 3.
Use the dagster command to start/stop a schedule:
dagster schedule start schedule_name
dagster schedule stop schedule_name
To keep dagster-daemon alive on the server, register it as a Windows service using NSSM.
NSSM service for dagster on windows
Download the nssm zip file and unzip it.
Go to the directory containing nssm.exe and run 'nssm.exe install' from the command line.
A window pops up for creating a new service.
First, register dagit as a service. In the NSSM window enter:
Application Path: D:\dagster\env\Scripts\python.exe (where the python.exe is, either in a virtual env or other python installation)
Startup directory: D:\dagster\my-dagster-project (where the workspace.yaml file is)
Arguments: -m dagit (running the dagit python module)
Service name: whatever name...
In the Details tab you may also enter a description; in the Log On tab, choose the Local System account or a specific account to run the service.
Second, register the daemon as a service; enter:
Application Path: D:\dagster\env\Scripts\dagster-daemon.exe (note: the daemon exe file, not python.exe)
Startup directory: D:\dagster\my-dagster-project (where the workspace.yaml file is)
Arguments: run (i.e. dagster-daemon run)
Service name: whatever name...
To delete a service, run 'nssm.exe remove' from the command line and enter the service name. You may need to stop the service first and refresh the services list to see it disappear.
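The same registrations can be scripted non-interactively with NSSM's command line (run from the nssm.exe folder; paths and service names are examples, adjust to your setup):

```shell
REM dagit as a service
nssm.exe install DagsterDagit "D:\dagster\env\Scripts\python.exe" "-m dagit"
nssm.exe set DagsterDagit AppDirectory "D:\dagster\my-dagster-project"

REM dagster-daemon as a service
nssm.exe install DagsterDaemon "D:\dagster\env\Scripts\dagster-daemon.exe" run
nssm.exe set DagsterDaemon AppDirectory "D:\dagster\my-dagster-project"

REM remove a service (stop it first)
nssm.exe stop DagsterDagit
nssm.exe remove DagsterDagit confirm
```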