Dagster

Install and run Dagster


Create a dagster home folder

Open a command line

Switch to the home folder

Create a python virtual environment

   python -m venv env

This creates an 'env' folder under the home folder

Then activate the virtual environment (the activate script is under env/bin on Linux, env\Scripts on Windows):

  source /home/dagster_home/env/bin/activate

Install dagster and dagit

  pip install dagster dagit

 

From now on, activate the virtual environment each time before working with Dagster.

e.g. E:\dagster>env\Scripts\activate

 

Create a new project

  dagster project scaffold --name my-dagster-project

The dagster project scaffold command generates a folder structure containing a single Dagster repository and other boilerplate files such as workspace.yaml, so you can quickly start from an empty project with everything set up.
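For reference, the generated layout looks roughly like this (the exact files vary by Dagster version):

  my-dagster-project/
    my_dagster_project/          # Python package holding your ops, jobs, schedules, etc.
      __init__.py
    my_dagster_project_tests/
    setup.py
    workspace.yaml               # tells dagit and the daemon where to find your code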


Switch to the project folder

Then run the user interface through the dagit command. It requires a project (with workspace.yaml) to start the web UI.

  dagit

This starts a web interface at http://127.0.0.1:3000
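If the defaults do not suit, dagit accepts host and port flags (-h/--host and -p/--port), e.g.

  dagit -p 3001

(The port above is just an example.)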

 

Set an environment variable pointing to the project folder, e.g. my-dagster-project:

   DAGSTER_HOME
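For example, on Windows (assuming the project lives at E:\dagster\my-dagster-project; adjust the path for your machine):

  setx DAGSTER_HOME "E:\dagster\my-dagster-project"

Note setx only affects newly opened shells; use set DAGSTER_HOME=... for the current one. On Linux: export DAGSTER_HOME=/home/dagster_home/my-dagster-project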

 

Enable the Dagster scheduler by running the command below.

  dagster-daemon run

If the DAGSTER_HOME environment variable is not set, this will fail. Start the daemon process in the same folder as your workspace.yaml file, but in a separate shell or terminal.

 

 

 

Programming concepts in Dagster

This section covers a quick way to create a scheduled job that runs Python scripts (operations).

 

Op

An "Op" is the core unit of computation in Dagster.

Multiple Ops can be connected by specifying the inputs and outputs of the ops.

Use a decorator to write an Op:

from dagster import op

@op
def load_data():
    # placeholder: load and return a dataset
    return ''

@op
def process_data(df):
    # placeholder: process the loaded data
    pass

 

Job

A job executes Ops.

The core of a job is a graph of Ops connected via data dependencies.

Ops are linked together by defining the dependencies between their inputs and outputs.

For example, the job below runs the load_data op first; its output is fed into process_data, which creates the dependency between the two ops.

 

from dagster import job

@job
def do_something():
    df = load_data()
    process_data(df)
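For a quick local test, a job can also be executed directly in-process (a minimal sketch; execute_in_process runs the job synchronously and returns a result object):

if __name__ == "__main__":
    result = do_something.execute_in_process()
    print(result.success)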

 

 

Graph

Note a job can contain Ops directly without defining a graph of Ops.

However, it may be good to define a graph of Ops first, and then instantiate the graph as different jobs depending on context/parameters.

from dagster import graph

@graph
def my_graph():
    df = load_data()
    process_data(df)

Note this defines the same graph of Ops as the job above.

To run the graph as a job:

my_config = {"param": "hello"}  # illustrative; a real config dict must match the job's run-config schema

@job(config=my_config)
def run_a_graph():
    my_graph()

The good thing is that you can set a config/context for the job, so it can run the graph accordingly.
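Alternatively, the graph object can be turned into jobs directly via to_job, which makes the "one graph, several jobs" pattern explicit (the job names below are just examples):

dev_job = my_graph.to_job(name="my_graph_dev")
prod_job = my_graph.to_job(name="my_graph_prod")

to_job also accepts config and resource definitions, so each job can carry its own context.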

 

Schedule

To run a job on a schedule, the schedule needs to be specified within the script as well.

It uses a cron expression to specify the time. The schedule can also be managed later through the UI.

from dagster import ScheduleDefinition

job_schedule = ScheduleDefinition(job=do_something, cron_schedule="0 0 * * *")

Dagster uses five-field cron expressions to specify schedules, as shown below.

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                      7 is also Sunday on some systems)
# │ │ │ │ │
# * * * * *

A * matches every value of its field (every minute, every hour, and so on).

A number restricts the schedule to that specific minute/hour/day/...
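For example, a schedule that fires at 09:00 every weekday (the variable name is illustrative; ScheduleDefinition was imported above):

weekday_schedule = ScheduleDefinition(job=do_something, cron_schedule="0 9 * * 1-5")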

 

Sensor

A job can be triggered not only by a schedule but also by an external condition.

A sensor checks whether the external condition is met, e.g. a temperature is higher than a threshold, and then runs the job.

from dagster import sensor, RunRequest

@sensor(job=do_something)
def job_sensor():
    external_condition = True  # do the check here
    if external_condition:
        yield RunRequest(run_key=None, run_config={})
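With run_key=None, every evaluation that yields a RunRequest launches a new run. Passing a stable run_key instead deduplicates: Dagster skips a request whose run_key it has already seen. A sketch, assuming a hypothetical watched folder:

import os

@sensor(job=do_something)
def file_sensor():
    # one run per file; the file name as run_key prevents duplicate runs
    for name in os.listdir("/tmp/watched"):
        yield RunRequest(run_key=name, run_config={})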

 

Repository

This is optional. It simply lists the jobs, schedules, sensors, etc., so they can be viewed and loaded as one repository.

from dagster import repository

@repository
def my_repository():
    return [
        do_something,
        job_schedule,
        job_sensor,
    ]

 

There is also an important concept, the Asset, which represents persisted data. By default (with the built-in filesystem IO manager), materialized assets are pickled to local storage under the Dagster home folder; this is configurable, e.g. to cloud storage.
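A minimal sketch of an asset (the name and return value are illustrative):

from dagster import asset

@asset
def my_dataset():
    # materializing this asset persists its return value via the IO manager
    return [1, 2, 3]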

 

 

Run jobs

In Dagster, the ops, jobs, schedules, etc. need to be written in a Python script, which the tooling then loads. There are three ways to launch one-off executions:

Dagit, the Dagster CLI, or the Python APIs.
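For example, via the Dagster CLI (the script and job names here are assumptions about your file layout):

  dagster job execute -f my_script.py -j do_something

The Python-API equivalent, do_something.execute_in_process(), was shown earlier under Job.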

 

 

To run the job on its schedule, make sure the dagster-daemon is running.

Option 1.

   In the Python script, set the schedule status to running by default.

from dagster import DefaultScheduleStatus, ScheduleDefinition

my_running_schedule = ScheduleDefinition(
    job=my_job, cron_schedule="0 9 * * *", default_status=DefaultScheduleStatus.RUNNING
)

   When dagit runs, the schedule starts automatically.

   Once started, the schedule keeps running through the daemon even after the dagit command is closed.

 

Option 2.

   From the Dagit web UI, manually enable the schedule.

 

Option 3.

Use the dagster CLI to start/stop a schedule:

  dagster schedule start schedule_name

  dagster schedule stop schedule_name


To keep the dagster-daemon alive on the server, register it as a Windows service using NSSM.

NSSM service for Dagster on Windows

Download the NSSM zip file and unzip it.

Go to the directory containing nssm.exe and run 'nssm.exe install' from the command line.

This pops up a window for creating a new service.

First, register dagit as a service. In the NSSM window put in:

    Application Path: D:\dagster\env\Scripts\python.exe    (where python.exe is, either in a virtual env or another Python installation)

    Startup directory: D:\dagster\my-dagster-project       (where the workspace.yaml file is)

    Arguments: -m dagit                                    (run the dagit Python module)

    Service name: whatever name...

On the Details page you may also put in a description. On the Log On page, you may choose the local system account or a specific account to run the service.

Second, register the daemon as a service. Put in:

    Application Path: D:\dagster\env\Scripts\dagster-daemon.exe    (note: the daemon exe file, not python.exe)

    Startup directory: D:\dagster\my-dagster-project               (where the workspace.yaml file is)

    Arguments: run                                                 (i.e. dagster-daemon run)

    Service name: whatever name...

To delete a service, run 'nssm.exe remove' from the command line and put in the service name. You may need to stop the service first and refresh the services list to see it disappear.
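NSSM can also do this non-interactively from the command line (the service name below is an example):

  nssm stop dagster-daemon-service
  nssm remove dagster-daemon-service confirm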