Advanced: Debugging DAGs

Ploomber offers several debugging tools, but the right approach depends on the type of task. This guide explains how to debug each task type.

[1]:
from pathlib import Path
import traceback

from IPython.display import HTML
import nbconvert
import nbformat

from ploomber import DAG
from ploomber.tasks import PythonCallable, NotebookRunner, SQLScript
from ploomber.products import File, SQLiteRelation
from ploomber.clients import SQLAlchemyClient
from ploomber.exceptions import DAGBuildError

Debugging PythonCallable tasks

Since PythonCallable takes a function as its source, we have integrated Ploomber with existing Python debugging tools. First, let’s take a look at the default behavior.

Don’t be intimidated by the long error message; it is designed to provide as much context as possible.

[2]:
dag = DAG()

def my_first_function(product):
    # task will fail here!
    1 / 0
    Path(str(product)).touch()

def my_second_function(product):
    # task will fail here
    raise ValueError
    Path(str(product)).touch()



t1 = PythonCallable(my_first_function, File('file1.txt'), dag, name='t1')
t2 = PythonCallable(my_second_function, File('file2.txt'), dag, name='t2')

try:
    dag.build()
except DAGBuildError as e:
    traceback.print_exc()


Traceback (most recent call last):
  File "/Users/Edu/dev/ploomber/src/ploomber/dag/DAG.py", line 384, in _build
    show_progress=show_progress)
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 127, in __call__
    .format(str(exceptions_all)))
ploomber.exceptions.DAGBuildError: DAG build failed, the following tasks crashed (corresponding downstream tasks aborted execution):
* PythonCallable: t2 -> File(file2.txt): multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 449, in _build
    res = self._run()
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 537, in _run
    self.run()
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/tasks.py", line 47, in run
    self.source.primitive(**self.params)
  File "<ipython-input-2-ae3fef1abe62>", line 10, in my_second_function
    raise ValueError
ValueError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Edu/miniconda3/envs/ploomber/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 461, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Error building task "t2"
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 177, in catch_exceptions
    fn()
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 154, in __call__
    return self.fn(**self.kwargs)
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 159, in catch_warnings
    result = fn()
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 154, in __call__
    return self.fn(**self.kwargs)
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 211, in build_in_subprocess
    report = res.get()
  File "/Users/Edu/miniconda3/envs/ploomber/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
ploomber.exceptions.TaskBuildError: Error building task "t2"


--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

* PythonCallable: t1 -> File(file1.txt): multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 449, in _build
    res = self._run()
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 537, in _run
    self.run()
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/tasks.py", line 47, in run
    self.source.primitive(**self.params)
  File "<ipython-input-2-ae3fef1abe62>", line 5, in my_first_function
    1 / 0
ZeroDivisionError: division by zero

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Edu/miniconda3/envs/ploomber/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/Users/Edu/dev/ploomber/src/ploomber/tasks/Task.py", line 461, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Error building task "t1"
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 177, in catch_exceptions
    fn()
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 154, in __call__
    return self.fn(**self.kwargs)
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 159, in catch_warnings
    result = fn()
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 154, in __call__
    return self.fn(**self.kwargs)
  File "/Users/Edu/dev/ploomber/src/ploomber/executors/Serial.py", line 211, in build_in_subprocess
    report = res.get()
  File "/Users/Edu/miniconda3/envs/ploomber/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
ploomber.exceptions.TaskBuildError: Error building task "t1"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<ipython-input-2-ae3fef1abe62>", line 19, in <module>
    dag.build()
  File "/Users/Edu/dev/ploomber/src/ploomber/dag/DAG.py", line 360, in build
    report = self._build(force, show_progress)
  File "/Users/Edu/dev/ploomber/src/ploomber/dag/DAG.py", line 465, in _build
    'Failed to build DAG {}'.format(self)) from build_exception
ploomber.exceptions.DAGBuildError: Failed to build DAG DAG("No name")

Starting a debugging session with PythonCallable.debug()

Take a close look at the traceback above: it contains information about both failing tasks. This is because, by default, Ploomber captures errors in tasks and keeps executing the remaining tasks until no more can run (when a task fails, all of its downstream dependencies are skipped). Furthermore, the traceback is organized so that you can see each task’s name along with its original traceback.
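The capture-and-continue behavior described above can be illustrated with a small standalone sketch. This is not Ploomber's implementation; `run_all`, the `upstream` mapping, and the extra task `t3` are hypothetical names used only to show the idea of collecting errors per task and skipping downstream tasks.

```python
def run_all(tasks, upstream):
    """Run tasks in order, capture failures, skip tasks downstream of a failure.

    tasks: dict mapping task name -> zero-argument callable (in execution order)
    upstream: dict mapping task name -> list of task names it depends on
    """
    errors = {}   # task name -> captured exception
    skipped = []  # tasks not run because an upstream task failed or was skipped
    for name, fn in tasks.items():
        deps = upstream.get(name, [])
        if any(dep in errors or dep in skipped for dep in deps):
            skipped.append(name)
            continue
        try:
            fn()
        except Exception as exc:
            # Keep going: the error is reported once every runnable task has run
            errors[name] = exc
    return errors, skipped


def fail_zero_division():
    1 / 0


def fail_value_error():
    raise ValueError


def depends_on_t1():
    print("never reached, because t1 fails")


# t3 is a hypothetical downstream task added for illustration
tasks = {"t1": fail_zero_division, "t2": fail_value_error, "t3": depends_on_t1}
upstream = {"t3": ["t1"]}

errors, skipped = run_all(tasks, upstream)
for name, exc in errors.items():
    print(f"{name}: {type(exc).__name__}")
print("skipped:", skipped)
```

Both independent failures are reported in a single run, which is why the traceback above lists t1 and t2 together.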

Sometimes the traceback alone is enough to figure out how to fix the error, but sometimes it is not. In such cases, we can use PythonCallable.debug() to start a debugging session, which passes the exact same parameters to the function so we can interactively replicate the error (and, hopefully, fix it).

# start a debugging session
dag['t1'].debug()

See the documentation for Python’s pdb debugger to learn more about the commands you can use in debug mode.

Using breakpoints

Another common technique to debug code is to use breakpoints, which halt execution at a given line. In Python, you can insert a breakpoint like this:

import pdb; pdb.set_trace()

This will not work by default, since Ploomber executes PythonCallables in a subprocess. To make breakpoints work, you have to turn this option off:

# turn the option off when creating the DAG
from ploomber.executors import Serial
dag = DAG(executor=Serial(build_in_subprocess=False))

or

# modify an existing DAG
dag.executor = Serial(build_in_subprocess=False)
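As a side note, on Python 3.7 or later the built-in breakpoint() behaves like the pdb line above by default, and it dispatches through sys.breakpointhook. The sketch below swaps in a recording hook (a hypothetical stand-in, named `recording_hook` here) so the example runs without opening an interactive prompt. With the default hook in place, the same call would open pdb.

```python
import sys

hits = []


def recording_hook(*args, **kwargs):
    # Stand-in for the debugger: record the call instead of opening a prompt
    hits.append("breakpoint reached")


def my_function():
    total = 1 + 1
    breakpoint()  # with the default hook: same as `import pdb; pdb.set_trace()`
    return total


sys.breakpointhook = recording_hook
try:
    result = my_function()
finally:
    # Restore the default hook so later breakpoint() calls open pdb again
    sys.breakpointhook = sys.__breakpointhook__

print(hits, result)
```

As with pdb.set_trace(), an interactive breakpoint inside a task function still requires the in-process executor shown above.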

Post-mortem debugging

Previously, we showed how we can start a line-by-line debugging session using PythonCallable.debug(). For long functions, we might just want to go straight to the failing line. This technique is called post-mortem debugging, since we let the program run and start a debugging session after the program fails.

Let’s see an example. Imagine you are in an interactive session in Jupyter or IPython and define the following function:

def some_function():
    x = 1
    y = 0
    x / y

Then run the following:

>>> some_function()

Calling some_function will raise a ZeroDivisionError. If immediately after calling some_function you run:

>>> import pdb; pdb.pm()

You’ll start a post-mortem session.
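To make the idea concrete without an interactive prompt, the sketch below walks the traceback of the raised exception down to its innermost frame, which is the frame a post-mortem session starts in, and reads its local variables. The helper name `innermost_frame_locals` is hypothetical. In an interactive session you would call pdb.post_mortem(exc.__traceback__) instead.

```python
def some_function():
    x = 1
    y = 0
    x / y


def innermost_frame_locals(exc):
    """Return the function name and local variables of the frame that raised."""
    tb = exc.__traceback__
    # Follow the chain until the last entry: that is where the error happened
    while tb.tb_next is not None:
        tb = tb.tb_next
    frame = tb.tb_frame
    return frame.f_code.co_name, dict(frame.f_locals)


try:
    some_function()
except ZeroDivisionError as exc:
    # Interactive alternative: import pdb; pdb.post_mortem(exc.__traceback__)
    name, local_vars = innermost_frame_locals(exc)

print(name, local_vars)
```

The values of x and y at the moment of failure are exactly what you would inspect from the post-mortem prompt.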

As we explained in the first section, Ploomber captures exceptions by default. This means that exceptions are raised at the end of the DAG execution, not in the original line. To make post-mortem debugging work, you have to turn off exception capturing (and the subprocess option too):

# turn subprocess and exception capture off
from ploomber.executors import Serial
dag = DAG(executor=Serial(build_in_subprocess=False, catch_exceptions=False))

Debugging NotebookRunner tasks

Check partially executed notebook

Notebooks are executed cell by cell. This is useful when debugging, since we can analyze the partially executed notebook and look for errors. (Note: a common error is to omit the “parameters” cell, which causes injected parameters to be added at the top of the notebook. Always make sure that you include this cell.)

Let’s see an example:

[3]:
dag = DAG()

root_notebook = """
# + tags=["parameters"]
product = None

# +
print(product)

# +
1 / 0
"""


t1 = NotebookRunner(root_notebook, File('root_notebook.ipynb'), dag, name='t1',
                    ext_in='py', papermill_params={'nest_asyncio': True})

try:
    dag.build()
except DAGBuildError:
    pass



Let’s take a look at the partially executed notebook:

[4]:
nb_str = Path('root_notebook.ipynb').read_text()
nb = nbformat.reads(nb_str, as_version=nbformat.NO_CONVERT)
HTML(data=nbconvert.export(nbconvert.HTMLExporter, nb)[0])
[4]:
Notebook

An Exception was encountered at 'In [4]'.

In [ ]:

In [1]:
product = None
In [2]:
# Parameters
product = "root_notebook.ipynb"
In [3]:
print(product)
root_notebook.ipynb

Execution using papermill encountered an exception here and stopped:

In [4]:
1 / 0
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-4-bc757c3fda29> in <module>
----> 1 1 / 0

ZeroDivisionError: division by zero
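Besides rendering the notebook, the same checks can be done programmatically. An .ipynb file is JSON, so the sketch below uses only the standard library to look for a cell tagged “parameters” and for the first cell whose output is an error. It assumes the nbformat version 4 layout (a top-level "cells" list). The helper names and the in-memory `sample` notebook are hypothetical. For a real file, load it with json.loads(Path('root_notebook.ipynb').read_text()).

```python
def has_parameters_cell(nb):
    """True if any cell carries the "parameters" tag."""
    return any(
        "parameters" in cell.get("metadata", {}).get("tags", [])
        for cell in nb["cells"]
    )


def find_failed_cell(nb):
    """Return (index, error name, error value) of the first failing code cell."""
    for index, cell in enumerate(nb["cells"]):
        if cell.get("cell_type") != "code":
            continue
        for output in cell.get("outputs", []):
            if output.get("output_type") == "error":
                return index, output.get("ename"), output.get("evalue")
    return None


# Minimal stand-in for a partially executed notebook (assumed v4 structure)
sample = {
    "cells": [
        {"cell_type": "code", "metadata": {"tags": ["parameters"]},
         "source": "product = None", "outputs": []},
        {"cell_type": "code", "metadata": {},
         "source": "print(product)",
         "outputs": [{"output_type": "stream", "name": "stdout",
                      "text": "root_notebook.ipynb\n"}]},
        {"cell_type": "code", "metadata": {},
         "source": "1 / 0",
         "outputs": [{"output_type": "error", "ename": "ZeroDivisionError",
                      "evalue": "division by zero", "traceback": []}]},
    ]
}

tagged = has_parameters_cell(sample)
failed = find_failed_cell(sample)
print(tagged, failed)
```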

Starting a debugging session with NotebookRunner.debug()

Similar to PythonCallable, NotebookRunner has a .debug() method that allows us to run the notebook line by line. This includes the injected parameters cell, which ensures we are running the same code that produced the error.

Debugging SQL tasks

Ploomber supports parametrizing SQL scripts using Jinja. When a template uses many placeholders or control structures, the rendered code might fail to run. The first step is to check which error the database is returning. If we are getting a syntax error, we should first check exactly which code we are sending to the database.

[5]:
client = SQLAlchemyClient('sqlite:///my.db')
dag = DAG()
dag.clients[SQLScript] = client
dag.clients[SQLiteRelation] = client

t1 = SQLScript("""
CREATE TABLE {{product}} AS
SELECT * FROM my_raw_table
WHERE x > 0.5
""", SQLiteRelation(('my_first_table', 'table')), dag, name='t1')

dag.render()

[5]:
DAG("No name")
[6]:
# print rendered source code
print(dag['t1'].source)

CREATE TABLE my_first_table AS
SELECT * FROM my_raw_table
WHERE x > 0.5
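Once the rendered source is available, one way to see which error the database returns is to run the statement against a scratch database. The sketch below is independent of Ploomber: it uses the standard-library sqlite3 module with an in-memory database, and the helper `try_sql` plus the single-column layout of my_raw_table are assumptions made for illustration.

```python
import sqlite3


def try_sql(conn, sql):
    """Run sql and return the database error message, or None on success."""
    try:
        conn.execute(sql)
    except sqlite3.Error as exc:
        return str(exc)
    return None


conn = sqlite3.connect(":memory:")
# Assumed schema: the real my_raw_table may have more columns
conn.execute("CREATE TABLE my_raw_table (x REAL)")

rendered = """
CREATE TABLE my_first_table AS
SELECT * FROM my_raw_table
WHERE x > 0.5
"""

# What the database would receive if the placeholder was never rendered
unrendered = "CREATE TABLE {{product}} AS SELECT * FROM my_raw_table"

rendered_error = try_sql(conn, rendered)
unrendered_error = try_sql(conn, unrendered)

print("rendered:", rendered_error)
print("unrendered:", unrendered_error)
conn.close()
```

The rendered statement runs cleanly, while the unrendered template is rejected by the database, which points to the template rather than the query logic as the source of the failure.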