rramos.github.io

08 Jun, 2024 - About 4 minutes

OpenLineage

Intro

In this article I will go through the OpenLineage specification.

API

The OpenLineage specification is formalized as a JSON Schema, OpenLineage.json. An OpenAPI spec, OpenLineage.yml, is also provided for HTTP-based implementations.

Documentation

Documentation is available at: https://openlineage.github.io

Model

OpenLineage Model Diagram

  • Run Event: an event describing an observed state of a job run. Sending at least a START event and a COMPLETE/FAIL/ABORT event is required. Additional events are optional.

  • Job: a process definition that consumes and produces datasets (defined as its inputs and outputs). It is identified by a unique name within a namespace.

  • Dataset: an abstract representation of data. It has a unique name within the datasource namespace, derived from its physical location (for example, db.host.database.schema.table).

  • Run: An instance of a running job with a start and completion (or failure) time. A run is identified by a globally unique ID relative to its job definition. A run ID must be a UUID.

  • Facet: A piece of metadata attached to one of the entities defined above.

Example

Here is an example of a simple START run event that does not add any facet information:

{
  "eventType": "START",
  "eventTime": "2020-12-09T23:37:31.081Z",
  "run": {
    "runId": "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
  },
  "job": {
    "namespace": "my-scheduler-namespace",
    "name": "myjob.mytask"
  },
  "inputs": [
    {
      "namespace": "my-datasource-namespace",
      "name": "instance.schema.table"
    }
  ],
  "outputs": [
    {
      "namespace": "my-datasource-namespace",
      "name": "instance.schema.output_table"
    }
  ],
  "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
  "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/RunEvent"
}
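
To make the structure of the event above easier to play with, here is a minimal sketch that builds the same kind of event with the Python standard library only. The helper name build_run_event and its parameters are my own invention for illustration, not part of any OpenLineage client; the producer and schemaURL values are simply copied from the example.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(event_type, run_id, job_namespace, job_name,
                    inputs=(), outputs=()):
    """Return a minimal run event as a plain dict (hypothetical helper)."""
    return {
        "eventType": event_type,
        # ISO 8601 timestamp in UTC (rendered with a +00:00 offset)
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        # Each dataset is given as a (namespace, name) pair
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        # Values copied from the example event in this article
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/RunEvent",
    }


run_id = str(uuid.uuid4())  # a run ID must be a UUID
start_event = build_run_event(
    "START", run_id, "my-scheduler-namespace", "myjob.mytask",
    inputs=[("my-datasource-namespace", "instance.schema.table")],
    outputs=[("my-datasource-namespace", "instance.schema.output_table")],
)
print(json.dumps(start_event, indent=2))
```

Going through json.dumps also guarantees strictly valid JSON, which is easy to get wrong when writing events by hand.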

Lifecycle

The OpenLineage API defines events to capture the lifecycle of a Run for a given Job. When a job is being run, we capture metadata by sending run events when the state of the job transitions to a different state. We might observe different aspects of the job run at different stages. This means that different metadata might be collected in each event during the lifecycle of a run. All metadata is additive.
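
As a rough illustration of this lifecycle, the sketch below folds the events received for one run into a single view: it checks that a START and a terminal event were seen, and it accumulates the run facets carried by each event. The function summarize_run and the shape of its result are assumptions of mine, and the facets are trimmed down to a couple of fields for brevity.

```python
TERMINAL_EVENT_TYPES = {"COMPLETE", "FAIL", "ABORT"}


def summarize_run(events):
    """Summarize the events observed for a single run (hypothetical helper)."""
    seen_types = [event["eventType"] for event in events]
    run_facets = {}
    for event in events:
        # Metadata is additive: facets from later events are added
        # to the ones collected from earlier events.
        run_facets.update(event["run"].get("facets", {}))
    return {
        "started": "START" in seen_types,
        "finished": any(t in TERMINAL_EVENT_TYPES for t in seen_types),
        "runFacets": run_facets,
    }


run_id = "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
events = [
    {"eventType": "START",
     "run": {"runId": run_id,
             "facets": {"nominalTime": {"nominalStartTime": "2020-12-09T23:00:00Z"}}}},
    {"eventType": "FAIL",
     "run": {"runId": run_id,
             "facets": {"errorMessage": {"message": "boom",
                                         "programmingLanguage": "Python"}}}},
]
summary = summarize_run(events)
print(summary["started"], summary["finished"], sorted(summary["runFacets"]))
```

Here the START event carries the nominal time and the FAIL event carries the error message, and both end up attached to the same run.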

Facets

Facets are pieces of metadata that can be attached to the core entities:

  • Run
  • Job
  • Dataset (Inputs or Outputs)

A facet is an atomic piece of metadata identified by its name. This means that emitting a new facet with the same name for the same entity replaces the previous facet instance for that entity entirely.
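
The replacement rule can be sketched with plain dictionaries keyed by facet name. This is only my reading of the behaviour described above, not code from an OpenLineage backend; apply_facets is a hypothetical helper and the facet contents are simplified.

```python
def apply_facets(current, incoming):
    """Return a new facet map; same-name facets are replaced as a whole."""
    merged = dict(current)
    merged.update(incoming)  # whole-facet replacement, not a deep merge
    return merged


job_facets = {
    "sql": {"query": "SELECT 1"},
    "ownership": {"owners": [{"name": "team-a"}]},
}
update = {"ownership": {"owners": [{"name": "team-b"}]}}

result = apply_facets(job_facets, update)
print(result["ownership"])
```

The sql facet is untouched, while the ownership facet now contains only team-b: the old owners list is not merged with the new one.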

Standard Facets

Run Facets

  • nominalTime: Captures the time this run is scheduled for. This is a typical usage for a time-based scheduled job. The job has a nominal schedule time that will be different from the actual time at which it is running.

  • parent: Captures the parent job and run when the run has been spawned from a parent run. For example, in the case of Airflow, there is often a run for a DAG that then spawns runs for individual tasks that refer to the parent run as the DAG run. Similarly, when a SparkOperator starts a Spark job, this creates a separate run that refers to the task run as its parent.

  • errorMessage: Captures the error message, the programming language and, optionally, the stack trace when a run fails.

Job Facets

  • sourceCodeLocation: Captures the source code location and version (e.g., git sha) of the job.

  • sourceCode: Captures the language (e.g., Python) and actual source code of the job.

  • sql: Captures the SQL query if the job is a SQL query.

  • ownership: Captures the owners of the job.

Dataset Facets

  • schema: Captures the schema of the dataset.

  • dataSource: Captures the database instance containing the dataset (e.g., database schema, object store bucket, etc.).

  • lifecycleStateChange: Captures the lifecycle states of the dataset (alter, create, drop, overwrite, rename, truncate, etc.).

  • version: Captures the dataset version when versioning is defined by the database (e.g., Iceberg snapshot ID).

  • columnLineage: Captures the column-level lineage.

  • ownership: Captures the owners of the dataset.

Input Dataset Facets

  • dataQualityMetrics: Captures dataset-level and column-level data quality metrics when scanning a dataset with a DataQuality library (row count, byte size, null count, distinct count, average, min, max, quantiles).

  • dataQualityAssertions: Captures the result of running data tests on a dataset or its columns.

Output Dataset Facets

  • outputStatistics: Captures the size of the output written to a dataset (row count and byte size).

Custom Facets

Custom facet names should follow the pattern {prefix}{name}{entity}Facet, written in PascalCase.
The prefix must be a distinct identifier, named after the project that defines the facets, to avoid collisions with the standard facets defined in the OpenLineage.json spec.
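
To make the naming pattern concrete, here is a small sketch that assembles such a name from its three parts. Both helper functions and the sample values (my_project, cluster info, run) are made up for illustration.

```python
import re


def pascal_case(text):
    """Convert 'my project' or 'my_project' to 'MyProject'."""
    words = re.split(r"[^0-9A-Za-z]+", text)
    # Upper-case the first letter of each word, keep the rest as written
    return "".join(word[:1].upper() + word[1:] for word in words if word)


def custom_facet_name(prefix, name, entity):
    """Build a {prefix}{name}{entity}Facet name in PascalCase."""
    return f"{pascal_case(prefix)}{pascal_case(name)}{pascal_case(entity)}Facet"


print(custom_facet_name("my_project", "cluster info", "run"))
# prints "MyProjectClusterInfoRunFacet"
```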

Test

Several tools now comply with this specification.
If you want to test it with Marquez, follow the previous article.
