Throughlines: Tracing Data With Field-level Lineage, Part 2

High-level architecture of field-level lineage at Single Origin.

In Part 1 of Throughlines: Tracing Data With Field-level Lineage, we discussed how lineage was a natural extension of parsing SQL using our common definition model, as well as use cases for lineage. In this Part 2, we will cover more technical details about how we go from SQL to lineage.

Single Origin requires a connection to your data warehouse that we use to collect and store metadata. This metadata helps us understand what tables exist in your warehouse. Then, when you import SQL queries, a SQL Processor extracts lineage information and keeps it in a time-series lineage store. This lineage store helps us understand which tables and fields are related and how. We can generate field-level lineage by combining the metadata and the lineage store.

Metadata Store

Single Origin connects to the data warehouse to collect metadata about the state of your datasets. We only collect necessary metadata to minimize cost. Once a connector is set up, our workflow starts collecting metadata such as the table schemas, descriptions, ownership, and usage information.

The workflow collects and processes metadata from the data warehouse with concurrent workers and stores it in the Metadata Store.

To collect a large amount of metadata, we built a metadata store that is scalable and extensible. The metadata store powers features such as data discovery and authoring.

Each piece of metadata is stored as an attribute of an entity, and attributes can be continually updated or expanded upon.

ID                   | Field Name       | Field Value
<prefix>.order_items | ordinal_position | 1
<prefix>.order_items | data_type        | ARRAY
Example of how metadata attributes are stored for the order_items column.
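To make the attribute model concrete, here is a minimal Python sketch of how entity metadata could be stored as individual attributes. The class and method names are illustrative only, not our actual implementation.

from dataclasses import dataclass


@dataclass(frozen=True)
class MetadataAttribute:
    """A single piece of metadata attached to an entity (table, column, and so on)."""
    entity_id: str    # e.g. "<prefix>.order_items"
    field_name: str   # e.g. "ordinal_position", "data_type"
    field_value: str


class MetadataStore:
    """Keyed by (entity_id, field_name) so each attribute can be updated independently."""

    def __init__(self) -> None:
        self._attributes = {}

    def upsert(self, attr: MetadataAttribute) -> None:
        self._attributes[(attr.entity_id, attr.field_name)] = attr

    def attributes_of(self, entity_id: str) -> list:
        return [a for a in self._attributes.values() if a.entity_id == entity_id]


store = MetadataStore()
store.upsert(MetadataAttribute("<prefix>.order_items", "ordinal_position", "1"))
store.upsert(MetadataAttribute("<prefix>.order_items", "data_type", "ARRAY"))

Because every attribute is an independent entry, new kinds of metadata (descriptions, ownership, usage) can be added over time, which is one way to keep the store extensible.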

For a more accurate field-level lineage, we leverage column names and data types in our SQL processing. When ingesting column information into Single Origin, we map them into our standardized data types.

Standardized Data Types

Standardizing data types in the metadata store allows us to extend SQL processing to other data warehouses and manage data entities in a multi-cloud environment. The SQL Processor then only needs to interface with one metadata store rather than with every data warehouse implementation.

Standardized Data Type | BigQuery Data Type | Snowflake Data Type
DATETIME               | DATETIME           | DATETIME
BOOLEAN                | BOOL               | BOOLEAN
STRUCT                 | STRUCT             | N/A
Data types are mapped to Single Origin standardized data types.
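As a rough illustration of the idea, type standardization can be thought of as a per-warehouse lookup table. The sketch below covers only the types from the table above and uses made-up names; it is not our actual mapping.

# Per-warehouse mapping from native type names to standardized type names.
# Only the types shown above are included; a real mapping covers far more types.
STANDARDIZED_TYPES = {
    "bigquery": {"DATETIME": "DATETIME", "BOOL": "BOOLEAN", "STRUCT": "STRUCT"},
    "snowflake": {"DATETIME": "DATETIME", "BOOLEAN": "BOOLEAN"},  # no explicit STRUCT type
}


def standardize_type(warehouse: str, native_type: str) -> str:
    mapping = STANDARDIZED_TYPES[warehouse.lower()]
    if native_type.upper() not in mapping:
        raise ValueError(f"{native_type} has no standardized mapping for {warehouse}")
    return mapping[native_type.upper()]


assert standardize_type("bigquery", "BOOL") == "BOOLEAN"
assert standardize_type("snowflake", "BOOLEAN") == "BOOLEAN"

Downstream components such as the SQL Processor then reason only about the standardized names.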

Different sources may use different names for the same type, and in some cases a type supported by one source may not exist in another. For example, Snowflake offers semi-structured data support rather than a data type with explicitly defined struct fields as in BigQuery. Standardizing the data types inside our metadata store allows our downstream data systems to be agnostic to the source of the metadata.

The metadata store is the foundation for accurate field-level lineage. Before we go into SQL Processing, let’s take a look at how we store the lineage.

Lineage Store

The Lineage Store records entity relationships and how they are derived. Once a SQL query has been processed, the derived lineage is saved in the Lineage Store for operational queries. In each lineage event, we store the context of how the relationship was created.

Lineage Context

Each lineage event has additional information such as the SQL transformation, what operation was used, and the time it occurred. This allows users to quickly understand how field-level data was transformed and used over time.

Source Node          | Target Node             | Type   | Context                                   | Occurred at
order_items.order_id | user_order.count_orders | source | COUNT( DISTINCT `order_items.order_id` ) | 2022-10-04T15:18:22+00:00

Example of a lineage edge with the SQL transformation as context and the type of lineage, which we will cover in more detail in a later section.

As shown in the table, the lineage store is composed of time-series data about every SQL query that went through the SQL processor.
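Conceptually, each row in the lineage store can be modeled as a small record like the one below. This Python sketch is illustrative; the field names are hypothetical rather than our exact storage schema.

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class LineageEvent:
    source_node: str      # e.g. "order_items.order_id"
    target_node: str      # e.g. "user_order.count_orders"
    edge_type: str        # "source" or "condition"
    context: str          # SQL transformation or statement reference
    occurred_at: datetime


event = LineageEvent(
    source_node="order_items.order_id",
    target_node="user_order.count_orders",
    edge_type="source",
    context="COUNT( DISTINCT `order_items.order_id` )",
    occurred_at=datetime(2022, 10, 4, 15, 18, 22, tzinfo=timezone.utc),
)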

Time-series Lineage

Lineage is an active system that is updated continuously. Data pipelines and table schemas are frequently changed by Data Engineers and Data Scientists; lineage information that was captured a day ago may no longer be valid. To ensure we can capture these changes, we maintain a time-series system in order to reflect the most accurate lineage information.

When a new create or update query is detected, the lineage store is updated. Our API queries the last 30 days of lineage events by default, but we also provide an option to get the latest lineage information beyond the default time window.
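Continuing the LineageEvent sketch above, a read against the time-series store conceptually filters events to a time window and keeps the most recent event per edge. The helper below is a simplified, in-memory stand-in for our actual API.

from datetime import datetime, timedelta, timezone


def latest_lineage(events, window_days=30):
    """Return the most recent lineage event per (source, target) edge within the window."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=window_days)
    latest = {}
    for event in sorted(events, key=lambda e: e.occurred_at):
        if event.occurred_at >= cutoff:
            latest[(event.source_node, event.target_node)] = event
    return latest


# Usage (with events loaded from the store):
#   recent_edges = latest_lineage(events)                # default 30-day window
#   all_edges = latest_lineage(events, window_days=365)  # beyond the default window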

Lineage data powers various workflows in Single Origin, from detecting changes to rendering the interactive graph on the UI. We have integrated field-level lineage into the core of Single Origin by detecting changes that can impact downstream consumers.

Before the lineage events can be ingested into the time-series lineage store, each SQL must be processed to derive the field-level lineage.

Turning SQL into Lineage

The above architecture graph demonstrates the three steps required to extract a lineage graph from a given SQL query.

  1. Parsing, validation, and transformation
  2. Decomposing and extracting
  3. Lineage generation

Parsing, Validation, and Transformation

Single Origin parses SQL into an AST (abstract syntax tree) using Calcite, then validates the AST against the standardized schema in the metadata store. During the validation process, the scope of each syntax node is derived and verified, ensuring a higher level of accuracy than parsing alone. When two or more nodes have the same name, the validator attaches the correct context to each node. Additionally, if it is unclear which dataset a column comes from, the validator can derive and extend the node's source prefix. With the correct context and source prefix, the underlying source for any node can be easily identified.

Parsing, Validation, and Transformation Flow

For example, the following SQL statement has two sub-queries with the name users, but they expose different columns. For the outermost query, Calcite knows that users points to the WITH clause item users, which exposes user_id and user_age. The outermost query's selection items created and user_age don't have a prefix to indicate where they come from, but Calcite extends them to orders.created and users.user_age during the validation process.

CREATE TABLE
 `db.schema.user_order` AS (
 WITH
   users AS (
   SELECT
     id AS user_id,
     age AS user_age
   FROM (
     SELECT
       *
     FROM
       `db.schema.users` ) AS users ),
   orders AS (
   SELECT
     user_id,
     created_at AS created,
     order_id AS id
   FROM
     `db.schema.order_items`
   WHERE
     status = 'Returned'
     AND created_at >= '2022-01-01' )
 SELECT
   created,
   user_age,
   COUNT(DISTINCT id) AS count_orders
 FROM
   users
 JOIN
   orders
 ON
   users.user_id = orders.user_id
 GROUP BY
   1,
   2 )

For brevity, we replace the concrete project and dataset name with db.schema and skip the prefix in the examples.
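In our system this resolution is handled by Calcite's validator; the toy Python sketch below only illustrates the idea of extending an unqualified column with its source prefix based on which columns each scope item exposes. The SCOPE mapping and qualify function are invented for illustration, not how Calcite works internally.

# Columns exposed by each item in the outermost query's scope (from the WITH clause above).
SCOPE = {
    "users": {"user_id", "user_age"},
    "orders": {"user_id", "created", "id"},
}


def qualify(column):
    """Attach the single source that exposes the column, or fail if it is ambiguous."""
    candidates = [source for source, columns in SCOPE.items() if column in columns]
    if len(candidates) != 1:
        raise ValueError(f"column {column!r} is missing or ambiguous: {candidates}")
    return f"{candidates[0]}.{column}"


assert qualify("user_age") == "users.user_age"
assert qualify("created") == "orders.created"
# qualify("user_id") would raise: both sources expose it, so the query must spell out the prefix.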

Parsing, validation, and transformation are essential parts of lineage generation. Parsing the SQL statement into an AST allows for efficient traversal to extract the desired information. Validation ensures each AST node has the correct meaning (where it's from, what it's used for, and the like); without it, nodes with the same name and node type become difficult to distinguish, which leads to inaccurate lineage where edges point to the wrong node or cannot be determined at all. Lastly, transformation rewrites the original AST into a canonical AST, eliminating the effect of alias naming and nested query structures and basing the lineage on the underlying physical datasets.

After parsing and validation, we leverage relational operator properties to perform this transformation, producing a canonical-form AST in which all expressions reference the underlying physical tables directly. For the above query, the outermost transformed query is shown in the following statement.

CREATE TABLE
 `db.schema.user_order` AS (
SELECT
 `db.schema.order_items`.created_at AS created_at,
 `db.schema.users`.age AS user_age,
 COUNT( DISTINCT `db.schema.order_items`.order_id ) AS count_orders
FROM
 `db.schema.users`
JOIN
 `db.schema.order_items`
ON
 `db.schema.users`.id = `db.schema.order_items`.user_id
WHERE
 `db.schema.order_items`.status = 'Returned'
 AND `db.schema.order_items`.created_at >= '2022-01-01'
GROUP BY
 `db.schema.order_items`.created_at,
 `db.schema.users`.age)

Decomposing and Extracting

During the decomposition process, the SQL Processor breaks the transformed SQL AST into its source, selection items, where clause, and group by. All of these decomposed items are used later for lineage generation. For our example query, decomposition generates the following items:

Target Table | `db.schema.user_order`
From         | `db.schema.users` JOIN `db.schema.order_items` ON `db.schema.users`.id = `db.schema.order_items`.user_id
Selection    | `db.schema.order_items`.created_at, `db.schema.users`.age, COUNT( DISTINCT `db.schema.order_items`.order_id )
Where        | `db.schema.order_items`.status = 'Returned' AND `db.schema.order_items`.created_at >= '2022-01-01'
Group By     | `db.schema.order_items`.created_at, `db.schema.users`.age
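One way to picture the output of decomposition is a record holding each clause of the canonical AST. The sketch below mirrors the table above using plain strings; our implementation keeps AST nodes rather than strings, and the class name is hypothetical.

from dataclasses import dataclass, field


@dataclass
class DecomposedQuery:
    target_table: str
    from_clause: str
    selection: list = field(default_factory=list)
    where: str = ""
    group_by: list = field(default_factory=list)


decomposed = DecomposedQuery(
    target_table="`db.schema.user_order`",
    from_clause="`db.schema.users` JOIN `db.schema.order_items` "
                "ON `db.schema.users`.id = `db.schema.order_items`.user_id",
    selection=[
        "`db.schema.order_items`.created_at",
        "`db.schema.users`.age",
        "COUNT( DISTINCT `db.schema.order_items`.order_id )",
    ],
    where="`db.schema.order_items`.status = 'Returned' "
          "AND `db.schema.order_items`.created_at >= '2022-01-01'",
    group_by=["`db.schema.order_items`.created_at", "`db.schema.users`.age"],
)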

Lineage Generation

Given the above generated items, we first derive the schema of the target table based on the validated AST together with the standardized type information from the metadata store.

Column Name  | Type
created_at   | TIMESTAMP
user_age     | INT
count_orders | INT64

Lineage Generation Flow


After deriving the schema, the SQL Processor processes all the selection items and generates lineage edges for the corresponding fields in the target table.

Source Node            | Target Node             | Type   | Context
order_items.created_at | user_order.created_at   | source | `order_items.created_at`
users.age              | user_order.user_age     | source | `users.age`
order_items.order_id   | user_order.count_orders | source | COUNT( DISTINCT `order_items.order_id` )

Finally, since the where clause affects the data in the target table, we process the where clause to derive the condition lineage. The condition lineage links each destination field with the fields referenced in the where clause, so data operators can quickly recover the original SQL statement and understand how it affects the value of the destination field. The following table demonstrates the condition lineage for the destination field created_at; the same lineage edges are generated for the other two target nodes as well.

Source Node            | Target Node           | Type      | Context
order_items.created_at | user_order.created_at | condition | SQL statement reference
order_items.status     | user_order.created_at | condition | SQL statement reference
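Putting the two steps together, a simplified edge generator walks each selection item to emit source edges, then fans the where-clause columns out to every target field as condition edges. The sketch below works on pre-extracted expression strings rather than a real AST, and the function names are illustrative.

import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    source_node: str
    target_node: str
    edge_type: str   # "source" or "condition"
    context: str


# Crude "table.column" matcher, good enough for this sketch.
COLUMN_REF = re.compile(r"`?(\w+)`?\.(\w+)")


def column_refs(expression):
    return [f"{table}.{column}" for table, column in COLUMN_REF.findall(expression)]


def generate_edges(target_table, selection, where):
    """selection maps each target column to the source expression that produces it."""
    edges = []
    for target_column, expression in selection.items():
        target = f"{target_table}.{target_column}"
        for source in column_refs(expression):
            edges.append(Edge(source, target, "source", expression))
        # Every column referenced in the where clause conditions every target field.
        for source in column_refs(where):
            edges.append(Edge(source, target, "condition", "SQL statement reference"))
    return edges


edges = generate_edges(
    "user_order",
    {
        "created_at": "`order_items`.created_at",
        "user_age": "`users`.age",
        "count_orders": "COUNT( DISTINCT `order_items`.order_id )",
    },
    "`order_items`.status = 'Returned' AND `order_items`.created_at >= '2022-01-01'",
)

Running this on the example reproduces the source and condition edges listed in the tables above.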

Entity Lineage

In Part 1, we talked about breaking SQL down into data entities, views, and dimensions and demonstrated the relationships between those entities. These relationships are derived by the same process described above. If we import the example query, it generates the following view, data entity, and dimensions.

Entity Type | Name         | Expression
view        | orders       | SELECT * FROM `db.schema.users` AS u JOIN `db.schema.order_items` AS oi ON u.id = oi.user_id
data entity | count_orders | COUNT(DISTINCT order_id)
dimension   | user_age     | age
dimension   | status       | status
dimension   | time         | created_at

During the importing process, we generate view fields for the view to represent its schema. Since the view above selects all columns from the two joined tables, there is a view field for each column; we list only the relevant view field lineages used by the data entity and dimensions. For readability, the following table lists the relevant names directly, while our system stores their IDs in the lineage store.

Source Node            | Target Node     | Type   | Context
order_items.order_id   | orders.order_id | source | `order_items.order_id`
order_items.created_at | orders.time     | source | `order_items.created_at`
users.age              | orders.user_age | source | `users.age`
order_items.status     | orders.status   | source | `order_items.status`
orders.order_id        | count_orders    | source | COUNT(DISTINCT order_id)

As shown above, the solid arrow represents the source lineage from the source field to the target field, while the dashed arrow implies that all dimensions from the view are available to the consumer when they consume the data entity with filters and group bys.

Lineage in Action: Detect Breaking Changes

Now, let's look at two quick examples of lineage in action, where it can be used to detect changes that may negatively impact downstream consumers.

Changes in Single Origin

Users are able to modify views, dimensions, and data entities in Single Origin, which can impact downstream consumers like reports, dashboards, or ML models. To prevent breaking changes and avoid the massive headaches that come with them, Single Origin leverages field-level lineage graphs to detect and monitor breaking changes automatically.

Changes in your Data Warehouse

When a dataset schema is changed (when a column is dropped, for example, or when a data type is changed) Single Origin can detect this and automatically check the updated field's lineage to determine if there is a downstream impact. Without this foresight, there can be unexpected consequences like incomplete or incorrect metrics for models and business consumers.
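Under the hood, impact detection is essentially a downstream traversal of the lineage graph starting from the changed field. Here is a minimal sketch of that walk; the edge list and the dashboard name are made up for illustration, not taken from a real warehouse.

from collections import defaultdict, deque


def downstream_impact(edges, changed_field):
    """Breadth-first walk over source -> target lineage edges from a changed field."""
    children = defaultdict(list)
    for source, target in edges:
        children[source].append(target)

    impacted, queue = set(), deque([changed_field])
    while queue:
        node = queue.popleft()
        for target in children[node]:
            if target not in impacted:
                impacted.add(target)
                queue.append(target)
    return impacted


# Dropping order_items.order_id would break count_orders and anything built on top of it.
edges = [
    ("order_items.order_id", "user_order.count_orders"),
    ("user_order.count_orders", "orders_dashboard.count_orders"),  # hypothetical consumer
]
print(downstream_impact(edges, "order_items.order_id"))
# {'user_order.count_orders', 'orders_dashboard.count_orders'}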

What’s next

We're excited to announce field-level lineage in Single Origin and look forward to developing the feature further to power new solutions for your organization. We can see a few potential use cases, including:

Policy Propagation

A policy applied to one entity can be propagated downstream to ensure policies between different entity types are inherited and consistent.

Automated Data Quality Debugging

Pinpoint exact upstream issues automatically when downstream usage has quality issues.

There is much more we can do with field-level lineage. Check back soon for future updates. As always, feel free to reach out with any feedback or questions!

Engineering @ Single Origin