Skip to content

Possibly the fastest DataFrame-agnostic quality check library in town.

License

Notifications You must be signed in to change notification settings

StuffbyYuki/cuallee

 
 

Repository files navigation

cuallee

PyPI version ci codecov License status Code style: black

Meaning good in Aztec (Nahuatl), pronounced: QUAL-E

This library provides an intuitive API to describe checks initially just for PySpark dataframes v3.3.0. And extended to pandas, snowpark, duckdb, and more. It is a replacement written in pure python of the pydeequ framework.

I gave up in deequ as after extensive use, the API is not user-friendly, the Python Callback servers produce additional costs in our compute clusters, and the lack of support to the newest version of PySpark.

As result cuallee was born

This implementation goes in hand with the latest API from PySpark and uses the Observation API to collect metrics at the lower cost of computation. When benchmarking against pydeequ, cuallee uses circa <3k java classes underneath and remarkably less memory.

Support

cuallee is the data quality framework truly dataframe agnostic.

Provider API Versions
snowflake snowpark 1.11.1, 1.4.0
databricks pyspark & spark-connect 3.5.x, 3.4.0, 3.3.x, 3.2.x
bigquery bigquery 3.4.1
pandas pandas 2.0.2, 1.5.x, 1.4.x
duckdb duckdb 0.9.2,0.8.0, 0.7.1
polars polars 0.19.6

Logos are trademarks of their own brands.

Install

pip install cuallee

Checks

The most common checks for data integrity validations are completeness and uniqueness an example of this dimensions shown below:

from cuallee import Check, CheckLevel # WARN:0, ERR: 1

# Nulls on column Id
check = Check(CheckLevel.WARNING, "Completeness")
(
    check
    .is_complete("id")
    .is_unique("id")
    .validate(df)
).show() # Returns a pyspark.sql.DataFrame

Dates

Perhaps one of the most useful features of cuallee is its extensive number of checks for Date and Timestamp values. Including, validation of ranges, set operations like inclusion, or even a verification that confirms continuity on dates using the is_daily check function.

# Unique values on id
check = Check(CheckLevel.WARNING, "CheckIsBetweenDates")
df = spark.sql(
    """
    SELECT 
        explode(
            sequence(
                to_date('2022-01-01'), 
                to_date('2022-01-10'), 
                interval 1 day)) as date
    """)
assert (
    check.is_between("date", "2022-01-01", "2022-01-10")
    .validate(df)
    .first()
    .status == "PASS"
)

Membership

Other common test is the validation of list of values as part of the multiple integrity checks required for better quality data.

df = spark.createDataFrame([[1, 10], [2, 15], [3, 17]], ["ID", "value"])
check = Check(CheckLevel.WARNING, "is_contained_in_number_test")
check.is_contained_in("value", (10, 15, 20, 25)).validate(df)

Regular Expressions

When it comes to the flexibility of matching, regular expressions are always to the rescue. cuallee makes use of the regular expressions to validate that fields of type String conform to specific patterns.

df = spark.createDataFrame([[1, "is_blue"], [2, "has_hat"], [3, "is_smart"]], ["ID", "desc"])
check = Check(CheckLevel.WARNING, "has_pattern_test")
check.has_pattern("desc", r"^is.*t$") # only match is_smart 33% of rows.
check.validate(df).first().status == "FAIL"

Anomalies

Statistical tests are a great aid for verifying anomalies on data. Here an example that shows that will PASS only when 40% of data is inside the interquartile range

df = spark.range(10)
check = Check(CheckLevel.WARNING, "IQR_Test")
check.is_inside_interquartile_range("id", pct=0.4)
check.validate(df).first().status == "PASS"

+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|id |timestamp          |check|level  |column|rule                         |value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|1  |2022-10-19 00:09:39|IQR  |WARNING|id    |is_inside_interquartile_range|10000|10  |4         |0.6      |0.4           |PASS  |
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+

Workflows (Process Mining)

Besides the common citizen-like checks, cuallee offers out-of-the-box real-life checks. For example, suppose that you are working SalesForce or SAP environment. Very likely your business processes will be driven by a lifecycle:

  • Order-To-Cash
  • Request-To-Pay
  • Inventory-Logistics-Delivery
  • Others. In this scenario, cuallee offers the ability that the sequence of events registered over time, are according to a sequence of events, like the example below:
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel

