In this article we will set up Azure DevOps CI/CD to deploy a Databricks Asset Bundle for a DLT pipeline. We will also use commit message 'commands' to control which jobs run.
The required files and structure are:
├── README.md
├── .azdevops
├── databricks
│   ├── the_dlt_pipeline
│   │   ├── databricks.yml
│   │   ├── resources
│   │   │   └── the_dlt_pipeline.yml
│   │   └── src
│   │       └── the_dlt_pipeline.ipynb
│   └── common
│       └── utils.py
We define the CI/CD pipeline in a yml file inside the .azdevops folder. When setting up the build pipeline for the first time we choose the option to use an existing YAML file and point it at this one. Usually we have one pipeline definition for each environment.
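As an illustration, the top of such a pipeline file might look like the sketch below. The trigger branch, pool image and the name of the templates repository are assumptions rather than values from the original setup; the 'templates' alias is the one referenced later by the deployment template.

# .azdevops/cicd_dev.yml — minimal sketch, names are placeholders
trigger:
  branches:
    include:
      - main

resources:
  repositories:
    - repository: templates          # alias used later as ...@templates
      type: git
      name: <project>/devops_templates

pool:
  vmImage: ubuntu-latest

# stages: eval_commit_message and deploy_and_run_jobs follow below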
In order to use commit message 'commands' we always have a first stage, 'evaluate commit message'. This uses a Bash task with an inline script that checks the commit message for certain keywords. If a keyword is present it sets an output variable, and these variables can then be used in the condition attribute of later jobs.
stages:
  - stage: eval_commit_message
    jobs:
      - job: read_commit_message
        steps:
          - checkout: self
            persistCredentials: true
            fetchTags: true
            fetchDepth: 0
          - task: Bash@3
            name: eval_commit_message
            inputs:
              targetType: "inline"
              workingDirectory: $(Build.SourcesDirectory)
              script: |
                message=$(echo "$(Build.SourceVersionMessage)")
                if [ "$(Build.Reason)" == "PullRequest" ]; then
                  url="$(System.CollectionUri)_apis/git/repositories/$(Build.Repository.ID)/pullRequests/$(System.PullRequest.PullRequestId)?api-version=7.0"
                  pullRequestInfo=$(curl -s -u azdo:$(System.AccessToken) "${url}")
                  message=$(echo $pullRequestInfo | jq -r .title)
                fi
                [[ $message =~ "deploy_dlt" ]] && echo "##vso[task.setvariable variable=deploy_dlt;isOutput=true]true" || echo "##vso[task.setvariable variable=deploy_dlt;isOutput=true]false"
In the script above we extract either the latest commit message or the pull request title into the message variable. We then search for deploy_dlt and set the deploy_dlt output variable to true or false depending on whether it is present.
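The same pattern extends naturally to several commands. Here is a hedged sketch; the run_dlt keyword and variable are illustrative additions, not part of the original pipeline:

# Loop over the keywords we want to support; each becomes an output variable.
# 'run_dlt' is a hypothetical second command added for illustration.
for keyword in deploy_dlt run_dlt; do
  if [[ $message =~ "$keyword" ]]; then
    echo "##vso[task.setvariable variable=${keyword};isOutput=true]true"
  else
    echo "##vso[task.setvariable variable=${keyword};isOutput=true]false"
  fi
done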
In the next stage we declare the variables and map them to the outputs of the eval_commit_message stage:
  - stage: deploy_and_run_jobs
    variables:
      - group: azure_subscription_dev
      - name: deploy_dlt
        value: $[stageDependencies.eval_commit_message.read_commit_message.outputs['eval_commit_message.deploy_dlt']]
In the jobs section we check that the variable is set and, if so, call the DAB deployment template, passing in the location of the bundle and the secrets required:
    jobs:
      - job: deploy_the_dlt_pipeline
        condition: eq(variables['deploy_dlt'], 'true')
        steps:
          - checkout: self
          - template: devops_templates/ado_pipeline_templates/databricks_asset_deploy.yml@templates
            parameters:
              deploy_to_branch: ${{ variables['Build.SourceBranch'] }}
              bundle_directory: "./databricks/the_dlt_pipeline"
              bundle_target: "dev"
              environment_variables:
                DATABRICKS_HOST: https://adb-<DBR_HOST>.azuredatabricks.net
                ARM_CLIENT_ID: $(spn_application_id)
                ARM_CLIENT_SECRET: $(spn_secret)
                ARM_TENANT_ID: $(spn_tenant_id)
You will note that the CI/CD job above uses a template to do the actual deployment. We can use reusable templates to deploy (and/or run) a Databricks Asset Bundle. We did this for one of our clients, and the approach was similar to the excellent article Effortless Databricks Asset Bundle Deployments with Azure DevOps and its accompanying git repo.
The template consists of a number of tasks which download the Databricks CLI, obtain a Databricks token to authenticate, and then run the databricks bundle validate command followed by the databricks bundle deploy command.
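The template itself is not reproduced here; a minimal sketch of what such a databricks_asset_deploy.yml could contain is shown below. The parameter names follow the call above, but the task layout and the CLI install command are assumptions rather than the actual client template:

parameters:
  - name: deploy_to_branch   # used by the real template to gate deployment; unused in this sketch
    type: string
  - name: bundle_directory
    type: string
  - name: bundle_target
    type: string
  - name: environment_variables
    type: object

steps:
  # Install the Databricks CLI using the official install script.
  - task: Bash@3
    displayName: Install Databricks CLI
    inputs:
      targetType: inline
      script: curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh

  # Validate and deploy the bundle; authentication comes from the
  # DATABRICKS_HOST / ARM_* environment variables passed in by the caller.
  - task: Bash@3
    displayName: Validate and deploy bundle
    inputs:
      targetType: inline
      workingDirectory: ${{ parameters.bundle_directory }}
      script: |
        databricks bundle validate -t ${{ parameters.bundle_target }}
        databricks bundle deploy -t ${{ parameters.bundle_target }}
    env: ${{ parameters.environment_variables }}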
To run the job we would pass in the resource key of the job or pipeline. Here it is the_dlt_pipeline_job, which is defined in the asset bundle below.
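With the bundle deployed, the run step boils down to a single CLI call against that resource key and target, for example:

databricks bundle run the_dlt_pipeline_job -t dev

In the CI/CD pipeline this would typically sit in another gated job, guarded by its own commit message variable in the same way as the deploy job above.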
Another example is Databricks Asset Bundles Azure DevOps Project by Hrushi Medhe on Medium, and an alternative approach without templates is Step-by-Step: Setting Up a CI/CD Pipeline with Asset Bundle for DLT using Private Agents on Azure by Danilo Deoliveiraperez on Medium.
A Databricks Asset Bundle consists of at least two files:
databricks.yml: this is the main bundle configuration file. In the example here it also imports any resource files under the resources folder (the_dlt_pipeline.yml). The way I set it up is that databricks.yml contains the generic bundle config, the variables and the targets (e.g. one per environment).
src/the_dlt_pipeline.ipynb: this contains the logic that implements the DLT pipeline.
Here is my databricks.yml:

bundle:
  name: the_dlt_pipeline_bundle

sync:
  paths:
    - ../common
    - .

include:
  - resources/*.yml

variables:
  environment:
    description: The environment to run in
    default: dev
  bronze_catalog:
    description: The catalog to use
  bronze_schema:
    description: The schema to use
  run_service_principal:
    description: The service principal for this data product
    default: not_set
  support_email:
    description: The email address for pipeline failure notifications
    default: <SUPPORT EMAIL>

targets:
  dev:
    mode: production
    workspace:
      host: https://adb-<DBR_HOST>.azuredatabricks.net
      root_path: /data-products/.bundle/${bundle.name}/${bundle.target}
    run_as:
      service_principal_name: ${var.run_service_principal}
    variables:
      bronze_catalog: dev_bronze
      bronze_schema: dlt_pipeline
      environment: dev
      run_service_principal: <THE SP>
    permissions:
      - service_principal_name: ${var.run_service_principal}
        level: CAN_MANAGE
      - group_name: data_engineers
        level: CAN_MANAGE
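Before wiring the bundle into the CI/CD pipeline it is worth validating and deploying it locally from the bundle directory, assuming the CLI is already authenticated against the workspace:

cd databricks/the_dlt_pipeline
databricks bundle validate -t dev
databricks bundle deploy -t dev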
The the_dlt_pipeline.yml resource then defines the actual pipeline:
resources:
  pipelines:
    the_dlt_pipeline_job:
      name: the_dlt_pipeline_job_name
      catalog: ${var.bronze_catalog}
      target: ${var.bronze_schema}
      libraries:
        - notebook:
            path: ../src/the_dlt_pipeline.ipynb
      clusters:
        - label: default
          node_type_id: "Standard_D32ads_v5"
          autoscale:
            min_workers: 1
            max_workers: 5
          custom_tags:
            environment: ${bundle.target}
            owner: <the owner>
          spark_conf:
            environment: ${bundle.target}
            bronze_catalog: ${var.bronze_catalog}
            bronze_schema: ${var.bronze_schema}
      configuration:
        bundle.sourcePath: ${workspace.file_path}
        pipelines.clusterShutdown.delay: "60s"
        spark.databricks.acl.needAdminPermissionToViewLogs: false
      notifications:
        - alerts:
            - on-update-failure
            - on-flow-failure
          email_recipients:
            - ${var.support_email}
      continuous: false # the pipeline will be triggered by the scheduled job.
      edition: advanced
      channel: preview
      permissions:
        - group_name: <team_name>
          level: CAN_MANAGE
The src/the_dlt_pipeline.ipynb is a special notebook that implements the DLT pipeline. This will typically use Auto Loader to ingest data from an external source and write it to a table in Unity Catalog. I prefer to use the dlt decorators.
%python
import dlt
import sys

from pyspark.sql.functions import *

sys.path.append(spark.conf.get("bundle.sourcePath") + "/common")
print(f"{sys.path=}")

from utils import get_schema, get_raw_table_schema_hints, extract_table

# Get pipeline config variables
environment = spark.conf.get("environment")
bronze_catalog = spark.conf.get("bronze_catalog", "dev_labs")
bronze_schema = spark.conf.get("bronze_schema", "lab_alistair_mclean")
print(f"{bronze_catalog=}, {bronze_schema=}")

# The two paths below are illustrative placeholders; in practice they would
# come from the pipeline configuration or another part of the notebook.
landing_path = spark.conf.get("landing_path", "/Volumes/dev_bronze/dlt_pipeline/landing")
bad_records_path = spark.conf.get("bad_records_path", "/Volumes/dev_bronze/dlt_pipeline/bad_records")

@dlt.table(
    name="raw_data",
    comment="Raw data ingested from source",
    cluster_by=["ingestion_year", "ingestion_month", "ingestion_day"],
    table_properties={'delta.feature.variantType-preview': 'supported'}
)
def get_data():
    base_df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("multiLine", "true")
        .option("readerCaseSensitive", "false")
        .option("badRecordsPath", bad_records_path)
        # ignore any updates due to deletions (eg deleting records in the source due to gdpr)
        .option("skipChangeCommits", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        # handle schema evolution by watching for changes in the rescue column and then updating the schemaHints if required.
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("rescuedDataColumn", "_rescued_data")  # used with rescue mode
        .load(landing_path)
    )
    # Add the ingestion date columns used by cluster_by above.
    return (
        base_df
        .withColumn("ingestion_date", current_date())
        .withColumn("ingestion_year", year("ingestion_date"))
        .withColumn("ingestion_month", month("ingestion_date"))
        .withColumn("ingestion_day", dayofmonth("ingestion_date"))
    )
In this article I have shown how to set up an end-to-end CI/CD pipeline that can deploy and run a DLT pipeline using commit message 'commands'.