ploomber.DAG

class ploomber.DAG(name=None, clients=None, executor='serial')

A collection of tasks with dependencies

Parameters:
  • name (str, optional) – A name to identify this DAG

  • clients (dict, optional) – A dictionary with classes as keys and clients as values. It can be modified later using dag.clients[klass] = client

  • differ (CodeDiffer) – An object that determines whether two pieces of code are the same and outputs a diff. Defaults to CodeDiffer() with default parameters

  • executor (str or ploomber.executors instance, optional) – The executor to use (ploomber.executors.Serial or ploomber.executors.Parallel). If a string is passed (‘serial’ or ‘parallel’), the corresponding executor is initialized with default parameters
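To illustrate what "classes as keys" means, here is a standard-library sketch. SQLTaskLike, SQLProductLike and client_for are hypothetical names, and looking up a client by the exact class of a task or product is an assumption made for illustration, not a description of ploomber's internals.

```python
import sqlite3


# Hypothetical stand-ins for the task and product classes used as keys
class SQLTaskLike:
    pass


class SQLProductLike:
    pass


def client_for(obj, clients):
    """Return the client registered for the exact class of obj, or None."""
    return clients.get(type(obj))


shared = sqlite3.connect(":memory:")

# Classes are the keys and clients are the values
clients = {SQLTaskLike: shared, SQLProductLike: shared}

# Modifying the mapping later follows the same pattern: clients[klass] = client
clients[SQLProductLike] = sqlite3.connect(":memory:")

task_client = client_for(SQLTaskLike(), clients)
product_client = client_for(SQLProductLike(), clients)
```

Keying by class lets every task (or product) of the same type share one client without configuring each instance individually.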

name

A name to identify the DAG

Type:

str

clients

A class to client mapping

Type:

dict

executor

Executor object to run tasks

Type:

ploomber.Executor

on_render

Function to execute upon rendering. Can request a “dag” parameter.

Type:

callable

on_finish

Function to execute once the build finishes successfully. Can request a “dag” parameter and/or “report”, which contains the report object returned by the build function.

Type:

callable

on_failure

Function to execute upon failure. Can request a “dag” parameter and/or “traceback”, which contains a dictionary whose possible keys are “build” (the build error traceback) and “on_finish” (the on_finish hook traceback, if any).

Type:

callable
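Hooks are plain functions, and the parameter names they declare determine what they receive. The sketch below defines one hook of each kind plus a hypothetical call_hook helper that imitates this "request a parameter" behavior by inspecting the signature; the helper is an assumption for illustration, not ploomber's implementation. With a real DAG you would assign the functions as attributes, e.g. dag.on_finish = on_finish.

```python
import inspect

calls = []  # records what each hook received, for illustration only


def on_render(dag):
    # Requests only "dag"
    calls.append(("on_render", dag))


def on_finish(dag, report):
    # Requests "dag" and "report" (the object returned by the build)
    calls.append(("on_finish", dag, report))


def on_failure(traceback):
    # "traceback" is a dict; possible keys are "build" and "on_finish"
    calls.append(("on_failure", sorted(traceback)))


def call_hook(hook, **available):
    """Hypothetical helper: pass a hook only the arguments it names."""
    requested = inspect.signature(hook).parameters
    kwargs = {k: v for k, v in available.items() if k in requested}
    return hook(**kwargs)


# The caller offers everything it has; each hook takes only what it asks for
call_hook(on_render, dag="my-dag", report=None)
call_hook(on_finish, dag="my-dag", report={"task": "ok"})
call_hook(on_failure, dag="my-dag", traceback={"build": "Traceback ..."})
```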

serializer

Function to serialize products from PythonCallable tasks. Used if the task has no serializer. See the ploomber.tasks.PythonCallable documentation for details.

Type:

callable

unserializer

Function to unserialize products from PythonCallable tasks. Used if the task has no unserializer. See the ploomber.tasks.PythonCallable documentation for details.

Type:

callable
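A minimal serializer/unserializer pair, assuming the signatures described in the PythonCallable documentation: the serializer receives the returned object and the product, and the unserializer receives the product and returns the object. JSON is an arbitrary choice for this sketch. You would register the pair with dag.serializer = serializer and dag.unserializer = unserializer.

```python
import json
import tempfile
from pathlib import Path


def serializer(obj, product):
    # Write the task's return value to the product's path as JSON
    Path(product).write_text(json.dumps(obj))


def unserializer(product):
    # Read an upstream product back into a Python object
    return json.loads(Path(product).read_text())


# Round trip in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp, "data.json")
    serializer({"rows": [1, 2, 3]}, target)
    loaded = unserializer(target)
```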

Examples

Spec API:

pip install ploomber
ploomber examples -n guides/first-pipeline -o example
cd example
pip install -r requirements.txt
ploomber build

Python API:

>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import ShellScript, PythonCallable
>>> from ploomber.products import File
>>> from ploomber.executors import Serial
>>> code = ("echo hi > {{product['first']}}; "
...         "echo bye > {{product['second']}}")
>>> _ = Path('script.sh').write_text(code)
>>> dag = DAG(executor=Serial(build_in_subprocess=False))
>>> product = {'first': File('first.txt'), 'second': File('second.txt')}
>>> shell = ShellScript(Path('script.sh'), product, dag=dag, name='script')
>>> def my_task(upstream, product):
...     first = Path(upstream['script']['first']).read_text()
...     second = Path(upstream['script']['second']).read_text()
...     Path(product).write_text(first + ' ' + second)
>>> python_task = PythonCallable(my_task, File('final.txt'), dag=dag)
>>> shell >> python_task
PythonCallable: my_task -> File('final.txt')
>>> _ = dag.build()

Methods

build([force, show_progress, debug, ...])

Runs the DAG in order so that all upstream dependencies are run for every task

build_partially(target[, force, ...])

Partially build a DAG up to a given task

check_tasks_have_allowed_status(allowed, ...)

close_clients()

Close all clients (dag-level, task-level and product-level)

get(k[,d])

get_downstream(task_name)

Get downstream tasks for a given task name

items()

keys()

plot([output, include_products, backend, ...])

Plot the DAG

pop(name)

Remove a task from the dag

render([force, show_progress, remote])

Render resolves all placeholders in tasks and determines whether each task should run, based on the task.product metadata. This allows up-to-date tasks to be skipped

status(**kwargs)

Returns a table with tasks status

to_markup([path, fmt, sections, backend])

Returns a str (md or html) with the pipeline's description

values()

build(force=False, show_progress=True, debug=None, close_clients=True)

Runs the DAG in order so that all upstream dependencies are run for every task

Parameters:
  • force (bool, default=False) – If True, run all tasks regardless of their status

  • show_progress (bool, default=True) – Show progress bar

  • debug ('now' or 'later', default=None) – If ‘now’, drop into a debugging session if building raises an exception. Note that this temporarily replaces the executor with Serial, with subprocess building and exception/warning catching turned off; the original executor is restored at the end. If ‘later’, the executor is kept as is and the traceback errors are serialized for later debugging

  • close_clients (bool, default=True) – Close all clients (dag-level, task-level and product-level) upon successful build

Notes

All dag-level clients are closed after calling this function

changelog

Changed in version 0.20: debug changed from True/False to ‘now’/’later’/None

New in version 0.20: debug now supports debugging NotebookRunner tasks

Returns:

A dict-like object with tasks as keys and dicts with task status as values

Return type:

BuildReport
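The ordering guarantee can be illustrated with the standard library's graphlib (Python 3.9+). The task names are hypothetical and this is not ploomber's scheduler; it only shows that every task comes after all of its upstream dependencies.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the set of its upstream tasks
upstream = {
    "get": set(),
    "clean": {"get"},
    "features": {"clean"},
    "plot": {"clean"},
    "fit": {"features", "plot"},
}

# static_order() yields a task only after all of its upstream tasks
order = list(TopologicalSorter(upstream).static_order())
```

Tasks that do not depend on each other (features and plot here) have no fixed relative order, which is what allows a parallel executor to run them concurrently.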

build_partially(target, force=False, show_progress=True, debug=None, skip_upstream=False)

Partially build a DAG up to a given task

Parameters:
  • target (str) – Name of the target task (the last one to build). Can be a wildcard such as ‘tasks-*’

  • force (bool, default=False) – If True, run all tasks regardless of their status

  • show_progress (bool, default=True) – Show progress bar

  • debug ('now' or 'later', default=None) – If ‘now’, drop into a debugging session if building raises an exception. Note that this temporarily replaces the executor with Serial, with subprocess building and exception/warning catching turned off; the original executor is restored at the end. If ‘later’, the executor is kept as is and the traceback errors are serialized for later debugging

  • skip_upstream (bool, default=False) – If False, includes all upstream dependencies required to build target; otherwise, it skips them. Note that if this is True and a task cannot be built (e.g., because upstream products are missing), the build will fail

Notes

changelog

Changed in version 0.20: debug changed from True/False to ‘now’/’later’/None

New in version 0.20: debug now supports debugging NotebookRunner tasks
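The effect of target, wildcards and skip_upstream on which tasks are selected can be sketched as follows. select_tasks is a hypothetical helper, and shell-style matching via fnmatch is an assumption about how wildcards such as ‘tasks-*’ are interpreted.

```python
from fnmatch import fnmatch

# Hypothetical pipeline: each task maps to its upstream tasks
upstream = {
    "load": [],
    "tasks-clean": ["load"],
    "tasks-features": ["tasks-clean"],
    "report": ["tasks-features"],
}


def select_tasks(target, upstream, skip_upstream=False):
    """Return the tasks a partial build would include (hypothetical helper)."""
    # Tasks whose names match the target pattern
    targets = {name for name in upstream if fnmatch(name, target)}
    if skip_upstream:
        return targets
    # Otherwise walk the upstream edges to collect every dependency
    selected, pending = set(), list(targets)
    while pending:
        name = pending.pop()
        if name not in selected:
            selected.add(name)
            pending.extend(upstream[name])
    return selected
```

Downstream tasks such as report are never selected, which is what makes the build partial.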

check_tasks_have_allowed_status(allowed, new_status)

close_clients()

Close all clients (dag-level, task-level and product-level)

get(k[, d])

D[k] if k in D, else d. d defaults to None.

get_downstream(task_name)

Get downstream tasks for a given task name
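A sketch of finding downstream tasks by inverting a task-to-upstream mapping. downstream_of is a hypothetical helper, and returning only direct downstream tasks (not every transitive descendant) is an assumption about the method's scope.

```python
from collections import defaultdict

# Hypothetical pipeline: each task maps to its upstream tasks
upstream = {
    "load": [],
    "clean": ["load"],
    "plot": ["clean"],
    "fit": ["clean"],
}


def downstream_of(upstream):
    """Invert task -> upstream into task -> direct downstream tasks."""
    downstream = defaultdict(list)
    for task, parents in upstream.items():
        for parent in parents:
            downstream[parent].append(task)
    return downstream


downstream = downstream_of(upstream)
```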

items()

A set-like object providing a view on D's items

keys()

A set-like object providing a view on D's keys

plot(output='embed', include_products=False, backend=None, image_only=False)

Plot the DAG

Parameters:
  • output (str, default='embed') – Where to save the output (e.g., pipeline.png). If ‘embed’, it returns an IPython image instead.

  • include_products (bool, default=False) – If False, each node only contains the task name; if True, it contains the task name and products. Only available when using the pygraphviz backend

  • backend (str, default=None) – How to generate the plot. If None, it uses pygraphviz if installed, otherwise D3 (which doesn’t require extra dependencies). You can force a specific backend by passing ‘pygraphviz’, ‘d3’, or ‘mermaid’.

pop(name)

Remove a task from the dag

render(force=False, show_progress=True, remote=False)

Render resolves all placeholders in tasks and determines whether each task should run, based on the task.product metadata. This allows up-to-date tasks to be skipped

Parameters:
  • force (bool, default=False) – Ignore product metadata status and prepare all tasks to be executed. This option renders much faster in DAGs with products whose metadata is stored in remote systems, because there is no need to fetch metadata over the network. If the DAG won’t be built, this option is recommended.

  • show_progress (bool, default=True) – Show progress bar

  • remote (bool, default=False) – Use remote metadata for determining task status. In most scenarios you want this to be False; Ploomber uses it internally when exporting pipelines to other platforms (via Soopervisor).
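The kind of decision render makes can be sketched as below. ProductMetadata and should_run are hypothetical, and the three criteria (missing product, changed source code, upstream product newer than this one) are an assumption about what "based on the task.product metadata" covers, not a description of ploomber's internals. Passing force=True mirrors the force parameter above by marking every task for execution.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ProductMetadata:
    """Hypothetical stand-in for the metadata stored alongside a product."""
    exists: bool
    timestamp: Optional[float]  # when the product was last built
    stored_source: Optional[str]  # source code used in that build


def should_run(meta: ProductMetadata, current_source: str,
               upstream_timestamps: List[float], force: bool = False) -> bool:
    """Sketch of an up-to-date check; not ploomber's actual implementation."""
    if force or not meta.exists or meta.timestamp is None:
        return True
    if meta.stored_source != current_source:
        return True  # the task's code changed since the last build
    # Run again if any upstream product was rebuilt after this one
    return any(ts > meta.timestamp for ts in upstream_timestamps)


fresh = ProductMetadata(exists=True, timestamp=100.0, stored_source="x = 1")
```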

status(**kwargs)

Returns a table with tasks status

to_markup(path=None, fmt='html', sections=None, backend=None)

Returns a str (md or html) with the pipeline’s description

Parameters:

sections (list) – Which sections to include, possible values are “plot”, “status” and “source”. Defaults to [“plot”, “status”]

values()

An object providing a view on D's values

Attributes

clients

executor

product