In the Data Analytics team within the Observability organization in Elastic, we use dbt (dbt™, data build tool) to execute our SQL data transformation pipelines. dbt is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code. In particular, we use dbt Core, the open-source project that lets you develop from the command line and run your dbt project.
Our data transformation pipelines run daily and process the data that feed our internal dashboards, reports, analyses, and Machine Learning (ML) models.
In the past, there have been incidents where our pipelines failed, source tables contained wrong data, or a change we introduced in our SQL code caused data quality issues, and we only realized it once a weekly report showed an anomalous number of records. That’s why we built a monitoring system that proactively alerts us about these types of incidents as soon as they happen and provides visualizations and analyses to understand their root cause, saving us hours or even days of manual investigation.
We have leveraged our own Observability Solution to help solve this challenge, monitoring the entire lifecycle of our dbt implementation. This setup enables us to track the behavior of our models and conduct data quality testing on the final tables. We export dbt process logs from run jobs and tests into Elasticsearch and utilize Kibana to create dashboards, set up alerts, and configure Machine Learning jobs to monitor and assess issues.
The following diagram shows our complete architecture. In a follow-up article, we’ll also cover how we observe our Python data processing and ML model processes using OpenTelemetry (OTel) and Elastic - stay tuned.
Why monitor dbt pipelines with Elastic?
With every invocation, dbt generates and saves one or more JSON files called artifacts, which contain log data about the invocation results. The run_results.json artifact contains information about a completed invocation of dbt, including timing and status info for each node (model, test, etc.) that was executed. In aggregate, many run_results.json files can be combined to calculate average model runtime, test failure rates, the number of record changes captured by snapshots, and more.
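As a quick illustration of the kind of aggregation this enables, the sketch below reads a set of run_results.json files and computes the average model runtime and the test failure rate. The artifacts/ path is hypothetical; the fields (unique_id, execution_time, status) are the same ones we index later in this post.

import json
from glob import glob

# Hypothetical location of collected artifacts from past dbt invocations.
artifact_files = glob("artifacts/*/run_results.json")

model_times, test_statuses = [], []
for path in artifact_files:
    with open(path) as f:
        results = json.load(f)
    for node in results["results"]:
        # unique_id looks like "model.<project>.<name>" or "test.<project>.<name>"
        node_type = node["unique_id"].split(".")[0]
        if node_type == "model":
            model_times.append(node["execution_time"])
        elif node_type == "test":
            test_statuses.append(node["status"])

if model_times:
    print(f"Average model runtime: {sum(model_times) / len(model_times):.2f}s")
if test_statuses:
    print(f"Test failure rate: {test_statuses.count('fail') / len(test_statuses):.1%}")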
Monitoring
Processing our dbt logs with Elastic and Kibana gives us real-time insights, helps us quickly troubleshoot potential issues, and keeps our data transformation processes running smoothly. We set up anomaly detection jobs and alerts in Kibana to monitor the number of rows processed by dbt, the slot time, and the results of the tests. This lets us catch incidents in real time, and by promptly identifying and fixing these issues, Elastic helps us make our data pipelines more resilient and our models more cost-effective, staying on top of cost spikes and data quality issues.
We can also correlate this information with other events ingested into Elastic. For example, using the Elastic GitHub connector, we can correlate data quality test failures or other anomalies with code changes and find the commit or PR that caused the issue. By ingesting application logs into Elastic, we can also analyze whether issues in our pipelines have affected downstream applications, such as increased latency, reduced throughput, or higher error rates, using APM. And by ingesting billing, revenue, or web traffic data, we could also see the impact on business metrics.
How to export dbt invocation logs to Elasticsearch
We use the Python Elasticsearch client to send the dbt invocation logs to Elastic after we run our dbt commands. The Python helper function below indexes the results from your run_results.json file into Elasticsearch. It expects the following environment variables:
- RESULTS_FILE: path to your run_results.json file
- DBT_RUN_LOGS_INDEX: the name you want to give to the dbt run logs index in Elastic, e.g. dbt_run_logs
- DBT_TEST_LOGS_INDEX: the name you want to give to the dbt test logs index in Elastic, e.g. dbt_test_logs
- ES_CLUSTER_CLOUD_ID: the Cloud ID of your Elasticsearch cluster
- ES_CLUSTER_API_KEY: an API key with permission to index documents into your cluster
Then define and call the function:
from elasticsearch import Elasticsearch, helpers
import os
import sys
import json


def log_dbt_es():
    RESULTS_FILE = os.environ["RESULTS_FILE"]
    DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]
    DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]
    es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]
    es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]

    es_client = Elasticsearch(
        cloud_id=es_cluster_cloud_id,
        api_key=es_cluster_api_key,
        request_timeout=120,
    )

    if not os.path.exists(RESULTS_FILE):
        print(f"ERROR: {RESULTS_FILE} No dbt run results found.")
        sys.exit(1)

    with open(RESULTS_FILE, "r") as json_file:
        results = json.load(json_file)

    # Invocation-level fields shared by every indexed document
    timestamp = results["metadata"]["generated_at"]
    metadata = results["metadata"]
    elapsed_time = results["elapsed_time"]
    args = results["args"]

    docs = []
    for result in results["results"]:
        # Route test results and model run results to separate indexes
        if result["unique_id"].split(".")[0] == "test":
            result["_index"] = DBT_TEST_LOGS_INDEX
        else:
            result["_index"] = DBT_RUN_LOGS_INDEX
        result["@timestamp"] = timestamp
        result["metadata"] = metadata
        result["elapsed_time"] = elapsed_time
        result["args"] = args
        docs.append(result)

    _ = helpers.bulk(es_client, docs)
    return "Done"


# Call the function
log_dbt_es()
If you want to add or remove any fields from the indexed documents, you can do so in the loop above where the docs list is built, before the bulk request is sent.
Once the results are indexed, you can use Kibana to create Data Views for both indexes and start exploring them in Discover.
Go to Discover, click on the data view selector on the top left, and select “Create a data view”.
Now you can create a data view with your preferred name. Do this for both the dbt run logs index (dbt_run_logs) and the dbt test logs index (dbt_test_logs).
Going back to Discover, you’ll be able to select the Data Views and explore the data.
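If you prefer to script this step instead of clicking through the UI, Kibana also exposes a Data Views API. Below is a minimal sketch assuming Kibana 8.x, the index names used earlier, and hypothetical KIBANA_URL and KIBANA_API_KEY environment variables; adapt it to your deployment.

import os
import requests

# Hypothetical environment variables pointing at your Kibana instance.
KIBANA_URL = os.environ["KIBANA_URL"]
KIBANA_API_KEY = os.environ["KIBANA_API_KEY"]

headers = {
    "Authorization": f"ApiKey {KIBANA_API_KEY}",
    "kbn-xsrf": "true",
    "Content-Type": "application/json",
}

# Create one data view per dbt log index, using @timestamp as the time field.
for index_name in ["dbt_run_logs", "dbt_test_logs"]:
    response = requests.post(
        f"{KIBANA_URL}/api/data_views/data_view",
        headers=headers,
        json={
            "data_view": {
                "title": index_name,
                "name": index_name,
                "timeFieldName": "@timestamp",
            }
        },
    )
    response.raise_for_status()
    print(f"Created data view for {index_name}")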
dbt run alerts, dashboards and ML jobs
The invocation of dbt run produces one log entry per executed model. These dbt run logs contain, among others, the following fields:
- unique_id: Unique model identifier
- execution_time: Total time spent executing this model run
The logs also contain the following metrics about the job execution from the adapter:
- adapter_response.bytes_processed
- adapter_response.bytes_billed
- adapter_response.slot_ms
- adapter_response.rows_affected
We have used Kibana to set up Anomaly Detection jobs on the above-mentioned metrics. You can configure a multi-metric job split by unique_id so that anomalies are detected per model; a rough API equivalent of such a job is sketched below.
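The jobs are easiest to create through the Kibana ML UI, but for reference, this sketch creates a comparable job with the Python Elasticsearch client: mean slot time per model, split by unique_id, fed from the dbt run logs index. The job id, bucket span, and keyword subfields are assumptions to adapt to your own mapping.

from elasticsearch import Elasticsearch
import os

es_client = Elasticsearch(
    cloud_id=os.environ["ES_CLUSTER_CLOUD_ID"],
    api_key=os.environ["ES_CLUSTER_API_KEY"],
)

# Hypothetical job id; additional detectors for bytes_processed,
# bytes_billed, or rows_affected can be added the same way.
job_id = "dbt-run-slot-time"

es_client.ml.put_job(
    job_id=job_id,
    analysis_config={
        "bucket_span": "1d",  # our pipelines run daily
        "detectors": [
            {
                # Model the mean slot time, split per dbt model.
                "function": "mean",
                "field_name": "adapter_response.slot_ms",
                "partition_field_name": "unique_id.keyword",  # adjust to your mapping
            }
        ],
        "influencers": ["unique_id.keyword"],
    },
    data_description={"time_field": "@timestamp"},
)

# Attach a datafeed that reads from the dbt run logs index.
es_client.ml.put_datafeed(
    datafeed_id=f"datafeed-{job_id}",
    job_id=job_id,
    indices=["dbt_run_logs"],
    query={"match_all": {}},
)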
We have used the ML jobs to set up alerts that send us email or Slack messages when anomalies are detected. Alerts can be created directly from the Jobs page (Machine Learning > Anomaly Detection Jobs) by clicking on the three dots at the end of the ML job row:
We also use Kibana dashboards to visualize the anomaly detection job results and related metrics per table, to identify which tables consume most of our resources, to have visibility on their temporal evolution, and to measure aggregated metrics that can help us understand month over month changes.
dbt test alerts and dashboards
You may already be familiar with tests in dbt, but if you’re not, dbt data tests are assertions you make about your models. Using the dbt test command, dbt runs these assertions against your data and reports whether each one passes or fails.
dbt test logs contain the following fields:
- unique_id: Unique test identifier, tests contain the “test” prefix in their unique identifier
- status: result of the test, pass or fail
- execution_time: Total time spent executing this test
- failures: will be 0 if the test passes and 1 if the test fails
- message: if the test fails, the reason why it failed
The logs also contain the metrics about the job execution from the adapter.
We have set up alerts on document count (see guide) that send us an email or Slack message when there are any failed tests. The rule for the alerts is set up on the dbt test Data View that we created before, with the query filtering on failed tests (status: fail); the equivalent query is sketched below.
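The alert rule itself lives in Kibana, but this is roughly the check it performs, expressed with the Python client: count documents in the dbt test logs index with a fail status over the last day. Index and field names follow the helper script above; the keyword subfield is an assumption about your mapping.

from elasticsearch import Elasticsearch
import os

es_client = Elasticsearch(
    cloud_id=os.environ["ES_CLUSTER_CLOUD_ID"],
    api_key=os.environ["ES_CLUSTER_API_KEY"],
)

# Count dbt test failures indexed in the last 24 hours.
response = es_client.count(
    index="dbt_test_logs",
    query={
        "bool": {
            "filter": [
                {"term": {"status.keyword": "fail"}},  # adjust to your mapping
                {"range": {"@timestamp": {"gte": "now-24h"}}},
            ]
        }
    },
)
print(f"Failed dbt tests in the last 24h: {response['count']}")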
We have also built a dashboard to visualize the tests run, tests failed, and their execution time and slot time to have a historical view of the test run:
Finding Root Causes with the AI Assistant
The most effective way for us to analyze these multiple sources of information is using the AI Assistant to help us troubleshoot incidents. In our case, we got an alert about a test failure and used the AI Assistant to give us context on what happened. We then asked if there were any downstream consequences, and the AI Assistant interpreted the results of the Anomaly Detection job, which indicated a spike in slot time for one of our downstream tables and how much it had increased versus the baseline. Finally, we asked for the root cause, and the AI Assistant found and linked a PR from our GitHub changelog that matched the start of the incident and was the most probable cause.
Conclusion
As a Data Analytics team, we are responsible for guaranteeing that the tables, charts, models, reports, and dashboards we provide to stakeholders are accurate and contain the right sources of information. As teams grow, the number of models we own becomes larger and more interconnected, and it isn’t easy to guarantee that everything is running smoothly and providing accurate results. Having a monitoring system that proactively alerts us on cost spikes, anomalies in row counts, or data quality test failures is like having a trusted companion that will alert you in advance if something goes wrong and help you get to the root cause of the issue.
dbt invocation logs are a crucial source of information about the status of our data pipelines, and Elastic is the perfect tool to extract the maximum potential out of them. Use this blog post as a starting point for utilizing your dbt logs to help your team achieve greater reliability and peace of mind, allowing them to focus on more strategic tasks rather than worrying about potential data issues.