data = pd.DataFrame({
   "name":["herminio", "herminio", "virginie", "virginie"], 
   "event":["new","active", "new", "active"], 
   "date": ["2022-01-01", "2022-01-02", "2022-01-03", "2022-02-04"]}
   )
df = spark.createDataFrame(data).withColumn("date", F.to_date("date"))

# Cuallee Process Mining
# Testing that all edges on workflows
check = Check(CheckLevel.WARNING, "WorkflowViolations")

# Validate that 50% of data goes from new => active
check.has_workflow("name", "event", "date", [("new", "active")], pct=0.5)
check.validate(df).show(truncate=False)

+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|id |timestamp          |check             |level  |column                   |rule        |value               |rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|1  |2022-11-07 23:08:50|WorkflowViolations|WARNING|('name', 'event', 'date')|has_workflow|(('new', 'active'),)|4   |2.0       |0.5      |0.5           |PASS  |
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+

Controls

[2023-12-28]New feature! to simplify the entire validation of a dataframe in a particular dimension.

import pandas as pd
from cuallee import Control
df = pd.DataFrame({"X":[1,2,3], "Y": [10,20,30]})
# Checks all columns in dataframe for using is_complete check
Control.completeness(df)

cuallee VS pydeequ

In the test folder there are docker containers with the requirements to match the tests. Also a perftest.py available at the root folder for interests.

# 1000 rules / # of seconds

cuallee: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 162.00
pydeequ: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 322.00

Catalogue

Check Description DataType
is_complete Zero nulls agnostic
is_unique Zero duplicates agnostic
is_primary_key Zero duplicates agnostic
are_complete Zero nulls on group of columns agnostic
are_unique Composite primary key check agnostic
is_composite_key Zero duplicates on multiple columns agnostic
is_greater_than col > x numeric
is_positive col > 0 numeric
is_negative col < 0 numeric
is_greater_or_equal_than col >= x numeric
is_less_than col < x numeric
is_less_or_equal_than col <= x numeric
is_equal_than col == x numeric
is_contained_in col in [a, b, c, ...] agnostic
is_in Alias of is_contained_in agnostic
not_contained_in col not in [a, b, c, ...] agnostic
not_in Alias of not_contained_in agnostic
is_between a <= col <= b numeric, date
has_pattern Matching a pattern defined as a regex string
is_legit String not null & not empty ^\S$ string
has_min min(col) == x numeric
has_max max(col) == x numeric
has_std σ(col) == x numeric
has_mean μ(col) == x numeric
has_sum Σ(col) == x numeric
has_percentile %(col) == x numeric
has_cardinality count(distinct(col)) == x agnostic
has_infogain count(distinct(col)) > 1 agnostic
has_max_by A utilitary predicate for max(col_a) == x for max(col_b) agnostic
has_min_by A utilitary predicate for min(col_a) == x for min(col_b) agnostic
has_correlation Finds correlation between 0..1 on corr(col_a, col_b) numeric
has_entropy Calculates the entropy of a column entropy(col) == x for classification problems numeric
is_inside_interquartile_range Verifies column values reside inside limits of interquartile range Q1 <= col <= Q3 used on anomalies. numeric
is_in_millions col >= 1e6 numeric
is_in_billions col >= 1e9 numeric
is_t_minus_1 For date fields confirms 1 day ago t-1 date
is_t_minus_2 For date fields confirms 2 days ago t-2 date
is_t_minus_3 For date fields confirms 3 days ago t-3 date
is_t_minus_n For date fields confirms n days ago t-n date
is_today For date fields confirms day is current date t-0 date
is_yesterday For date fields confirms 1 day ago t-1 date
is_on_weekday For date fields confirms day is between Mon-Fri date
is_on_weekend For date fields confirms day is between Sat-Sun date
is_on_monday For date fields confirms day is Mon date
is_on_tuesday For date fields confirms day is Tue date
is_on_wednesday For date fields confirms day is Wed date
is_on_thursday For date fields confirms day is Thu date
is_on_friday For date fields confirms day is Fri date
is_on_saturday For date fields confirms day is Sat date
is_on_sunday For date fields confirms day is Sun date
is_on_schedule For date fields confirms time windows i.e. 9:00 - 17:00 timestamp
is_daily Can verify daily continuity on date fields by default. [2,3,4,5,6] which represents Mon-Fri in PySpark. However new schedules can be used for custom date continuity date
has_workflow Adjacency matrix validation on 3-column graph, based on group, event, order columns. agnostic
satisfies An open SQL expression builder to construct custom checks agnostic
validate The ultimate transformation of a check with a dataframe input for validation agnostic

