Tasks

class ploomber.tasks.DownloadFromURL(source, product, dag, name=None, params=None)

Download a file from a URL (uses urllib.request.urlretrieve)

Parameters
  • source (str) – URL to download the file from

  • product (ploomber.products.File) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag
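
Examples

A minimal usage sketch (the URL and file names below are placeholders, not part of the API):

>>> from ploomber import DAG
>>> from ploomber.tasks import DownloadFromURL
>>> from ploomber.products import File
>>> dag = DAG()
>>> DownloadFromURL('https://example.com/data.csv', File('data.csv'),
...                 dag=dag, name='download')
>>> dag.build()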

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.Input(product, dag, name)

A dummy task used to represent input provided by the user; it is always considered outdated.

When making new predictions, the user must submit some input data to build features and then feed the model; this task can be used to point to such input. It does not perform any processing (the data is read-only), but it is always considered outdated, which means it will always trigger execution.

Parameters
  • product (ploomber.products.Product) – Product to serve as input to the dag

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag
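
Examples

A minimal sketch; ‘new_data.csv’ is a placeholder for user-provided input:

>>> from ploomber import DAG
>>> from ploomber.tasks import Input
>>> from ploomber.products import File
>>> dag = DAG()
>>> # Input is always considered outdated, so downstream tasks
>>> # run every time the dag is built
>>> Input(File('new_data.csv'), dag=dag, name='input')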

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.Link(product, dag, name)

A dummy Task used to “plug” an external Product into a pipeline; this task is always considered up-to-date

The purpose of this Task is to link a pipeline to an external, read-only file; this task does not do anything to the dataset and the product is always considered up-to-date. There are two primary use cases. First, when the raw data is automatically uploaded to a file (or table) and the pipeline does not control data updates, this task can link the pipeline to that file without copying it; downstream tasks will see the dataset as just another Product. Second, when developing a prediction pipeline: when making predictions on new data, the pipeline might rely on existing data to generate features, and this task can be used to point to such a file. It can also be used to point to a serialized model, but this last scenario is only recommended for prediction pipelines without strict performance requirements, since unserializing models is an expensive operation; for real-time predictions, the model should be kept in memory.

Parameters
  • product (ploomber.products.Product) – Product to link to the dag

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag
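
Examples

A minimal sketch; ‘raw.csv’ stands in for an externally managed, read-only file:

>>> from ploomber import DAG
>>> from ploomber.tasks import Link
>>> from ploomber.products import File
>>> dag = DAG()
>>> # downstream tasks can declare this task as an upstream
>>> # dependency and read raw.csv like any other Product
>>> Link(File('raw.csv'), dag=dag, name='raw')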

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.NotebookRunner(source, product, dag, name=None, params=None, papermill_params=None, kernelspec_name=None, nbconvert_exporter_name=None, ext_in=None, nb_product_key='nb', static_analysis=False, nbconvert_export_kwargs=None, local_execution=False)

Run a Jupyter notebook using papermill. Supports several input formats via jupytext and several output formats via nbconvert

Parameters
  • source (str or pathlib.Path) – Notebook source; if str, the content is interpreted as the actual notebook; if pathlib.Path, the content of the file is loaded. When loading from a str, ext_in must be passed

  • product (ploomber.products.File) – The output file

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str, optional) – A str to identify this task. Should not already exist in the dag

  • params (dict, optional) – Notebook parameters. These are passed as the “parameters” argument to the papermill.execute_notebook function; by default, “product” and “upstream” are included

  • papermill_params (dict, optional) – Other parameters passed to papermill.execute_notebook, defaults to None

  • kernelspec_name (str, optional) – Kernelspec name to use. If the file extension provides enough information to choose a kernel or the notebook already includes kernelspec data (in metadata.kernelspec), this is ignored; otherwise, the kernel is looked up using jupyter_client.kernelspec.get_kernel_spec

  • nbconvert_exporter_name (str, optional) – Once the notebook is run, this parameter controls whether to export the notebook to a different format using the nbconvert package. It is not needed unless the extension cannot be used to infer the final output format, in which case nbconvert.get_exporter is used

  • ext_in (str, optional) – Source extension. Required if loading from a str. If source is a pathlib.Path, the extension from the file is used.

  • nb_product_key (str, optional) – If the notebook is expected to generate other products, pass the key to identify the output notebook (i.e. if product is a list with 3 ploomber.File, pass the index pointing to the notebook path). If the only output is the notebook itself, this parameter is not needed

  • static_analysis (bool) – Run static analysis after rendering. This requires a cell with the tag “parameters” to exist in the notebook; that cell should declare at least a “product = None” variable. Passed and declared parameters are compared (this makes notebooks behave more like “functions”), and pyflakes is run to detect errors before executing the notebook. If the task has upstream dependencies, an upstream parameter should also be declared: “upstream = None”

  • nbconvert_export_kwargs (dict) – Keyword arguments to pass to the nbconvert.export function (this is only used if exporting the output ipynb notebook to another format). You can use this, for example, to hide code cells using the exclude_input parameter. See nbconvert documentation for details.

  • local_execution (bool, optional) – Change working directory to be the parent of the notebook’s source. Defaults to False. This resembles the default behavior when running notebooks interactively via jupyter notebook

Examples

>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import NotebookRunner
>>> from ploomber.products import File
>>> dag = DAG()
>>> # do not include input code (only cell's output)
>>> NotebookRunner(Path('nb.ipynb'), File('out-1.html'), dag=dag,
...                nbconvert_export_kwargs={'exclude_input': True},
...                name=1)
>>> # Selectively remove cells with the tag "remove"
>>> config = {'TagRemovePreprocessor': {'remove_cell_tags': ('remove',)},
...           'HTMLExporter':
...             {'preprocessors':
...             ['nbconvert.preprocessors.TagRemovePreprocessor']}}
>>> NotebookRunner(Path('nb.ipynb'), File('out-2.html'), dag=dag,
...                nbconvert_export_kwargs={'config': config},
...                name=2)
>>> dag.build()

Notes

nbconvert’s documentation: https://nbconvert.readthedocs.io/en/latest/config_options.html#preprocessor-options

debug(kind='ipdb')

Opens the notebook (with injected parameters) in debug mode in a temporary location

Parameters

kind (str, default='ipdb') – Debugger to use: ‘ipdb’ for the line-by-line IPython debugger, ‘pdb’ for the line-by-line Python debugger, or ‘pm’ for post-mortem debugging using IPython

Notes

Be careful when debugging tasks. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.

develop(app='notebook', args=None)

Opens the rendered notebook (with injected parameters) and adds a “debugging-settings” cell that changes the working directory to the currently active one, reflecting the conditions when calling DAG.build(). The modified notebook is saved in the same location as the source with “-tmp” appended to the filename. Changes to this notebook can be exported to the original notebook after the notebook process is shut down. The “injected-parameters” and “debugging-settings” cells are deleted before saving.

Parameters
  • app ({'notebook', 'lab'}, default: 'notebook') – Which Jupyter application to use

  • args (str) – Extra parameters passed to the jupyter application

Notes

Be careful when developing tasks interactively. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.

If you modify the source code and call develop again, the source code will be updated only if the hot_reload option is turned on. See ploomber.DAGConfigurator for details.

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.PostgresCopyFrom(source, product, dag, name=None, client=None, params=None, sep='\t', null='\\N', columns=None)

Efficiently copy data to a PostgreSQL database using COPY FROM (a faster alternative to SQLUpload for postgres); when using a SQLAlchemy client, the underlying driver must be psycopg2. Replaces the table if it exists.

Parameters
  • source (str or pathlib.Path) – Path to parquet file to upload

  • client (ploomber.clients.SQLAlchemyClient, optional) – The client used to connect to the database and where the data will be uploaded. Only required if no dag-level client has been declared using dag.clients[class]
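
Examples

A minimal sketch, assuming a reachable PostgreSQL instance; the connection URI, file, and table names are placeholders:

>>> from ploomber import DAG
>>> from ploomber.tasks import PostgresCopyFrom
>>> from ploomber.products import PostgresRelation
>>> from ploomber.clients import SQLAlchemyClient
>>> dag = DAG()
>>> client = SQLAlchemyClient('postgresql://user:pass@localhost/db')
>>> dag.clients[PostgresCopyFrom] = client
>>> dag.clients[PostgresRelation] = client
>>> PostgresCopyFrom('data.parquet',
...                  PostgresRelation(('public', 'data', 'table')),
...                  dag=dag, name='upload')
>>> dag.build()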

Notes

Although this task does not depend on pandas for data I/O, it still needs it to dynamically create the table; after the table is created, the COPY statement is used to upload the data

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.PythonCallable(source, product, dag, name=None, params=None, unserializer=None, serializer=None)

Run a Python callable (e.g. a function)

Parameters
  • source (callable) – The callable to execute

  • product (ploomber.products.Product) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • params (dict) – Parameters to pass to the callable. By default, the callable will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here

  • unserializer (callable, optional) – A callable to unserialize upstream products; the product object is passed as its only argument. If None, the source function receives the product object directly. If the task has no upstream dependencies, this argument has no effect

  • serializer (callable, optional) – A callable to serialize this task’s product; it must take two arguments: the first is the value returned by the task’s source, the second is the product object. If None, the task’s source is responsible for serializing its own product (see the sketch below)
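
Examples

A minimal sketch of the serializer mechanism; the pickle-based helper below is illustrative, not part of the API:

>>> import pickle
>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import PythonCallable
>>> from ploomber.products import File
>>> def serializer(obj, product):
...     # persist the function's return value in the product's path
...     Path(str(product)).write_bytes(pickle.dumps(obj))
>>> def make_data():
...     # with a serializer, the return value becomes the product
...     return [1, 2, 3]
>>> dag = DAG()
>>> PythonCallable(make_data, File('data.pkl'), dag=dag, name='data',
...                serializer=serializer)
>>> dag.build()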

debug(kind='ipdb')

Run callable in debug mode.

Parameters

kind (str ('ipdb' or 'pdb')) – Which debugger to use: ‘ipdb’ for the IPython debugger or ‘pdb’ for the debugger from the standard library

Notes

Be careful when debugging tasks. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.

develop(app='notebook', args=None)

Edit function interactively using Jupyter

Parameters
  • app (str, {'notebook', 'lab'}, default='notebook') – Which jupyter application to use

  • args (str) – Extra args passed to the selected jupyter application

Notes

Cells whose first line is an empty comment (“#”) will be removed when exporting back to the function; you can use this for temporary, exploratory work

Be careful when developing tasks interactively. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.

load()

Loads the product; only works if the task was initialized with an unserializer

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.SQLDump(source, product, dag, name=None, client=None, params=None, chunksize=10000, io_handler=None)

Dumps data from a SQL SELECT statement to a file (or multiple files, if chunksize is set)

Parameters
  • source (str or pathlib.Path) – SQL script source; if str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded

  • product (ploomber.products.Product) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.DBAPIClient or SQLAlchemyClient, optional) – The client used to connect to the database. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details

  • chunksize (int, optional) – Number of rows per file; if None, the entire dataset is dumped into a single file. If set, the product will be a folder

  • io_handler (ploomber.io.CSVIO or ploomber.io.ParquetIO, optional) – io handler to use (which controls the output format); currently, only csv and parquet are supported. If None, it tries to infer the handler from the product’s extension; if that doesn’t work, it uses io.CSVIO
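
Examples

A minimal sketch, assuming a SQLite database at data.db with an existing my_table table:

>>> from ploomber import DAG
>>> from ploomber.tasks import SQLDump
>>> from ploomber.products import File
>>> from ploomber.clients import SQLAlchemyClient
>>> dag = DAG()
>>> dag.clients[SQLDump] = SQLAlchemyClient('sqlite:///data.db')
>>> # chunksize=None dumps the entire table into a single file
>>> SQLDump('SELECT * FROM my_table', File('my_table.csv'),
...         dag=dag, name='dump', chunksize=None)
>>> dag.build()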

Notes

The chunksize parameter is also set as cursor.arraysize; this can greatly speed up the dump for some databases when the driver uses cursor.arraysize as the number of rows to fetch in a single network trip. However, this is driver-dependent; not all drivers implement it (cx_Oracle does)

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.SQLScript(source, product, dag, name=None, client=None, params=None)

Execute a script in a SQL database to create a relation or view

Parameters
  • source (str or pathlib.Path) – SQL script source; if str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded

  • product (ploomber.products.Product) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.DBAPIClient or SQLAlchemyClient, optional) – The client used to connect to the database. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details
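
Examples

A minimal sketch, assuming a SQLite database with an existing source_table:

>>> from ploomber import DAG
>>> from ploomber.tasks import SQLScript
>>> from ploomber.products import SQLiteRelation
>>> from ploomber.clients import SQLAlchemyClient
>>> dag = DAG()
>>> client = SQLAlchemyClient('sqlite:///data.db')
>>> dag.clients[SQLScript] = client
>>> dag.clients[SQLiteRelation] = client
>>> # {{product}} is replaced with the rendered relation name
>>> SQLScript('CREATE TABLE {{product}} AS SELECT * FROM source_table',
...           SQLiteRelation(('clean', 'table')), dag=dag, name='clean')
>>> dag.build()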

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.SQLTransfer(source, product, dag, name=None, client=None, params=None, chunksize=10000)

Transfers data from one SQL database to another (Note: this relies on pandas; only use it for small to medium-sized datasets)

Parameters
  • source (str or pathlib.Path) – SQL script source; if str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded

  • product (ploomber.products.Product) – Product generated upon successful execution. For SQLTransfer, usually product.client != task.client. task.client represents the data source while product.client represents the data destination

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.SQLAlchemyClient, optional) – The client used to connect to the database. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details

  • chunksize (int, optional) – Number of rows to transfer on every chunk

Notes

This task is not intended to move large datasets; rather, it is a convenient way of transferring small to medium-sized datasets. It relies on pandas to read and write, which introduces considerable overhead.

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.SQLUpload(source, product, dag, name=None, client=None, params=None, chunksize=None, io_handler=None, to_sql_kwargs=None)

Upload data to a SQL database from a parquet or a csv file. Note: this task uses pandas.to_sql, which introduces some overhead; only use it for small to medium-sized datasets. Each database usually comes with a tool to upload data efficiently. If you are using PostgreSQL, check out the PostgresCopyFrom task.

Parameters
  • source (str or pathlib.Path) – Path to a parquet or csv file to upload

  • product (ploomber.products.Product) – Product generated upon successful execution. The client for the product must be in the target database, whereas task.client should be a client in the source database.

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.SQLAlchemyClient, optional) – The client used to connect to the database and where the data will be uploaded. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details

  • chunksize (int, optional) – Number of rows to transfer on every chunk

  • io_handler (callable, optional) – A Python callable to read the source file; if None, it is inferred from the source file extension

  • to_sql_kwargs (dict, optional) – Keyword arguments passed to the pandas.DataFrame.to_sql function, one useful parameter is “if_exists”, which determines if the inserted rows should replace the table or just be appended
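
Examples

A minimal sketch, uploading a local csv to a SQLite database; file and table names are placeholders:

>>> from ploomber import DAG
>>> from ploomber.tasks import SQLUpload
>>> from ploomber.products import SQLiteRelation
>>> from ploomber.clients import SQLAlchemyClient
>>> dag = DAG()
>>> client = SQLAlchemyClient('sqlite:///data.db')
>>> dag.clients[SQLUpload] = client
>>> dag.clients[SQLiteRelation] = client
>>> SQLUpload('data.csv', SQLiteRelation(('data', 'table')),
...           dag=dag, name='upload',
...           to_sql_kwargs={'if_exists': 'replace', 'index': False})
>>> dag.build()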

Notes

This task is not intended to move large datasets; rather, it is a convenient way of transferring small to medium-sized datasets. It relies on pandas to read and write, which introduces considerable overhead.

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.ShellScript(source, product, dag, name=None, client=None, params=None)

Execute a shell script in a shell

Parameters
  • source (str or pathlib.Path) – Script source; if str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded. The source code must include the {{product}} tag

  • product (ploomber.products.Product) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.ShellClient or RemoteShellClient, optional) – The client used to run the script. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script will be executed with a “product” parameter (which will contain the product object). It will also include an “upstream” parameter if the task has upstream dependencies, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details
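
Examples

A minimal sketch using the default shell client; note the required {{product}} tag:

>>> from ploomber import DAG
>>> from ploomber.tasks import ShellScript
>>> from ploomber.products import File
>>> dag = DAG()
>>> ShellScript('touch {{product}}', File('output.txt'),
...             dag=dag, name='touch')
>>> dag.build()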

run()

This is the only required method Task subclasses must implement

class ploomber.tasks.Task(product, dag, name=None, params=None)

Abstract class for all Tasks

Parameters
  • source (str or pathlib.Path) – Source code for the task; for tasks that do not take source code as input (such as PostgresCopyFrom), this can be a different kind of object (e.g. a path to a file). The source can be a template and can make references to any parameter in “params”, “upstream” parameters, or its own “product”; not all Tasks have templated source (templating is mostly used by Tasks that take SQL source code as input)

  • product (Product) – The product that this task will create upon completion

  • dag (DAG) – The DAG holding this task

  • name (str) – A name for this task, if None a default will be assigned

  • params (dict) – Extra parameters passed to the task on rendering (if templated source) or during execution (if not templated source)

params

A read-only, dictionary-like object with the params passed; after running, ‘product’ and ‘upstream’ are added, if any

Type

Params

on_render

Function to execute after rendering. The function can request any of the following parameters: task, client and/or product.

Type

callable

on_finish

Function to execute upon successful execution. Can request the same params as the on_render hook.

Type

callable

on_failure

Function to execute upon failure. Can request the same params as the on_render hook.

Type

callable

Notes

All subclasses must implement the same constructor to keep the API consistent; optional parameters after “params” are ok
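
Examples

A minimal sketch of the hook mechanism using PythonCallable (a concrete Task subclass); the check function is illustrative:

>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import PythonCallable
>>> from ploomber.products import File
>>> def fn(product):
...     Path(str(product)).touch()
>>> def check(product):
...     # hooks receive only the parameters they request by name
...     assert Path(str(product)).exists()
>>> dag = DAG()
>>> task = PythonCallable(fn, File('out.txt'), dag=dag, name='fn')
>>> task.on_finish = check
>>> dag.build()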

build(force=False, catch_exceptions=True)

Build a single task

Although Tasks are primarily designed to execute via DAG.build(), it is possible to build them in isolation. However, this only works if the task does not have any unrendered upstream dependencies; if it does, you should call DAG.render() before calling Task.build()

Examples

>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import PythonCallable
>>> from ploomber.products import File
>>> def fn(product):
...     Path(str(product)).touch()
>>> PythonCallable(fn, File('file.txt'), dag=DAG()).build()

Returns

A dictionary with keys ‘run’ and ‘elapsed’

Return type

dict

Raises
  • TaskBuildError – If the task failed to build because it has upstream dependencies, because the build itself failed, or because the build succeeded but the on_finish hook failed

  • DAGBuildEarlyStop – If any task or on_finish hook raises a DAGBuildEarlyStop error

debug()

Debug task, only implemented in certain tasks

develop()

Develop task, only implemented in certain tasks

property name

A str that represents the name of the task; you can access tasks in a dag using dag['some_name']

property on_failure

Callable to be executed if task fails (passes Task as first parameter and the exception as second parameter)

property on_finish

Callable to be executed after this task is built successfully (passes Task as first parameter)

property params

dict that holds the parameters that will be passed to the task upon execution. Before rendering, this only holds parameters passed in the Task constructor; after rendering, it holds new keys: “product” (containing the rendered product) and “upstream” (holding upstream parameters, if any)

property product

The product this task will create upon execution

render(force=False, outdated_by_code=True)

Renders code and product. All upstream tasks must have been rendered first; for that reason, this method will usually not be called directly, but via DAG.render(), which renders in the right order.

Render fully determines whether a task should run or not.

Parameters
  • force (bool, default=False) – If True, mark status as WaitingExecution/WaitingUpstream even if the task is up-to-date, otherwise, the normal process follows and only up-to-date tasks are marked as Skipped.

  • outdated_by_code (bool, default=True) – Determines whether Task.product is marked outdated when the source code changes; if False, only upstream timestamps are used.

Notes

This method tries to avoid calls to check for product status whenever possible, since checking product’s metadata can be a slow operation (e.g. if metadata is stored in a remote database)

When passing force=True, the product’s status check is skipped altogether; this can be useful when we only want to quickly get a rendered DAG object to interact with it

abstract run()

This is the only required method Task subclasses must implement

property source

Source is used by the task to compute its output; in most cases, this is source code. For example, PythonCallable takes a function as source and SQLScript takes a string with SQL code. Other tasks might take non-code objects as source; for example, PostgresCopyFrom takes a path to a file. If source represents code, calling str(task.source) returns its string representation

status(return_code_diff=False, sections=None)

Prints the current task status

Parameters

sections (list, optional) – Sections to include. Defaults to “name”, “last_run”, “outdated”, “product”, “doc”, “location”

to_dict()

Returns a dict representation of the Task, only includes a few attributes

property upstream

A mapping of upstream dependencies: {task name -> task object}

class ploomber.tasks.TaskFactory(task_class, product_class, dag)

Utility class for reducing boilerplate code

class ploomber.tasks.UploadToS3(source, product, dag, bucket, name=None, params=None, client_kwargs=None, upload_file_kwargs=None)

Upload a file to S3

Parameters
  • source (str) – Path to file to upload

  • product (ploomber.products.GenericProduct) – Product must be initialized with the desired Key

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • bucket (str) – Bucket to upload to

run()

This is the only required method Task subclasses must implement

ploomber.tasks.in_memory_callable(callable_, dag, name, params)

Returns a special in-memory task that runs a callable with the given params. When calling InMemoryDAG(dag).build(), this task should not appear in the input_data dictionary

Parameters
  • callable_ (callable) – An arbitrary callable to execute

  • dag (ploomber.DAG) – DAG where the task should be added

  • name (str) – Task name

  • params (dict) – Parameters to pass when calling callable_; e.g. passing params=dict(a=1) is equivalent to calling callable_ with a=1.

Returns

A PythonCallable task with special characteristics. It cannot be invoked directly, but through InMemoryDAG(dag).build()

Return type

PythonCallable

ploomber.tasks.input_data_passer(dag, name, preprocessor=None)

Returns a special in-memory task that forwards input data as product to downstream tasks.

Parameters
  • dag (ploomber.DAG) – DAG where the task should be added

  • name (str) – Task name

  • preprocessor (callable, default=None) – An arbitrary callable that can be used to add custom logic to the input data before passing it to downstream tasks

Returns

A PythonCallable task with special characteristics. It cannot be invoked directly, but through InMemoryDAG(dag).build()

Return type

PythonCallable
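
Examples

A minimal sketch combining input_data_passer with a downstream task executed through InMemoryDAG; the pickle-based serializer/unserializer and function names are illustrative:

>>> import pickle
>>> from pathlib import Path
>>> from ploomber import DAG, InMemoryDAG
>>> from ploomber.tasks import PythonCallable, input_data_passer
>>> from ploomber.products import File
>>> def serializer(obj, product):
...     Path(str(product)).write_bytes(pickle.dumps(obj))
>>> def unserializer(product):
...     return pickle.loads(Path(str(product)).read_bytes())
>>> def add_one(upstream):
...     return upstream['root'] + 1
>>> dag = DAG()
>>> root = input_data_passer(dag=dag, name='root')
>>> task = PythonCallable(add_one, File('one.pkl'), dag=dag,
...                       name='add_one', serializer=serializer,
...                       unserializer=unserializer)
>>> root >> task
>>> # input_data keys match the names of input_data_passer tasks
>>> out = InMemoryDAG(dag).build({'root': 1})
>>> out['add_one']
2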