Python API Quickstart

A quick example showcasing ploomber’s essential features.

ploomber's expressive syntax makes pipeline declarations read like blueprints that provide a full picture: they state not only which tasks to perform and in which order, but also where output will be stored and in which form. Furthermore, upstream products are automatically available to downstream tasks, so each product is declared only once.

Expressive syntax

We start by defining a few functions:

[3]:
from pathlib import Path
import tempfile

import pandas as pd
import numpy as np
from IPython.display import HTML

from ploomber import DAG
from ploomber.tasks import PythonCallable
from ploomber.products import File

# we declare two functions: one to get the data
# and another one to clean it


def get(product):
    """Get data"""
    df = pd.DataFrame({"column": np.random.rand(100)})
    df.to_csv(str(product))


def clean(upstream, product):
    """Clean data"""
    data = pd.read_csv(str(upstream["get"]))
    clean = data[data.column >= 0.5]
    clean.to_csv(str(product))

Create a DAG to hold all our tasks, then instantiate Task objects from our functions:

[4]:
# tmp directory to save our data
tmp_dir = Path(tempfile.mkdtemp())

# create a DAG object to organize all the tasks
dag = DAG()

# create tasks from our functions
task_get = PythonCallable(
    get,
    # where to save output
    product=File(tmp_dir / "data.csv"),
    dag=dag,
)

task_clean = PythonCallable(clean, product=File(tmp_dir / "clean.csv"), dag=dag)

Declare how our tasks relate to each other:

[5]:
task_get >> task_clean
[5]:
PythonCallable: clean -> File(/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/clean.csv)

Standalone execution

ploomber's pipelines are ready to run right after being created; there is no need to set up a separate system. Since all products are part of the declaration, they can be switched entirely to isolate executions depending on the environment (e.g. save all output in /data/{{user}}, where user is the currently logged-in user).
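The product-switching idea can be sketched in plain Python. The snippet below builds a per-user output root (getpass.getuser and the /data layout are illustrative assumptions mirroring the /data/{{user}} example above, not ploomber API):

```python
import getpass
from pathlib import Path

# pick an output root per environment: every user gets an
# isolated directory, so runs never overwrite each other
user = getpass.getuser()
root = Path("/data") / user

# products for each task live under the per-user root
products = {
    "get": root / "data.csv",
    "clean": root / "clean.csv",
}

print(products["clean"])
```

Since the paths live in one place, pointing the whole pipeline at a different location is a one-line change.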

Before building the pipeline, let’s get a summary:

[6]:
dag.status()

[6]:
name   Last updated      Outdated dependencies  Outdated code  Product                                                                 Doc (short)  Location
get    Has not been run  False                  True           /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/data.csv  Get data     :15
clean  Has not been run  True                   True           /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/clean.csv Clean data   :21
[7]:
dag.build()


[7]:
name Ran? Elapsed (s) Percentage
get True 0.124395 51.3855
clean True 0.117687 48.6145

Incremental

Pipelines usually take hours or even days to run, and during the development phase it is wasteful to re-execute the pipeline on each change. ploomber keeps track of code changes and executes a task only if its source code has changed since the last execution.
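The incremental mechanism can be sketched in plain Python: record a fingerprint of each task's source after a successful build, then re-run only when the fingerprint changes. This is a simplified illustration of the idea; ploomber's actual change detection is more involved:

```python
import hashlib


def fingerprint(source):
    # hash the task's source code so changes are detectable
    return hashlib.md5(source.encode()).hexdigest()


source_v1 = "def get(product): ..."
stored = fingerprint(source_v1)  # recorded after the last successful build

# unchanged source -> same hash -> the task is skipped
assert fingerprint(source_v1) == stored

# edited source -> different hash -> the task must re-run
source_v2 = "def get(product): return 42"
needs_rerun = fingerprint(source_v2) != stored
print(needs_rerun)  # True
```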

[8]:
dag.build()


[8]:
name Ran? Elapsed (s) Percentage
get False 0 0
clean False 0 0

Testable

ploomber also supports a hook to execute code upon task completion. This makes it possible to write acceptance tests that explicitly state input assumptions (e.g. check a data frame's input schema).

[9]:
def test_no_nas(task):
    print("Testing there are no NAs...")
    path = str(task.product)
    df = pd.read_csv(path)
    assert not df.column.isna().sum()


task_get.on_finish = test_no_nas
task_clean.on_finish = test_no_nas
[10]:
# Ignore status and force execution on all tasks
# so we also run on_finish
dag.build(force=True)

Testing there are no NAs...
Testing there are no NAs...

[10]:
name Ran? Elapsed (s) Percentage
get True 0.123876 50.3389
clean True 0.122208 49.6611

Interactive

[11]:
# get the task named "clean"
task = dag["clean"]

# what are its upstream dependencies?
print(task.upstream)
{'get': PythonCallable: get -> File(/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/data.csv)}
[12]:
# only execute this task instead of the entire dag
task.build(force=True)
Testing there are no NAs...
[12]:
<TaskStatus.Executed: 'executed'>

Start a debugging session using pdb (this only works if the task is a PythonCallable):

task.debug()

Communicable

[14]:
html = dag.to_markup()
HTML(html)


[14]:

DAG report

Plot

Status

name   Last updated                         Outdated dependencies  Outdated code  Product                                                                 Doc (short)  Location
get    3 seconds ago (Mar 16, 20 at 17:45)  False                  False          /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/data.csv  Get data     :15
clean  3 seconds ago (Mar 16, 20 at 17:45)  False                  False          /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf0s8ypu8/clean.csv Clean data   :21

Source code

get

def get(product):
    """Get data
    """
    df = pd.DataFrame({'column': np.random.rand(100)})
    df.to_csv(str(product))

clean

def clean(upstream, product):
    """Clean data
    """
    data = pd.read_csv(str(upstream['get']))
    clean = data[data.column >= 0.5]
    clean.to_csv(str(product))