Controls pyspark

Check Description DataType
completeness Zero nulls agnostic
information Zero nulls and cardinality > 1 agnostic
intelligence Zero nulls, zero empty strings and cardinality > 1 agnostic
percentage_fill % rows not empty agnostic
percentage_empty % rows empty agnostic

ISO Standard

A new module has been incorporated in cuallee==0.4.0 which allows the verification of International Standard Organization columns in data frames. Simply access the check.iso interface to add the set of checks as shown below.

Check Description DataType
iso_4217 currency compliant ccy string
iso_3166 country compliant country string
df = spark.createDataFrame([[1, "USD"], [2, "MXN"], [3, "CAD"], [4, "EUR"], [5, "CHF"]], ["id", "ccy"])
check = Check(CheckLevel.WARNING, "ISO Compliant")
check.iso.iso_4217("ccy")
check.validate(df).show()
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
| id|          timestamp|        check|  level|column|           rule|               value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
|  1|2023-05-14 18:28:02|ISO Compliant|WARNING|   ccy|is_contained_in|{'BHD', 'CRC', 'M...|   5|       0.0|      1.0|           1.0|  PASS|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+

Snowflake Connection

In order to establish a connection to your SnowFlake account cuallee relies in the following environment variables to be avaialble in your environment:

  • SF_ACCOUNT
  • SF_USER
  • SF_PASSWORD
  • SF_ROLE
  • SF_WAREHOUSE
  • SF_DATABASE
  • SF_SCHEMA

Spark Connect

Just add the environment variable SPARK_REMOTE to your remote session, then cuallee will connect using

spark_connect = SparkSession.builder.remote(os.getenv("SPARK_REMOTE")).getOrCreate()

and convert all checks to select as opposed to Observation API compute instructions.

Databricks Connection

By default cuallee will search for a SparkSession available in the globals so there is literally no need to SparkSession.builder. When working in a local environment it will automatically search for an available session, or start one.

DuckDB

For testing on duckdb simply pass your table name to your check et voilà

import duckdb
conn = duckdb.connect(":memory:")
check = Check(CheckLevel.WARNING, "DuckDB", table_name="temp/taxi/*.parquet")
check.is_complete("VendorID")
check.is_complete("tpep_pickup_datetime")
check.validate(conn)

   id            timestamp check    level                column         rule value      rows  violations  pass_rate  pass_threshold status
0   1  2022-10-31 23:15:06  test  WARNING              VendorID  is_complete   N/A  19817583         0.0        1.0             1.0   PASS
1   2  2022-10-31 23:15:06  test  WARNING  tpep_pickup_datetime  is_complete   N/A  19817583         0.0        1.0             1.0   PASS

Roadmap

100% data frame agnostic implementation of data quality checks. Define once, run everywhere

  • [x] PySpark 3.5.0
  • [x] PySpark 3.4.0
  • [x] PySpark 3.3.0
  • [x] PySpark 3.2.x
  • [x] Snowpark DataFrame
  • [x] Pandas DataFrame
  • [x] DuckDB Tables
  • [x] BigQuery Client
  • [x] Polars DataFrame
  • [*] Dagster Integration
  • [x] Spark Connect
  • [-] PDF Report
  • Metadata check
  • Help us in a discussion?

Whilst expanding the functionality feels a bit as an overkill because you most likely can connect spark via its drivers to whatever DBMS of your choice. In the desire to make it even more user-friendly we are aiming to make cuallee portable to all the providers above.

Authors

License

Apache License 2.0 Free for commercial use, modification, distribution, patent use, private use. Just preserve the copyright and license.

Made with ❤️ in Utrecht 🇳🇱
Maintained over ⌛ from Ljubljana 🇸🇮

About

Possibly the fastest DataFrame-agnostic quality check library in town.

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 98.1%
  • TeX 1.3%
  • Other 0.6%