Updated November 19, 2021
Raw data comes to us as production datastore exports and product telemetry data (over 15 trillion records so far and billions generated daily). When Discord was a smaller company and data use cases were simpler, it was somewhat tenable, if not ideal, to manually compute useful datasets as needed. Today, we process petabytes of data with 30,000 vCPUs in the cloud. To be useful, the raw data must be cleaned, privatized according to our data governance policies, and then transformed into a complex schema of thousands of precomputed tables in our 30+ petabyte data warehouse (we use Google BigQuery).
As of this writing, the part of the Data Platform team responsible for ingesting raw data and making it accessible consists of eight people, and we were even fewer during the period described below. Given the team’s size relative to the rest of Discord, it was important to build a system that was self-serve and as automated as possible. This is the story of how we turned petabytes of raw data into a structured data warehouse and the system we built to maintain it, internally referred to as Derived.
Requirements and Approach
What we needed was a system for maintaining a complex Directed Acyclic Graph (DAG) of precomputed data—in our case, this meant a DAG of derived tables in our BigQuery data warehouse:
- A derived table essentially represents a data transformation that may have predecessor tables in the DAG as input dependencies: in other words, a derived table definition may be thought of as a SQL SELECT statement that references raw data or other derived tables.
- Assuming the DAG flows top to bottom, the raw data sources and lookup tables sit at the top; in the middle is a core set of reusable “golden” tables (e.g., normalized daily sign-ups across platforms); and towards the bottom are tables intended to be consumed directly in analysis, BI tools, or machine learning models.
- The DAG may contain thousands of tables, so it needs to scale.
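To make the DAG idea concrete, here is a minimal Python sketch, not Derived's implementation. Each hypothetical table maps to the set of tables its SELECT reads from. The standard library's `graphlib.TopologicalSorter` then produces an order in which every table is built after its inputs, and it detects cycles.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical derived tables, each mapped to the tables its SELECT statement reads from.
# Raw sources and lookup tables sit at the top of the DAG with no inputs.
DEPENDENCIES: dict[str, set[str]] = {
    "raw.events": set(),
    "lookup.platforms": set(),
    "core.daily_signups": {"raw.events", "lookup.platforms"},
    "analytics.signup_funnel": {"core.daily_signups"},
    "ml.signup_features": {"core.daily_signups", "raw.events"},
}


def is_acyclic(deps: dict[str, set[str]]) -> bool:
    """Return False if the table definitions contain a dependency cycle."""
    try:
        TopologicalSorter(deps).prepare()
    except CycleError:
        return False
    return True


def build_order(deps: dict[str, set[str]]) -> list[str]:
    """Order tables so that each one appears after all of its inputs."""
    return list(TopologicalSorter(deps).static_order())
```

A topological sort runs in time linear in the number of tables plus dependency edges, so the approach scales to a DAG with thousands of tables.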
Though the system would be broken down into a series of deliverable milestones, we wanted the eventual system to meet the following requirements:
- Table updates should run as soon as new data is available (but no sooner!)
- Maintain an audit trail of mutations to each table.
- Include primitives for powering data lineage and a data catalog.
- Intuitive, self-serve table modifications for stakeholder teams like engineering, data science, and machine learning.
- Ability to automatically export derived data to production datastores for use in Discord’s user-facing product.
- Simple and easy to operate in the context of Discord’s infrastructure environment.
While existing solutions such as dbt, Airflow, and Looker solve for some of the above, we ultimately decided that we wanted a more custom solution that would integrate nicely with our existing systems and give us the flexibility to extend to use cases beyond analytics.
We were already using Airflow to schedule batch jobs and to process simpler datasets, but we found the following limitations:
- Writing jobs was complicated and required people to have a deep understanding of Python, SQL, and Airflow. This violated our requirement that DAG modifications should be self-serve.
- Scheduling queries that depended on each other on different schedules was difficult to reason about (e.g., knowing when to update a table that is scheduled weekly and reads from a table that is updated monthly).
- Knowing exactly where to insert your table build into the dependency graph and understanding its impact on other tables was not straightforward (e.g., knowing when tables you own should be backfilled because predecessor datasets owned by another team were incomplete, introduced corrupt data, or had data changes).
- Writing logic for incremental data builds that append or merge data into an existing table was error-prone and led to copy-pasted code, especially when accounting for all the potential rebuild and backfill conditions.
Taking into account our requirements, observing existing pain points, and drawing some inspiration from existing solutions, we made the following design choices:
- People should only need to know SQL to define derived tables.
- People should not need to know about the specific structure of the DAG: the system should infer the DAG from the SQL itself.
- Everything should be in git for a complete history of changes and for easy lookup of current production configurations.
- The system had to integrate data processing with our existing data privatization systems and data governance policies. We take the privacy of our users seriously, and it was paramount that whatever we built enforced our strict privacy controls.
- Metadata history and current state of each table should be stored in an accessible format to build monitoring, lineage, and performance tooling.
- Data repair operations (backfills) should be easy and ensure that data is consistent across the entire data warehouse.
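Inferring the DAG from SQL can be sketched with a regular expression. The sketch assumes, hypothetically, that every table is referenced as `dataset.table` directly after `FROM` or `JOIN`. A production system would more likely use a real SQL parser. The point is only to show that authors write SQL and the dependencies fall out of it.

```python
import re

# Hypothetical convention: tables appear as `dataset.table` (optionally backtick-quoted)
# right after FROM or JOIN. CTE names have no dot, so they are not mistaken for tables.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+`?([A-Za-z_]\w*\.[A-Za-z_]\w*)`?", re.IGNORECASE)


def infer_dependencies(sql: str) -> set[str]:
    """Return the set of dataset.table names that a SELECT statement reads from."""
    return set(TABLE_REF.findall(sql))


def infer_dag(definitions: dict[str, str]) -> dict[str, set[str]]:
    """Map each table name to the inputs inferred from its SQL definition."""
    return {name: infer_dependencies(sql) for name, sql in definitions.items()}
```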
Version One: The Minimum Viable Product
For the initial deliverable, the highest priority goals were to get data transformations into git, ensure that data was consistent across the warehouse, and simplify data operations. We built the following:
- Derived tables would be defined in SQL files using Jinja templating, with each table configured in its own file and stored in git.
- The framework would build the DAG of dependencies based on the table configurations and build out the data warehouse, leveraging Airflow for scheduling of jobs, visualization, and monitoring.
- We would build a basic command-line tool to accurately rebuild and backfill tables.
- To manage the scope of the MVP, we decided to group tables by update schedule (e.g., hourly, daily, weekly, or monthly) to avoid complex dependency resolution logic. The tradeoff was that we couldn’t easily intermix tables with different update schedules.
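The MVP's schedule-grouping constraint can be expressed as a validation check. In this sketch, the `TableConfig` fields and the table names are hypothetical. It groups tables by schedule and reports any dependency that crosses groups, which is the case the MVP could not easily handle.

```python
from dataclasses import dataclass, field


@dataclass
class TableConfig:
    name: str
    schedule: str  # "hourly", "daily", "weekly", or "monthly"
    depends_on: set[str] = field(default_factory=set)


def group_by_schedule(tables: list[TableConfig]) -> dict[str, list[TableConfig]]:
    """Bucket tables by schedule; each bucket could run as its own scheduled DAG."""
    groups: dict[str, list[TableConfig]] = {}
    for table in tables:
        groups.setdefault(table.schedule, []).append(table)
    return groups


def cross_schedule_dependencies(tables: list[TableConfig]) -> list[tuple[str, str]]:
    """(table, dependency) pairs whose schedules differ, i.e. edges the MVP could not handle.

    Dependencies on tables outside this list, such as raw sources, are ignored.
    """
    by_name = {t.name: t for t in tables}
    return sorted(
        (t.name, dep)
        for t in tables
        for dep in t.depends_on
        if dep in by_name and by_name[dep].schedule != t.schedule
    )
```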
Each table specified one of three build strategies, which determined how the table is built, incremented, and backfilled:
- Replace: replace the entire table on a regular schedule.
- Append: add data incrementally to a table on a regular schedule.
- Merge: merge incoming data with existing data based on configured criteria. This strategy is primarily used for tables supporting cohort analysis, where we want to segment on user attributes such as “the first time a user used voice chat” or “the most recent time that a user joined a Discord community server.”
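The three strategies differ only in how new rows combine with what is already in the table. The in-memory Python sketch below shows that difference. Plain lists of dicts stand in for BigQuery tables, and the column names are hypothetical. It includes a cohort-style merge that keeps each user's earliest voice chat and latest server join.

```python
from typing import Callable

Row = dict[str, object]


def replace(existing: list[Row], incoming: list[Row]) -> list[Row]:
    """Replace: discard the current contents and keep only the newly computed rows."""
    return list(incoming)


def append(existing: list[Row], incoming: list[Row]) -> list[Row]:
    """Append: add the new window's rows after the existing ones."""
    return existing + incoming


def merge(
    existing: list[Row],
    incoming: list[Row],
    key: str,
    combine: Callable[[Row, Row], Row],
) -> list[Row]:
    """Merge: upsert incoming rows by key, resolving conflicts with combine(old, new)."""
    merged = {row[key]: row for row in existing}
    for row in incoming:
        k = row[key]
        merged[k] = combine(merged[k], row) if k in merged else row
    return list(merged.values())


def cohort_combine(old: Row, new: Row) -> Row:
    """Keep a user's earliest voice chat and most recent server join (ISO date strings)."""
    return {
        "user_id": old["user_id"],
        "first_voice_chat_at": min(old["first_voice_chat_at"], new["first_voice_chat_at"]),
        "last_server_join_at": max(old["last_server_join_at"], new["last_server_join_at"]),
    }
```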
Thus, Derived was born and fit into our architecture as illustrated below:
Version Two: Ergonomics
The MVP proved out the technology for constructing the DAG, building tables, and managing the data warehouse, but people internally struggled to create new derived tables without the help of Data Engineers because the process was still too complicated and obscure. So for the next iteration, we focused on creating a simple interface that lets people easily create new tables and write documentation right alongside their code.
- Introduced a YAML format so that people could focus on writing SQL and only needed to learn a few properties: how frequently the table should run, and the window (time range) of data it should run on.
- Enabled dependencies between tables with any combination of schedule, window, and strategy, so that people would not need to know about the specific structure of the DAG and its dependencies.
- One limitation we accepted for this iteration was that table metadata was still stored in Airflow, so repair operations required pausing the Airflow DAG. Additionally, syncing table state with Airflow metadata after rebuilds was complex.
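Mixing schedules works if each table waits until all of its inputs are complete through the end of the window it wants to build. The sketch below assumes a hypothetical per-table watermark, meaning the time up to which a table's data is complete. It returns every window that can run now. This captures “as soon as new data is available (but no sooner!)” for cases such as a weekly table that reads from a daily one.

```python
from datetime import datetime, timedelta


def is_ready(window_end: datetime, upstream_watermarks: dict[str, datetime]) -> bool:
    """True once every input is complete through window_end: as soon as possible, no sooner."""
    return all(watermark >= window_end for watermark in upstream_watermarks.values())


def runnable_windows(
    last_end: datetime,
    period: timedelta,
    upstream_watermarks: dict[str, datetime],
    now: datetime,
) -> list[tuple[datetime, datetime]]:
    """List the consecutive windows, starting at last_end, that can be built right now."""
    windows = []
    start = last_end
    # Never build a window that has not finished yet, even if inputs claim to be ahead.
    while start + period <= now and is_ready(start + period, upstream_watermarks):
        windows.append((start, start + period))
        start += period
    return windows
```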
When a table definition is merged into the main branch, Derived creates the table and initially populates it with all existing data. On every subsequent run, it upserts one hour of data into the existing table (a MERGE operation in BigQuery terms). Also, as you can see below, the documentation for table and column definitions lives with the table definition, minimizing the possibility of drift between table functionality and documentation.
Another benefit of adopting this standardized interface is that it provides an abstraction layer for us to rapidly iterate on the systems underlying the configuration without impacting teams.
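The hourly upsert described above can be sketched as rendering a BigQuery `MERGE` statement from a table definition. The config keys here (`name`, `strategy`, `unique_key`, `columns`, `sql`, and so on) are hypothetical and are not Derived's actual YAML schema. A dict stands in for the parsed YAML. The window bounds are passed as the BigQuery named query parameters `@window_start` and `@window_end`.

```python
# Hypothetical table definition, shaped like a parsed YAML file.
EXAMPLE_CONFIG = {
    "name": "derived.user_last_server_join",
    "description": "Most recent time each user joined a server.",
    "schedule": "hourly",
    "window": "1h",
    "strategy": "merge",
    "unique_key": "user_id",
    "columns": ["user_id", "last_server_join_at"],
    "sql": (
        "SELECT user_id, MAX(joined_at) AS last_server_join_at "
        "FROM raw.server_joins "
        "WHERE joined_at >= @window_start AND joined_at < @window_end "
        "GROUP BY user_id"
    ),
}


def render_merge(config: dict) -> str:
    """Render a BigQuery MERGE that upserts one window of rows into the target table.

    Matched rows take the new window's values, which assumes windows run in time order.
    """
    if config.get("strategy") != "merge":
        raise ValueError(f"{config['name']} does not use the merge strategy")
    key, cols = config["unique_key"], config["columns"]
    updates = ", ".join(f"{c} = S.{c}" for c in cols if c != key)
    return (
        f"MERGE `{config['name']}` T\n"
        f"USING ({config['sql']}) S\n"
        f"ON T.{key} = S.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {updates}\n"
        f"WHEN NOT MATCHED THEN INSERT ({', '.join(cols)}) "
        f"VALUES ({', '.join('S.' + c for c in cols)})"
    )
```

Because authors only edit the config, the rendering logic can change underneath them without affecting their table definitions.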
Version Three: Automation
Version Two successfully enabled our Data Science teams to create tables without assistance, and they created hundreds of tables within the first year. With this success came a new set of problems:
- Creating new tables was easy but updating tables required manual steps by Data Engineers: DAG builds/rebuilds were not automated and needed to be triggered by engineers. While these maintenance tasks were relatively simple, they took time and occurred more frequently as adoption increased.
- A single table with a SQL bug would block all tables from progressing, and bugs became more frequent as adoption increased: test suites did not comprehensively cover all combinations of generated SQL or the dependencies between tables. Worse, bugs frequently would not manifest until running in production because of incorrect assumptions about the data, and fixing them often required Data Engineers to backfill data and repair tables.
- It was difficult to use Derived insights to power application features because the data warehouse (BigQuery) is not optimized for the millisecond-level latency requirements of user-facing services.
Version Three therefore focused on improving the reliability of deployments and automating the rebuilding/repairing of Derived tables. To accomplish this, we focused on ergonomics, testing, and general automation:
We wanted people to be able to test while developing new tables, so we implemented the following:
- For local development, people are able to use a command-line interface (CLI) to load up the real table configurations and validate dependencies across the entire DAG.
- From the CLI, people can also create test versions of their tables on shadow production data to verify table output.
- Once a pull request is created, continuous integration (CI) deploys all new tables to a shadow production environment so that people are able to validate their changes again with real data before merging the pull request.
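One way a shadow deployment could keep unchanged tables reading production data while changed tables read their test copies is to rewrite table references in the SQL. This is a hypothetical sketch, not a description of Derived's CI. It assumes that references are backtick-quoted `dataset.table` names and that shadow copies live in datasets with a per-pull-request prefix.

```python
import re

# Backtick-quoted `dataset.table` references (a hypothetical convention).
TABLE_REF = re.compile(r"`([A-Za-z_]\w*\.[A-Za-z_]\w*)`")


def point_at_shadow(sql: str, changed_tables: set[str], shadow_prefix: str) -> str:
    """Redirect references to tables changed in this pull request to their shadow copies.

    Every other reference keeps reading production data.
    """
    def swap(match: re.Match) -> str:
        table = match.group(1)
        if table in changed_tables:
            return f"`{shadow_prefix}{table}`"
        return match.group(0)

    return TABLE_REF.sub(swap, sql)
```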
In Version Two of Derived, the table’s metadata was tracked in Airflow, resulting in a number of manual steps during data maintenance operations (e.g. a backfill required pausing the DAG, running the operation, and then syncing the actual state of the table with Airflow metadata).
To automate data operations we moved table state tracking out of Airflow and into a metadata log so that Derived could independently decide when to repair, rebuild, and add data to tables.
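Here is a minimal sketch of a table updater that decides what to do purely from a metadata log. The log schema (`action`, `data_through`, `finished_at`) and the three outcomes are assumptions for illustration. A table with no history is built from scratch. A table whose input was rebuilt after its own last run is rebuilt. Otherwise, the table appends only when every input has newer complete data.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class LogEntry:
    table: str
    action: str             # "rebuild" (full rebuild/repair) or "append" (new window added)
    data_through: datetime  # the table's data is complete up to this time
    finished_at: datetime   # when this operation finished


def latest(log: list[LogEntry], table: str) -> Optional[LogEntry]:
    entries = [e for e in log if e.table == table]
    return max(entries, key=lambda e: e.finished_at) if entries else None


def decide(log: list[LogEntry], table: str, upstreams: set[str]) -> str:
    """Return "rebuild", "append", or "wait" for one table, using only the metadata log."""
    mine = latest(log, table)
    if mine is None:
        return "rebuild"  # never built: create and populate from all existing data
    if any(
        e.table in upstreams and e.action == "rebuild" and e.finished_at > mine.finished_at
        for e in log
    ):
        return "rebuild"  # an input was repaired after our last run
    inputs = [latest(log, u) for u in upstreams]
    if not inputs or any(e is None for e in inputs):
        return "wait"  # no inputs to follow, or an input has never been built
    if min(e.data_through for e in inputs) > mine.data_through:
        return "append"  # every input has complete data beyond ours
    return "wait"
```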
More detailed state tracking at the table level also unlocks parallel computation. Rather than having a parent process block while it sequences and schedules 900+ tables, all tables can run concurrently and as frequently as desired, keeping derived insights consistent across the data warehouse and up to date with data sources. Each table updater is deployed as its own Kubernetes Pod; when a pod starts up, it runs through the following steps:
The metadata log is available in BigQuery and enables detailed monitoring, performance analysis, and data lineage. It answers monitoring questions such as “When was the table last updated?” and “How recent is the data in the table?” For performance analysis, we join the metadata log to BigQuery’s INFORMATION_SCHEMA views to get query execution details and to report metrics for each table. For data lineage, the predecessor dependencies are recorded each time a table is updated, so the entire lineage can be reconstructed by traversing the metadata log.
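Both the monitoring questions and lineage reduce to simple scans of the log. This sketch assumes a hypothetical log record that stores, for each update, when it finished, how far the data is complete, and which predecessor tables it read. Lineage is then a breadth-first traversal over those recorded inputs.

```python
from collections import deque
from datetime import datetime
from typing import NamedTuple, Optional


class Update(NamedTuple):
    table: str
    finished_at: datetime   # when the update finished
    data_through: datetime  # data is complete up to this time
    inputs: frozenset[str]  # predecessor tables read by this update


def upstream_lineage(log: list[Update], table: str) -> set[str]:
    """All tables that `table` depends on, directly or transitively, per the log."""
    parents: dict[str, set[str]] = {}
    for u in log:
        parents.setdefault(u.table, set()).update(u.inputs)
    seen: set[str] = set()
    queue = deque([table])
    while queue:
        for parent in parents.get(queue.popleft(), ()):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen


def freshness(log: list[Update], table: str) -> Optional[tuple[datetime, datetime]]:
    """(last updated, data complete through) for a table, or None if never updated."""
    mine = [u for u in log if u.table == table]
    if not mine:
        return None
    return max(u.finished_at for u in mine), max(u.data_through for u in mine)
```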
Powering Discord Features
Up until now, Derived operated only on BigQuery datasets. BigQuery is a data warehouse designed for big data processing, and its query response times are frequently greater than one second. Application features need much faster responses, especially machine learning features, where the application flow is: receive a user request, query multiple Derived datasets to build a feature set, make a prediction, and respond to the user, all within one second. To support this, we added a configuration option to Derived that automatically exports a dataset from BigQuery to Scylla, a database designed for high-performance queries in online systems.
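The access pattern that makes the Scylla export worthwhile is a handful of primary-key lookups per request instead of warehouse scans. In this sketch, plain dicts stand in for Scylla tables, and the dataset and column names are hypothetical. It shows rows exported and keyed by user ID, and a feature set assembled from several datasets for a single request.

```python
Row = dict[str, object]


def export_to_kv(rows: list[Row], key: str) -> dict[object, Row]:
    """Index warehouse query results by primary key (a dict stands in for a Scylla table)."""
    return {row[key]: {col: val for col, val in row.items() if col != key} for row in rows}


def build_feature_set(user_id: object, stores: dict[str, dict[object, Row]]) -> Row:
    """Assemble one user's features with a single key lookup per exported dataset."""
    features: Row = {}
    for dataset, store in stores.items():
        for column, value in store.get(user_id, {}).items():
            features[f"{dataset}.{column}"] = value
    return features
```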
We’ve been running Version Three in production for over a year now and have accomplished the seven goals we set out to achieve...
✔️ Table updates should run as soon as new data is available (but no sooner!)
✔️ Maintains an audit trail of mutations to derived datasets.
✔️ Includes primitives for powering data lineage and data catalog tooling.
✔️ Modifications to the DAG should be self-serve and intuitive for stakeholder teams like engineering, data science, and machine learning.
✔️ Aware of data access controls and provides scalable data governance policy enforcement.
✔️ Able to automatically export derived data to production datastores for use in Discord’s user-facing product.
✔️ Simple and easy to operate in the context of Discord’s environment.
... but the journey is far from over. There are now thousands of tables in production, and the team regularly receives feedback and suggestions from engaged internal users who are building very complex datasets with Derived. The system processes petabytes of data daily from trillions of data points, and we continue to improve the performance and feature set of Derived. We are working on Version Four now (we’re very creative with our project names around here) and look forward to sharing further insights on the iterations to come.
Whew! That was a lot of information and quite the adventure for the team! If working with massive data sets strikes a chord with you, we invite you to check out our jobs page and apply!