
Pipeline testing

Testing your pipeline is critical to ensure your data expectations hold. When you perform a data transformation, you expect the output to have certain properties (e.g., no nulls in certain columns). Without testing, these expectations go unverified, and errors propagate to all downstream tasks.

These are the most common sources of errors when transforming data:

  1. A join operation generates duplicated entries because of a wrong assumption of a one-to-one relationship (which is really a one-to-many) in the source tables

  2. A function that aggregates data returns NULL because at least one of the input data points was NULL

  3. Dirty data points are used in the analysis (e.g., in an age column, you forgot to remove corrupted records with negative values)

Some of these errors are easy to spot (2), but it might take you some time to find out about others (1 and 3), or worse, you will never notice them and will use incorrect data in your analysis (the sketch below illustrates case 1). And even if your code is correct and all your expectations hold today, they might not hold in the future if the data changes, so it’s important for you to know as soon as that happens.
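To make the first case concrete, here is a minimal sketch (the tables and column names are made up for illustration) of how an unnoticed one-to-many relationship silently duplicates rows in a join:

import pandas as pd

# we assume one row per customer...
customers = pd.DataFrame({'customer_id': [1, 2], 'plan': ['free', 'pro']})

# ...but payments holds one row per payment, not one per customer
payments = pd.DataFrame({'customer_id': [1, 1, 2],
                         'amount': [10.0, 15.0, 20.0]})

joined = customers.merge(payments, on='customer_id')

# customer 1 now appears twice, so any per-customer aggregate computed
# downstream (counts, sums) is silently inflated
assert len(joined) == 3  # not 2, as the one-to-one assumption implied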

To make testing effective, your tests should run every time you run your tasks. Ploomber has a mechanism to automate this.

Sample data

This example loads data from a single table called my_table, which has two columns:

  1. age: ranges from 21 to 80 but there are some corrupted records with -42

  2. score: ranges from 0 to 10 but there are some corrupted records with missing values

Let’s take a look at our example pipeline.yaml:

[1]:
from pathlib import Path
from ploomberutils import display_file
[2]:
display_file('pipeline.yaml')
meta:
  extract_product: False

clients:
  SQLScript: db.get_client
  SQLDump: db.get_client

tasks:
  - source: clean.sql
    name: clean
    product: ['my_clean_table', 'table']
    on_finish: integration_tests.test_sql_clean

  - source: dump.sql
    name: dump
    class: SQLDump
    product: output/my_clean_table.csv
    chunksize: null

  - source: transform.py
    product:
        nb: output/transformed.html
        data: output/transformed.csv
    on_finish: integration_tests.test_py_transform

The pipeline has three tasks: one to clean the raw table, another to dump the clean data to a CSV file, and finally, a Python task to transform the data. We included both a SQL and a Python task to show how you can test both types, but we recommend doing as much of the analysis as you can in SQL because it scales much better than Python code (you won’t have to deal with memory errors).

The configuration is straightforward; the only new key is on_finish (inside the first and third task). This is known as a hook. Task hooks allow you to embed custom logic when certain events happen. on_finish is executed after a task successfully executes. The value is a dotted path, which tells Ploomber where to find your testing function. Under the hood, Ploomber imports your function and calls it after the task is executed. Here’s some equivalent code:

from integration_tests import test_sql_clean

# your task is executed...

# ...then Ploomber calls your testing function, passing the arguments
# its signature requests (client and product in this case)
test_sql_clean(client, product)

Before diving into the testing source code, let’s see the rest of the tasks.

clean.sql just filters out records we don’t want to include in the analysis:

[3]:
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0

dump.sql just selects all rows from the clean table to dump them to the CSV file:

[4]:
display_file('dump.sql')
SELECT * FROM {{upstream['clean']}}

Finally, the transform.py script generates a new column using score:

[5]:
display_file('transform.py')
import pandas as pd

# + tags=["parameters"]
upstream = ['dump']
product = None

# +
df = pd.read_csv(upstream['dump'])
df['multiplied_score'] = df.score * 42

# +
df.to_csv(product['data'])

Let’s now take a look at our tests:

[6]:
display_file('integration_tests.py')
import pandas as pd
from ploomber.testing.sql import nulls_in_columns, range_in_column


def test_sql_clean(client, product):
    """Tests for clean.sql
    """
    assert not nulls_in_columns(client, ['score', 'age'], product)
    min_age, max_age = range_in_column(client, 'age', product)
    assert min_age > 0


def test_py_transform(product):
    """Tests for transform.py
    """
    df = pd.read_csv(str(product['data']))
    assert not df.multiplied_score.isna().sum()
    assert df.multiplied_score.min() >= 0

Testing Python scripts

To test your Python scripts, you have to know which file to look at. You can do so by simply adding product as an argument to your function. If your Python script generates more than one product (like in our case), product will be a dictionary-like object; that’s why we are using product['data']. This returns a Product object; to get the path to the file, simply use the str function.

>>> product # dictionary-like object: maps names to Product objects
>>> product['data'] # Product object
>>> str(product['data']) # path to the data file
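Putting these pieces together, a minimal testing function for a script with multiple products could look like this sketch (the data key matches our pipeline.yaml, but the total column is hypothetical, only for illustration):

import pandas as pd

def test_py_report(product):
    # product['data'] is a Product object; str() turns it into the file path
    df = pd.read_csv(str(product['data']))
    # hypothetical expectation: the total column never goes negative
    assert (df.total >= 0).all()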

Testing SQL scripts

To test SQL scripts, you also need the client to send queries to the appropriate database. To do so, just add client to your testing function.

The ploomber.testing.sql module implements convenient functions to test your tables. They always take client as their first argument; just pass the client variable directly. Since our SQL script generates a single product, you can pass the product object directly to the testing function (otherwise, pass product[key] with the appropriate key).

Note: If you’re implementing your own SQL testing logic, str(product) will return a {schema}.{name} string; you can also use product.schema and product.name.
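For instance, a custom check can combine the built-in helpers with str(product); the expectations below are hypothetical, just to show the pattern:

from ploomber.testing.sql import nulls_in_columns, range_in_column

def test_sql_custom(client, product):
    # str(product) resolves to the "{schema}.{name}" of the generated table
    print('Testing table:', str(product))

    # hypothetical expectations: no missing scores, all scores within 0-10
    assert not nulls_in_columns(client, ['score'], product)
    min_score, max_score = range_in_column(client, 'score', product)
    assert min_score >= 0 and max_score <= 10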

Running the pipeline

Let’s now run our pipeline:

[7]:
%%sh
ploomber build
name          Ran?      Elapsed (s)    Percentage
------------  ------  -------------  ------------
clean         True         0.136039      8.35476
dump          True         0.002318      0.142359
transform.py  True         1.48992      91.5029
100%|██████████| 3/3 [00:00<00:00, 9907.80it/s]
Building task "transform.py":  33%|███▎      | 1/3 [00:00<00:00,  7.07it/s]
Executing:   0%|          | 0/5 [00:00<?, ?cell/s]
Executing: 100%|██████████| 5/5 [00:01<00:00,  3.87cell/s]
Building task "transform.py": 100%|██████████| 3/3 [00:01<00:00,  1.83it/s]

Everything looks good.

Let’s now imagine a colleague found an error in the cleaning logic and re-wrote the script. However, they were unaware that both columns in the raw table had corrupted data and forgot to include the filtering conditions.

The script now looks like this:

[8]:
path = Path('clean.sql')
new_code = path.read_text().replace('WHERE score is not null AND age > 0', '')
path.write_text(new_code)
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table

Let’s see what happens if we run the pipeline (don’t be intimidated by the long traceback, we’ll explain it in a bit):

[9]:
%%capture captured
%%sh --no-raise-error
ploomber build
[10]:
print(captured.stderr)
100%|██████████| 3/3 [00:00<00:00, 9954.84it/s]
Building task "clean": 100%|██████████| 3/3 [00:00<00:00, 10.55it/s]
Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/dag/DAG.py", line 442, in _build
    task_reports = self._executor(dag=self,
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 134, in __call__
    raise DAGBuildError('DAG build failed, the following '
ploomber.exceptions.DAGBuildError: DAG build failed, the following tasks crashed (corresponding downstream tasks aborted execution):
* SQLScript: clean -> SQLRelation(my_clean_table)
Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/tasks/Task.py", line 290, in _run_on_finish
    self.on_finish(**kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/checkouts/stable/doc/projects-tmp/projects-master/testing/integration_tests.py", line 8, in test_sql_clean
    assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/tasks/Task.py", line 483, in _build
    self._run_on_finish()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/tasks/Task.py", line 298, in _run_on_finish
    raise type(e)(msg) from e
AssertionError: Exception when running on_finish for task "clean":

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 180, in catch_exceptions
    fn()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 159, in __call__
    return self.fn(**self.kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 164, in catch_warnings
    result = fn()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 159, in __call__
    return self.fn(**self.kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/executors/serial.py", line 227, in build_in_subprocess
    report, meta = task._build(**build_kwargs)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/tasks/Task.py", line 495, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Exception when running on_finish for task "clean": Exception when running on_finish for task "clean":


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/bin/ploomber", line 8, in <module>
    sys.exit(cmd_router())
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/cli/cli.py", line 173, in cmd_router
    fn()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/cli/build.py", line 36, in main
    report = dag.build(force=args.force, debug=args.debug)
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/dag/DAG.py", line 414, in build
    report = callable_()
  File "/home/docs/checkouts/readthedocs.org/user_builds/ploomber/conda/stable/lib/python3.8/site-packages/ploomber/dag/DAG.py", line 513, in _build
    raise DAGBuildError(
ploomber.exceptions.DAGBuildError: Failed to build DAG DAG("No name")

Ploomber error messages are designed to give you enough context, so you can fix things quickly.

The last line says that our pipeline failed to build:

ploomber.exceptions.DAGBuildError: Failed to build DAG

That’s a very general error message, but it tells us at which stage our pipeline failed (building is not the only one). If you go up a few lines, you’ll see this:

ploomber.exceptions.TaskBuildError: Exception when running on_finish for task "clean"

That’s a bit more specific. It’s pointing us to the on_finish hook in the clean task. Go up a few more lines:

assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError

That tells us the exact test that failed! While such long error messages might seem too verbose, they help a lot in understanding why the pipeline failed. Our takeaway from the error message is: “the pipeline building process failed because the on_finish hook in the clean task raised an exception in this line”. That’s much better than either “the pipeline failed” or “this line raised an exception”.

Let’s fix our pipeline and add the WHERE clause again:

[11]:
path = Path('clean.sql')
new_code = path.read_text() + 'WHERE score is not null AND age > 0'
path.write_text(new_code)
display_file('clean.sql')
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0
[12]:
%%sh
ploomber build
name          Ran?      Elapsed (s)    Percentage
------------  ------  -------------  ------------
clean         True         2.78918     65.3999
dump          True         0.002547     0.0597213
transform.py  True         1.47308     34.5404
100%|██████████| 3/3 [00:00<00:00, 12421.43it/s]
Building task "transform.py":  33%|███▎      | 1/3 [00:02<00:05,  2.80s/it]
Executing:   0%|          | 0/5 [00:00<?, ?cell/s]
Executing: 100%|██████████| 5/5 [00:01<00:00,  3.91cell/s]
Building task "transform.py": 100%|██████████| 3/3 [00:04<00:00,  1.43s/it]

All good! The pipeline is running without issues again!

Test-driven development (TDD)

Writing data tests is essential for developing robust pipelines. Coding tests is simple: all we have to do is express in code the expectations we already have in mind about what a script’s output should look like.

This thought process happens before we write the actual code, which means we could easily write our tests first. This approach is called Test-driven development (TDD).

Following this framework has an added benefit: since we force ourselves to state our data expectations in concrete terms, it becomes easier to think about how to get there.

Furthermore, tests also serve as documentation for us (and for others). By looking at our tests, anyone can see what our intent is. Then by looking at the code, it will be easier to spot mismatches between our intent and our implementation.
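For example, before writing a hypothetical aggregate.sql task that computes an avg_score per customer, we could pin down our expectations first (all names below are made up):

from ploomber.testing.sql import nulls_in_columns, range_in_column

# written *before* aggregate.sql exists: these assertions define what
# the output must look like for the task to be considered done
def test_sql_aggregate(client, product):
    # the GROUP BY must not introduce NULLs
    assert not nulls_in_columns(client, ['customer_id', 'avg_score'], product)

    # averages of 0-10 scores must themselves fall within [0, 10]
    min_avg, max_avg = range_in_column(client, 'avg_score', product)
    assert min_avg >= 0 and max_avg <= 10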

Pro tip: debugging and developing tests interactively

Even though tests are usually just a few short statements, writing them interactively can help you quickly prototype your assertions. One simple trick is to start an interactive session and load the client and product variables.

Let’s imagine you want to write a test for a new SQL script (but the same applies for other types of scripts). You add a testing function, but it’s currently empty:

def my_sql_testing_function(client, product):
    pass

If you run this, Ploomber will still call your function; you can start an interactive session when this happens:

def my_sql_testing_function(client, product):
    from IPython import embed; embed()

Once you call ploomber build, wait for the Python prompt to show and verify you have the client and product variables:

>>> print(client)
>>> print(product)
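
From there, you can prototype your assertions against the real objects before copying them into your testing function; for example, using the helper shown earlier:

>>> from ploomber.testing.sql import nulls_in_columns
>>> nulls_in_columns(client, ['score', 'age'], product)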