
23 Dec, 2022 - About 5 minutes

SODA

Intro

Bernard Denys kindly shared with me the availability of their product for the open-source community on GitHub. SODA provides a data monitoring platform. In this article I will take some time to explore a quick setup, show how to use it, and share some final comments.

What does Soda SQL do?

Soda SQL allows you to:

  • Stop your pipeline when bad data is detected
  • Extract metrics and column profiles through super efficient SQL
  • Take full control over metrics and queries through declarative config files

5-minute tutorial

There is a 5-minute tutorial available on their official page, so let's have a go…

Install

I'm using Linux and I'm lazy, so I just installed the required pip package:

pip3 install soda-sql
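
If you prefer to keep dependencies isolated rather than installing globally, a virtual environment works just as well (a minimal sketch, assuming Python 3 with the venv module available):

python3 -m venv soda-venv        # create an isolated environment
. soda-venv/bin/activate         # activate it in the current shell
pip install soda-sql            # install Soda SQL inside the venv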

Let’s check if it is working

$ soda 
Usage: soda [OPTIONS] COMMAND [ARGS]...

Soda CLI version 2.0.0b15

Options:
  --help  Show this message and exit.

Commands:
  analyze  Analyzes tables in the warehouse and creates scan YAML files...
  create   Creates a new warehouse.yml file and prepares credentials in
           your...
  init     Renamed to `soda analyze`
  scan     Computes all measurements and runs all tests on one table.

Great, now let's create a dummy warehouse and do some tests:

Create a dummy data warehouse

docker run --name soda_sql_tutorial_db --rm -d \
-p 5432:5432 \
-v soda_sql_tutorial_postgres:/var/lib/postgresql/data:rw \
-e POSTGRES_USER=sodasql \
-e POSTGRES_DB=sodasql \
-e POSTGRES_HOST_AUTH_METHOD=trust \
postgres:9.6.17-alpine
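
Before loading data, it is worth confirming the container is accepting connections; pg_isready ships with the Postgres image:

docker exec soda_sql_tutorial_db pg_isready -U sodasql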

And load it with data using the following command:

docker exec soda_sql_tutorial_db \
sh -c "wget -qO - https://raw.githubusercontent.com/sodadata/soda-sql/main/tests/demo/demodata.sql | psql -U sodasql -d sodasql"

According to the tutorial, one can remove the created container and volume later with the following commands:

docker stop soda_sql_tutorial_db
docker volume rm soda_sql_tutorial_postgres

Create warehouse directory

mkdir soda_sql_tutorial
cd soda_sql_tutorial/
soda create -d sodasql -u sodasql -w soda_sql_tutorial postgres

This creates the warehouse.yml file with the connection settings; the credentials themselves are stored in ~/.soda/env_vars.yml in your home directory.
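
For reference, the generated warehouse.yml should look roughly like this (a sketch based on the flags passed above; the env_var() references are resolved from ~/.soda/env_vars.yml):

name: soda_sql_tutorial
connection:
  type: postgres
  host: localhost
  username: env_var(POSTGRES_USERNAME)
  password: env_var(POSTGRES_PASSWORD)
  database: sodasql
  schema: public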

Analyse tables into scan YAML files

The following command will analyse the existing tables and fill the ./tables/ directory with scan YAML files; for a large data warehouse these files can also be written manually.

Well, I only have 5 minutes, so let's give it a go.

soda analyze

Hum! Interesting: it queries the information schema and generates a file per table (I assume) with several metrics for validation.

table_name: demodata
metrics:
  - row_count
  - missing_count
  - missing_percentage
  - values_count
  - values_percentage
  - valid_count
  - valid_percentage
  - invalid_count
  - invalid_percentage
  - min_length
  - max_length
  - avg_length
  - min
  - max
  - avg
  - sum
  - variance
  - stddev
tests:
  - row_count > 0
columns:
  id:
    valid_format: uuid
    tests:
      - invalid_percentage == 0
  feepct:
    valid_format: number_percentage
    tests:
      - invalid_percentage == 0
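
These generated defaults can be edited freely before scanning; for instance, one could tighten the table-level tests or add a missing-values check on another column. A hypothetical extension (the country column is invented for illustration):

tests:
  - row_count > 0
  - row_count < 1000000
columns:
  country:                       # hypothetical column, adjust to your table
    tests:
      - missing_percentage < 5   # fail if more than 5% of values are missing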

Run a scan

Each scan requires a warehouse YAML and a scan YAML as input. The scan command will collect the configured metrics and run the defined tests against them.

soda scan warehouse.yml tables/demodata.yml
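
The scan should exit with a non-zero status when tests fail, which is what enables the "stop your pipeline when bad data is detected" use case; in a shell-based pipeline you could gate on it like this (a minimal sketch):

soda scan warehouse.yml tables/demodata.yml || {
  echo "Soda scan found bad data, aborting pipeline" >&2
  exit 1
}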

And that's it for the tutorial.

One can then integrate this with an orchestration tool such as Airflow.

It is recommended to use the PythonVirtualenvOperator to prevent any dependency conflicts.

Example DAG:

from airflow import DAG
from airflow.models.variable import Variable
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


# Make sure that this variable is set in your Airflow
warehouse_yml = Variable.get('soda_sql_warehouse_yml_path')
scan_yml = Variable.get('soda_sql_scan_yml_path')

default_args = {
    'owner': 'soda_sql',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def run_soda_scan(warehouse_yml_file, scan_yml_file):
    from sodasql.scan.scan_builder import ScanBuilder
    scan_builder = ScanBuilder()
    # Optionally you can directly build the warehouse dict from Airflow secrets/variables
    # and set scan_builder.warehouse_dict with those values.
    scan_builder.warehouse_yml_file = warehouse_yml_file
    scan_builder.scan_yml_file = scan_yml_file
    scan = scan_builder.build()
    scan_result = scan.execute()
    if scan_result.has_failures():
        failures = scan_result.failures_count()
        raise ValueError(f"Soda Scan found {failures} errors in your data!")


dag = DAG(
    'soda_sql_python_venv_op',
    default_args=default_args,
    description='A simple Soda SQL scan DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

ingest_data_op = DummyOperator(
    task_id='ingest_data',
    dag=dag,
)

soda_sql_scan_op = PythonVirtualenvOperator(
    task_id='soda_sql_scan_demodata',
    python_callable=run_soda_scan,
    requirements=["soda-sql==2.0.0b10"],
    system_site_packages=False,
    op_kwargs={'warehouse_yml_file': warehouse_yml,
               'scan_yml_file': scan_yml},
    dag=dag,
)

publish_data_op = DummyOperator(
    task_id='publish_data',
    dag=dag,
)

ingest_data_op >> soda_sql_scan_op >> publish_data_op
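
The DAG reads the two YAML paths from Airflow Variables, so set them before enabling it; with the Airflow 2 CLI that looks like the following (the paths are placeholders to adjust to your deployment):

airflow variables set soda_sql_warehouse_yml_path /path/to/warehouse.yml
airflow variables set soda_sql_scan_yml_path /path/to/tables/demodata.yml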

There is also some work being done to support a SodaScanOperator for Airflow.

Warehouse types

At the moment SODA supports the following technologies:

  • Snowflake
  • AWS Athena
  • GCP BigQuery
  • PostgreSQL
  • Redshift
  • Spark SQL (coming soon)

Conclusion

This quick tutorial makes it easy to test the tool, but I would like to extend the tests to a larger solution to check how it behaves. I would also like to see Spark SQL as a warehouse option, as most of my teams' work is centered on that technology.

Topics to do extended testing with:

  • Schema evolution impact on the quality test setup
  • Using a data lake instead of a warehouse solution
  • Providing schemas from a data lake and using a processing engine on top of it (e.g. spark.schemas and Parquet files)
  • Testing with Delta tables
  • Defining levels of data-quality analysis
  • Understanding better how the scans work

The tool seems to have good potential and looks very simple, which should speed up adoption. Sharing it with the open-source community also seems a good choice to grow the user base while still offering an enterprise services solution.
