Note: You can run this from your computer (Jupyter or terminal), or use one of the hosted options: Binder or Deepnote.

Pipeline testing

Testing your pipeline is critical to ensure your data expectations hold. When you perform a data transformation, you expect the output to have certain properties (e.g. no nulls in a certain column). Without testing, these expectations won’t be verified, and errors will propagate to all downstream tasks.

These are the most common sources of errors when transforming data:

  1. A join operation generates duplicated entries because of a wrong assumption of a one-to-one relationship (which is really one-to-many) in the source tables

  2. A function that aggregates data returns NULL because at least one of the input data points was NULL

  3. Dirty data points are used in the analysis (e.g. in a column age, you forgot to remove corrupted data points with negative values)

Some of these errors are easy to spot (2), but it might take you some time to find out about others (1 and 3), or worse, you will never notice these errors and just use incorrect data in your analysis. Even if your code is correct and all your expectations hold true today, they might not hold in the future if the data changes, and it’s important for you to know this as soon as it happens.
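To make the three error sources concrete, here is a minimal sketch using pandas and NumPy. The users and orders tables, their columns, and all values are made up for illustration; they are not part of the tutorial’s data.

```python
import numpy as np
import pandas as pd

# (1) a join that silently duplicates rows: we assumed one order per user,
# but user 1 has two orders (one-to-many)
users = pd.DataFrame({'user_id': [1, 2], 'age': [30, 45]})
orders = pd.DataFrame({'user_id': [1, 1, 2], 'amount': [10, 20, 5]})
joined = users.merge(orders, on='user_id')
n_users, n_joined = len(users), len(joined)

# (2) an aggregation that returns a missing value because one input is missing
scores = np.array([7.0, np.nan, 9.0])
mean_score = scores.mean()

# (3) dirty data points that should have been removed
ages = pd.Series([21, -42, 80])
n_negative = int((ages < 0).sum())

print(n_users, n_joined, mean_score, n_negative)
```

The joined table has more rows than users, which is the kind of silent change a test on row counts catches. For the join case specifically, pandas’ merge also accepts validate='one_to_one', which raises an error when the assumption does not hold.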

To make testing effective, your tests should run every time you run your tasks. Ploomber has a mechanism to automate this.

Sample data

This example loads data from a single table called my_table, which has two columns:

  1. age: ranges from 21 to 80 but there are some corrupted records with -42

  2. score: ranges from 0 to 10 but there are some corrupted records with missing values
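The tutorial’s setup script is not shown here, so the following is only a sketch of how a table with these properties could be generated. It assumes an in-memory SQLite database, 100 rows, and an arbitrary corruption pattern; none of these details come from the actual setup script.

```python
import random
import sqlite3

random.seed(0)  # make the sample reproducible

# in-memory database standing in for the tutorial's sample database
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE my_table (age INTEGER, score REAL)')

rows = []
for i in range(100):
    age = random.randint(21, 80)
    score = random.uniform(0, 10)
    # corrupt a few records (arbitrary pattern chosen for this sketch):
    # every 10th age becomes -42, every 15th score becomes missing
    if i % 10 == 0:
        age = -42
    if i % 15 == 0:
        score = None
    rows.append((age, score))

conn.executemany('INSERT INTO my_table VALUES (?, ?)', rows)

n_bad_age = conn.execute(
    'SELECT COUNT(*) FROM my_table WHERE age < 0').fetchone()[0]
n_missing_score = conn.execute(
    'SELECT COUNT(*) FROM my_table WHERE score IS NULL').fetchone()[0]
print(n_bad_age, n_missing_score)
```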

Let’s take a look at our example pipeline.yaml:

[1]:
from pathlib import Path
from ploomberutils import display_file
[2]:
display_file('pipeline.yaml')
clients:
  SQLScript: db.get_client
  SQLDump: db.get_client

tasks:
  - source: clean.sql
    name: clean
    product: ['my_clean_table', 'table']
    on_finish: integration_tests.test_sql_clean

  - source: dump.sql
    name: dump
    class: SQLDump
    product: output/my_clean_table.csv
    chunksize: null

  - source: transform.py
    product:
        nb: output/transformed.html
        data: output/transformed.csv
    on_finish: integration_tests.test_py_transform

The pipeline has three tasks: one to clean the raw table, another to dump the clean data to a CSV file, and finally, a Python task to transform the data. We included a SQL and a Python task to show how you can test both types of tasks, but we recommend doing as much analysis as you can in SQL because it scales much better than Python code (you won’t have to deal with memory errors).

The configuration is straightforward; the only new key is on_finish (in the first and third tasks). This is known as a hook. Task hooks allow you to embed custom logic when certain events happen. on_finish runs after a task executes successfully. The value is a dotted path, which tells Ploomber where to find your testing function. Under the hood, Ploomber imports your function and calls it after the task is executed. Here’s some roughly equivalent code:

from integration_tests import test_sql_clean

# your task is executed...

# ploomber calls your testing function, passing the arguments
# it declares (here, client and product)...
test_sql_clean(client=client, product=product)

Before diving into the testing source code, let’s see the rest of the tasks.

clean.sql just filters out the rows we don’t want to include in the analysis:

[3]:
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0

dump.sql just selects all rows from the clean table to dump it to the CSV file:

[4]:
display_file('dump.sql')
SELECT * FROM {{upstream['clean']}}
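The double-brace placeholders in both scripts are filled in before the SQL runs. As a rough sketch of what the rendered queries look like, the snippet below renders them with the jinja2 package (assumed to be installed) and runs them against an in-memory SQLite database. The placeholder values and the three sample rows are assumptions made for this illustration, not Ploomber internals.

```python
import sqlite3

from jinja2 import Template

clean_sql = """
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0
"""
dump_sql = "SELECT * FROM {{upstream['clean']}}"

# hypothetical values; in the pipeline they are derived from pipeline.yaml
rendered_clean = Template(clean_sql).render(product='my_clean_table')
rendered_dump = Template(dump_sql).render(
    upstream={'clean': 'my_clean_table'})

# tiny raw table: one valid row, one negative age, one missing score
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE my_table (age INTEGER, score REAL)')
conn.executemany('INSERT INTO my_table VALUES (?, ?)',
                 [(30, 5.0), (-42, 7.5), (55, None)])

conn.executescript(rendered_clean)  # runs both statements
clean_rows = conn.execute(rendered_dump).fetchall()
print(rendered_dump)
print(clean_rows)
```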

Finally, the transform.py script generates a new column using score:

[5]:
display_file('transform.py')
import pandas as pd

# + tags=["parameters"]
upstream = ['dump']
product = None

# +
df = pd.read_csv(upstream['dump'])
df['multiplied_score'] = df.score * 42

# +
df.to_csv(product['data'])

Let’s now take a look at our tests:

[6]:
display_file('integration_tests.py')
import pandas as pd
from ploomber.testing.sql import nulls_in_columns, range_in_column


def test_sql_clean(client, product):
    """Tests for clean.sql
    """
    assert not nulls_in_columns(client, ['score', 'age'], product)
    min_age, max_age = range_in_column(client, 'age', product)
    assert min_age > 0


def test_py_transform(product):
    """Tests for transform.py
    """
    df = pd.read_csv(str(product['data']))
    assert not df.multiplied_score.isna().sum()
    assert df.multiplied_score.min() >= 0

Testing Python scripts

To test your Python scripts, you have to know which file to look at. You can do so by adding product as an argument to your function. If your Python script generates more than one product (as in our case), product will be a dictionary-like object, which is why we use product['data']. This returns a Product object; to get the path to the file, use the str function.

>>> product # dictionary-like object: maps names to Product objects
>>> product['data'] # Product object
>>> str(product['data']) # path to the data file

Testing SQL scripts

To test SQL scripts, you also need the client to send queries to the appropriate database. To get it, add client as an argument to your testing function.

The ploomber.testing.sql module implements convenient functions to test your tables. They always take client as their first argument, so just pass the client variable directly. Since our SQL script generates only one product, you can pass the product object directly to the testing function (otherwise, pass product[key] with the appropriate key).

Note: If you’re implementing your own SQL testing logic, str(product) returns a {schema}.{name} string; you can also use product.schema and product.name.
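If you do write your own SQL checks, they usually boil down to a query that counts offending rows. The following sketch uses a plain sqlite3 connection as a stand-in for the client and a table name as a stand-in for str(product). count_nulls and count_duplicates are hypothetical helpers, not the implementation of the functions in ploomber.testing.sql.

```python
import sqlite3


def count_nulls(conn, columns, table):
    """Count rows where any of the given columns is NULL."""
    condition = ' OR '.join(f'{column} IS NULL' for column in columns)
    query = f'SELECT COUNT(*) FROM {table} WHERE {condition}'
    return conn.execute(query).fetchone()[0]


def count_duplicates(conn, column, table):
    """Count values of a column that appear more than once."""
    query = f"""
    SELECT COUNT(*) FROM (
        SELECT {column} FROM {table}
        GROUP BY {column} HAVING COUNT(*) > 1
    )
    """
    return conn.execute(query).fetchone()[0]


# made-up table with one missing score and one duplicated id
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE my_clean_table (id INTEGER, age INTEGER, score REAL)')
conn.executemany('INSERT INTO my_clean_table VALUES (?, ?, ?)',
                 [(1, 30, 5.0), (2, 41, None), (2, 41, 8.0)])

table = 'my_clean_table'  # in a real hook this would come from str(product)
n_nulls = count_nulls(conn, ['score', 'age'], table)
n_duplicated_ids = count_duplicates(conn, 'id', table)
print(n_nulls, n_duplicated_ids)
```

The duplicate count addresses the first error source listed at the top of this guide: a join that produces more rows than expected.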

Running the pipeline

Before we run the pipeline, we generate a sample database:

[7]:
%%sh
cd setup
python script.py

Let’s now run our pipeline:

[8]:
%%sh
ploomber build
name       Ran?      Elapsed (s)    Percentage
---------  ------  -------------  ------------
clean      True         0.01109      0.762923
dump       True         0.000934     0.0642534
transform  True         1.4416      99.1728
Building task 'transform':   0%|          | 0/3 [00:00<?, ?it/s]
Executing:   0%|          | 0/5 [00:00<?, ?cell/s]
Executing: 100%|██████████| 5/5 [00:01<00:00,  3.84cell/s]
Building task 'transform': 100%|██████████| 3/3 [00:01<00:00,  2.05it/s]

Everything looks good.

Let’s now imagine a colleague found an error in the cleaning logic and has re-written the script. However, he was unaware that both columns in the raw table had corrupted data and forgot to include the filtering conditions.

The script now looks like this:

[9]:
path = Path('clean.sql')
new_code = path.read_text().replace('WHERE score is not null AND age > 0', '')
path.write_text(new_code)
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table

Let’s see what happens if we run the pipeline:

[10]:
%%capture captured
%%sh --no-raise-error
ploomber build
[11]:
print(captured.stderr)
Building task 'clean': 100%|██████████| 3/3 [00:00<00:00, 201.94it/s]
Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/cli/io.py", line 20, in wrapper
    fn(**kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/cli/build.py", line 51, in main
    report = dag.build(force=args.force, debug=args.debug)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/dag/dag.py", line 470, in build
    report = callable_()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/dag/dag.py", line 569, in _build
    raise build_exception
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/dag/dag.py", line 501, in _build
    task_reports = self._executor(dag=self,
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 138, in __call__
    raise DAGBuildError(str(exceptions_all))
ploomber.exceptions.DAGBuildError:
=============================== DAG build failed ===============================
--------- SQLScript: clean -> SQLRelation(('my_clean_table', 'table')) ---------
- /home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/projects-master/testing/clean.sql -
Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/tasks/abc.py", line 591, in _build
    self._post_run_actions()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/tasks/abc.py", line 342, in _post_run_actions
    self._run_on_finish()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/tasks/abc.py", line 333, in _run_on_finish
    self.on_finish(**kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/util/dotted_path.py", line 54, in __call__
    return self._callable(*args, **kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/projects-master/testing/integration_tests.py", line 8, in test_sql_clean
    assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 186, in catch_exceptions
    fn()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 159, in __call__
    return self.fn(**self.kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 166, in catch_warnings
    result = fn()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 159, in __call__
    return self.fn(**self.kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/executors/serial.py", line 235, in build_in_subprocess
    report, meta = task._build(**build_kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/src/ploomber/tasks/abc.py", line 603, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Exception when running on_finish for task "clean":
=============================== Summary (1 task) ===============================
SQLScript: clean -> SQLRelation(('my_clean_table', 'table'))
=============================== DAG build failed ===============================



Ploomber displays a structured error message to help you understand why your pipeline failed. The last few lines are a summary:

=============================== Summary (1 task) ===============================
SQLScript: clean -> SQLRelation(('my_clean_table', 'table'))
=============================== DAG build failed ===============================

By looking at the summary we know our pipeline failed because one task crashed (clean). If we scroll up we’ll see a header section:

--------- SQLScript: clean -> SQLRelation(('my_clean_table', 'table')) ---------
-------------- /Users/Edu/dev/projects-ploomber/testing/clean.sql --------------

Each task displays its traceback in a separate section. Since only one task failed in our example, we see only one task traceback.

At the end of this task traceback, we see the following line:

Exception when running on_finish for task "clean":

Now we know that the on_finish hook crashed. If we go up a few lines:

assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError

That tells us the exact test that failed! Pipelines can get very large; it helps a lot to have a structured error message that tells us what failed and where it happened. Our takeaway from the error message is: “the pipeline building process failed because the on_finish hook in the clean task raised an exception in a specific assertion”. That’s much better than either “the pipeline failed” or “this line raised an exception”.
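The traceback above ends with a bare AssertionError because the assertion carries no message. One way to make failures easier to read is to attach a message to each assert. The sketch below shows the idea on a made-up DataFrame; check_scores is a hypothetical function, not part of the tutorial’s tests.

```python
import pandas as pd


def check_scores(df):
    """Hypothetical check with descriptive assertion messages"""
    n_nulls = int(df.score.isna().sum())
    assert n_nulls == 0, f'score has {n_nulls} missing value(s)'
    min_age = df.age.min()
    assert min_age > 0, f'age has non-positive values (min: {min_age})'


# made-up data with one missing score and one corrupted age
df = pd.DataFrame({'age': [25, -42, 60], 'score': [3.0, None, 9.5]})

try:
    check_scores(df)
except AssertionError as e:
    # only the first failing assertion is reported
    message = str(e)

print(message)
```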

Let’s fix our pipeline and add the WHERE clause again:

[12]:
path = Path('clean.sql')
new_code = path.read_text() + 'WHERE score is not null AND age > 0'
path.write_text(new_code)
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0
[13]:
%%sh
ploomber build
name       Ran?      Elapsed (s)    Percentage
---------  ------  -------------  ------------
clean      True         0.012128     0.789936
dump       True         0.000837     0.0545165
transform  True         1.52235     99.1555
Building task 'transform':   0%|          | 0/3 [00:00<?, ?it/s]
Executing:   0%|          | 0/5 [00:00<?, ?cell/s]
Executing:  20%|██        | 1/5 [00:01<00:04,  1.05s/cell]
Executing: 100%|██████████| 5/5 [00:01<00:00,  3.62cell/s]
Building task 'transform': 100%|██████████| 3/3 [00:01<00:00,  1.94it/s]

All good! Pipeline is running without issues again!

Test-driven development (TDD)

Writing data tests is essential for developing robust pipelines. Coding tests is simple: all we have to do is write in code what we already have in mind when thinking about what the outcome of a script should be.

This thought process happens before we write the actual code, which means we could easily write tests even before we write the actual code. This approach is called Test-driven development (TDD).

Following this framework has an added benefit: since we force ourselves to put our data expectations in concrete terms, it becomes easier to think about how to get there.

Furthermore, tests also serve as documentation for us (and for others). By looking at our tests, anyone can see what our intent is. Then by looking at the code, it will be easier to spot mismatches between our intent and our implementation.

Pro tip: debugging and developing tests interactively

Even though tests are usually just a few short statements, writing them in an interactive way can help you quickly prototype your assertions. One simple trick you can use to do this is to start an interactive session and load the client and product variables.

Let’s imagine you want to write a test for a new SQL script (the same applies to other types of scripts). You add a testing function, but it’s currently empty:

def my_sql_testing_function(client, product):
    pass

If you run this, Ploomber will still call your function. You can start an interactive session when this happens:

def my_sql_testing_function(client, product):
    from IPython import embed; embed()

Once you call ploomber build, wait for the Python prompt to show and verify you have the client and product variables:

>>> print(client)
>>> print(product)