REST API
The Databricks REST API provides a way to manage DBFS and other Databricks-related tasks.
First, generate a personal access token for authentication. This can be done from User Settings -> Generate New Token.
Then find the Databricks instance hostname from Compute -> cluster node -> Advanced options -> JDBC/ODBC.
Alternatively, find the connection details from a SQL endpoint, if one exists.
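Every request in this document uses the same URL pattern: https, the instance hostname, the API version, and the command path. A small helper can assemble the URL and read the token from an environment variable instead of hardcoding it. This is a sketch: the helper names and the DATABRICKS_TOKEN variable are illustrative conventions, not part of the Databricks API itself.

```python
import os

def make_url(instance_id, api_command, api_version='/api/2.0'):
    """Assemble the full REST endpoint URL from its parts."""
    return f"https://{instance_id}{api_version}{api_command}"

def auth_header(token=None):
    """Authorization header; falls back to the DATABRICKS_TOKEN environment variable."""
    token = token or os.environ.get('DATABRICKS_TOKEN', '')
    return {'Authorization': f'Bearer {token}'}

print(make_url('abc-1234567890.10.azuredatabricks.net', '/clusters/get'))
# https://abc-1234567890.10.azuredatabricks.net/api/2.0/clusters/get
```

Keeping the token in an environment variable also avoids committing it to source control by accident.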
The Python script below connects to an instance and calls the '/clusters/get' API to retrieve a cluster's information:
import requests

instance_id = 'abc-1234567890.10.azuredatabricks.net'
api_version = '/api/2.0'
api_command = '/clusters/get'
url = f"https://{instance_id}{api_version}{api_command}"
TOKEN = 'the generated personal access token'

params = {
    'cluster_id': '1234-12345678-mhxgk123'
}

response = requests.get(
    url=url,
    params=params,
    headers={'Authorization': f'Bearer {TOKEN}'},
)
print(response.text)
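The response body is JSON, so response.json() turns it into a dict from which individual fields such as cluster_name and state can be read. A sketch using a hand-made sample payload; summarize_cluster is an illustrative helper, with field names taken from the clusters/get response:

```python
def summarize_cluster(payload):
    """Return a one-line summary of a /clusters/get response body."""
    return (f"{payload.get('cluster_name')} "
            f"({payload.get('cluster_id')}): {payload.get('state')}")

# sample payload shaped like a clusters/get response
sample = {
    'cluster_id': '1234-12345678-mhxgk123',
    'cluster_name': 'dev-cluster',
    'state': 'RUNNING',
}
print(summarize_cluster(sample))
# dev-cluster (1234-12345678-mhxgk123): RUNNING
```

In a real call this would be summarize_cluster(response.json()), after checking response.status_code.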
Query API to list all jobs
A single request returns at most 100 jobs. If the workspace has more than 100 jobs, use the offset parameter to page through the remaining jobs.
import requests

instance_id = 'dbc-1234567890.cloud.databricks.com'
api_version = '/api/2.1'
api_command = '/jobs/list'
url = f"https://{instance_id}{api_version}{api_command}"
TOKEN = 'dapixxxxxx'

limit = 100
offset = 0
count = limit
total = 0

# keep paging while the last request returned a full page
while count == limit:
    response = requests.get(
        url=url,
        params={'limit': limit, 'offset': offset},
        headers={'Authorization': f'Bearer {TOKEN}'},
    )
    jobs = response.json().get('jobs', [])  # 'jobs' is absent when there are none
    count = len(jobs)
    total += count
    for job in jobs:
        job_id = job['job_id']
        settings = job['settings']
        name = settings['name']
        if 'continuous' in settings:
            schedule = 'continuous'
        elif 'schedule' in settings:
            schedule = settings['schedule']
        else:
            schedule = None
        print(f'{name} {schedule}')
    offset += limit
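Endpoints such as jobs/update and jobs/run-now below take a job id rather than a name, so it is handy to build a name-to-id lookup while paging. A sketch over a single hand-made page of the jobs/list payload; jobs_by_name is an illustrative helper, not part of the API:

```python
def jobs_by_name(page):
    """Map job name -> job_id for one page of a /jobs/list response."""
    return {j['settings']['name']: j['job_id'] for j in page.get('jobs', [])}

# sample page shaped like a jobs/list response
page = {'jobs': [
    {'job_id': 101, 'settings': {'name': 'nightly_load'}},
    {'job_id': 102, 'settings': {'name': 'hourly_refresh'}},
]}
lookup = jobs_by_name(page)
print(lookup['nightly_load'])  # 101
```

In the paging loop above, lookup.update(jobs_by_name(response.json())) would accumulate the mapping across pages.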
Update job schedule and other settings
The job name, description, schedule, and other settings can be updated through the API:
import requests

instance_id = 'dbc12345667890.databricks.com'
api_version = '/api/2.1'
api_command = '/jobs/update'
url = f"https://{instance_id}{api_version}{api_command}"
TOKEN = 'dapixxxxxx'

job_id = 1234567  # the job id from the query above, or copied from the web portal

post_data = {
    "job_id": job_id,
    "new_settings": {
        "schedule": {
            "quartz_cron_expression": "0 0 0 ? * MON-FRI *",  # Monday to Friday, 12am
            "timezone_id": "Australia/Brisbane",
            "pause_status": "UNPAUSED"
        }
    }
}

response = requests.post(
    url=url,
    json=post_data,  # requests serializes the body and sets the JSON content type
    headers={'Authorization': f'Bearer {TOKEN}'},
)
print(response)
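The same endpoint can pause or resume a job. Since new_settings appears to replace the whole schedule block, the safer approach is to resubmit the job's existing schedule (from jobs/list or jobs/get) with only pause_status changed. A sketch of preparing that body; toggle_pause is an illustrative helper, and the resend-the-full-schedule approach is an assumption about how jobs/update merges fields:

```python
def toggle_pause(schedule, paused):
    """Return a copy of a job's schedule block with pause_status set."""
    updated = dict(schedule)
    updated['pause_status'] = 'PAUSED' if paused else 'UNPAUSED'
    return updated

# an existing schedule, as returned inside a job's settings
schedule = {
    'quartz_cron_expression': '0 0 0 ? * MON-FRI *',
    'timezone_id': 'Australia/Brisbane',
    'pause_status': 'UNPAUSED',
}
body = {'job_id': 1234567,
        'new_settings': {'schedule': toggle_pause(schedule, paused=True)}}
print(body['new_settings']['schedule']['pause_status'])  # PAUSED
```

The body would then be posted to /jobs/update exactly as in the example above.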
Run jobs
Run an existing job through the API:
import requests

instance_id = 'yourcluster.cloud.databricks.com'
TOKEN = 'your token'
api_version = '/api/2.1'
api_command = '/jobs/run-now'
url = f"https://{instance_id}{api_version}{api_command}"

job_id = 12345  # the job id; can be queried by job name through the jobs list API
parameter_value = 'job parameter'

post_data = {
    "job_id": job_id,
    "queue": {
        "enabled": True
    },
    "job_parameters": {
        "table_name": parameter_value
    }
}

response = requests.post(
    url=url,
    json=post_data,
    headers={'Authorization': f'Bearer {TOKEN}'},
)
print(response.text)  # the body contains the run_id on success, or an error message
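jobs/run-now responds with a run_id, and the run's progress can then be polled through the jobs/runs/get endpoint until state.life_cycle_state reaches a terminal value. A minimal polling sketch: it takes any get_run callable so the HTTP details stay out of the loop, and the set of terminal states is an assumption based on the runs/get documentation:

```python
import time

def wait_for_run(get_run, run_id, poll_seconds=5, max_polls=600):
    """Poll a run until it leaves the active states.

    get_run(run_id) must return the runs/get JSON body as a dict.
    """
    done_states = {'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'}
    for _ in range(max_polls):
        run = get_run(run_id)
        if run['state']['life_cycle_state'] in done_states:
            return run
        time.sleep(poll_seconds)
    raise TimeoutError(f'run {run_id} did not finish in time')

# get_run could wrap requests.get on /api/2.1/jobs/runs/get
# with params={'run_id': run_id} and the same Bearer header as above
```

Separating the fetch from the loop also makes the polling logic easy to test with a stub.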
Upload a file to Databricks
This opens a local file in raw bytes mode and attaches its contents as the body of a PUT request.
The URL contains the path of the target file. In Unity Catalog, one can create a new volume under a data catalog, and the path follows the pattern /Volumes/<catalog name>/<schema name>/<volume name>/<folder>/<file>.
To read the uploaded file from Databricks, use the same volume path in pd.read_excel() or open(...).
import requests

instance_id = 'dbc-123456.cloud.databricks.com'
TOKEN = 'dapixxxxx'
api_version = '/api/2.0'
api_command = '/fs/files'
volume_path = '/Volumes/dev_nhg_erp/default/test/test.xlsx'
url = f"https://{instance_id}{api_version}{api_command}{volume_path}"

with open('C:/temp/test data.xlsx', 'rb') as f:
    response = requests.put(
        url=url,
        data=f.read(),
        headers={'Authorization': f'Bearer {TOKEN}'},
    )
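If the target file name contains spaces or other special characters, it is safer to percent-encode the volume path before appending it to the Files API URL. A sketch assuming urllib.parse.quote (with its default of leaving '/' unescaped) is adequate; files_api_url is an illustrative helper, not part of the API:

```python
from urllib.parse import quote

def files_api_url(instance_id, volume_path):
    """Build the Files API URL for a volume path, percent-encoding each segment."""
    encoded = quote(volume_path)  # encodes spaces etc., keeps '/' separators intact
    return f"https://{instance_id}/api/2.0/fs/files{encoded}"

print(files_api_url('dbc-123456.cloud.databricks.com',
                    '/Volumes/dev/default/test/test data.xlsx'))
# https://dbc-123456.cloud.databricks.com/api/2.0/fs/files/Volumes/dev/default/test/test%20data.xlsx
```

The resulting URL drops into the requests.put() call above unchanged.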