ploomber.tasks.SQLDump

class ploomber.tasks.SQLDump(source, product, dag, name=None, client=None, params=None, chunksize=10000, io_handler=None)

Dumps data from a SQL SELECT statement to one or more files

Parameters:
  • source (str or pathlib.Path) – SQL script source. If str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded

  • product (ploomber.products.Product) – Product generated upon successful execution

  • dag (ploomber.DAG) – A DAG to add this task to

  • name (str) – A str to identify this task. Should not already exist in the dag

  • client (ploomber.clients.{SQLAlchemyClient, DBAPIClient}, optional) – The client used to connect to the database. Only required if no dag-level client has been declared using dag.clients[class]

  • params (dict, optional) – Parameters to pass to the script. By default, the script is rendered with a “product” parameter (which contains the product object) and, if the task has upstream dependencies, an “upstream” parameter, along with any parameters declared here. The source code is converted to a jinja2.Template for passing parameters; refer to the jinja2 documentation for details

  • chunksize (int, optional) – Number of rows per file. If None, the entire dataset is downloaded into a single file. If not None, the product becomes a directory

  • io_handler (ploomber.io.CSVIO or ploomber.io.ParquetIO, optional) – io handler to use (which controls the output format); currently only csv and parquet are supported. If None, it tries to infer the handler from the product’s extension; if that doesn’t work, it uses io.CSVIO
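The chunksize and io_handler rules above can be restated in a few lines of plain Python. The helper below, resolve_output, is hypothetical and not part of ploomber; it uses the strings "csv" and "parquet" as stand-ins for ploomber.io.CSVIO and ploomber.io.ParquetIO, and it only mirrors the documented behavior: an explicit handler is used as given, otherwise the product's extension decides, CSV is the fallback, and any non-None chunksize turns the product into a directory.

```python
from pathlib import Path
from typing import Optional, Tuple


def resolve_output(product: str, chunksize: Optional[int],
                   io_handler: Optional[str] = None) -> Tuple[str, bool]:
    """Hypothetical helper restating SQLDump's documented output rules.

    Returns (format, is_directory). ``io_handler`` is a plain string
    ("csv" or "parquet") standing in for CSVIO / ParquetIO.
    """
    if io_handler is not None:
        # An explicitly passed handler is used as-is
        fmt = io_handler
    else:
        # Infer from the extension; anything unrecognized falls back to CSV
        suffix = Path(product).suffix.lower()
        fmt = {".parquet": "parquet", ".csv": "csv"}.get(suffix, "csv")
    # Any non-None chunksize means one file per chunk inside a directory
    return fmt, chunksize is not None


print(resolve_output("data.parquet", chunksize=None))  # ('parquet', False)
print(resolve_output("data", chunksize=10000))          # ('csv', True)
```

The real task applies these rules internally; the sketch is only meant to make the interaction between the two parameters explicit.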

Examples

Spec API:

clients:
  # define a get function in clients.py that returns the client
  SQLDump: clients.get

tasks:
  # script with a SELECT statement
  - source: script.sql
    product: data.parquet
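The spec's clients section points to a get function in clients.py. A minimal version of that module might look like the sketch below; the database name my_db.db is borrowed from the Python API example, and importing DBAPIClient inside the function is a stylistic choice (so the module imports even where ploomber is not installed), not a ploomber requirement.

```python
# clients.py: hypothetical module referenced as "clients.get" in the spec
import sqlite3


def get():
    """Return the client that SQLDump tasks in the spec will use."""
    # Imported lazily so that importing this module does not require ploomber
    from ploomber.clients import DBAPIClient

    # Same connection arguments as in the Python API example
    return DBAPIClient(sqlite3.connect, dict(database='my_db.db'))
```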

Full spec API example.

Python API:

>>> import sqlite3
>>> import pandas as pd
>>> from ploomber import DAG
>>> from ploomber.products import File
>>> from ploomber.tasks import SQLDump
>>> from ploomber.clients import DBAPIClient
>>> con_raw = sqlite3.connect(database='my_db.db')
>>> df = pd.DataFrame({'a': range(100), 'b': range(100)})
>>> _ = df.to_sql('numbers', con_raw, index=False)
>>> con_raw.close()
>>> dag = DAG()
>>> client = DBAPIClient(sqlite3.connect, dict(database='my_db.db'))
>>> _ = SQLDump('SELECT * FROM numbers', File('data.parquet'),
...             dag=dag, name='dump', client=client, chunksize=None)
>>> _ = dag.build()
>>> df = pd.read_parquet('data.parquet')
>>> df.head(3)
   a  b
0  0  0
1  1  1
2  2  2

Notes

The chunksize parameter is also assigned to cursor.arraysize. This can greatly speed up the dump for some databases, because some drivers use cursor.arraysize as the number of rows to fetch in a single network trip. However, this is driver-dependent and not all drivers implement it (cx_Oracle does).
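The idea behind this note can be illustrated with the standard DB-API calls cursor.arraysize and cursor.fetchmany(). The sketch below uses sqlite3 and csv from the standard library and writes one CSV file per chunk into a directory; it is an illustration of the technique under those assumptions, not ploomber's implementation, and the file names (0.csv, 1.csv, ...) are arbitrary choices for the example. sqlite3 accepts arraysize but, being in-process, has no network round trips to save.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path


def dump_in_chunks(con, query, out_dir, chunksize):
    """Write the result of ``query`` to out_dir, one CSV file per chunk."""
    out_dir.mkdir(parents=True, exist_ok=True)
    cur = con.cursor()
    # Drivers that honor arraysize fetch this many rows per round trip
    cur.arraysize = chunksize
    cur.execute(query)
    header = [col[0] for col in cur.description]
    paths = []
    while True:
        rows = cur.fetchmany()  # with no argument, fetches cur.arraysize rows
        if not rows:
            break
        path = out_dir / f"{len(paths)}.csv"
        with path.open("w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)
        paths.append(path)
    cur.close()
    return paths


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE numbers (a INTEGER, b INTEGER)")
con.executemany("INSERT INTO numbers VALUES (?, ?)", [(i, i) for i in range(25)])
with tempfile.TemporaryDirectory() as tmp:
    files = dump_in_chunks(con, "SELECT * FROM numbers", Path(tmp) / "data", 10)
    names = [p.name for p in files]
con.close()
print(names)  # ['0.csv', '1.csv', '2.csv']
```

With 25 rows and a chunk size of 10, the loop produces chunks of 10, 10, and 5 rows, hence three files.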

See also

ploomber.tasks.SQLScript

A task to execute a SQL script and create a table/view as product

Methods

build([force, catch_exceptions])

Build a single task

debug()

Debug task, only implemented in certain tasks

load([key])

Load task as pandas.DataFrame.

render([force, outdated_by_code, remote])

Renders code and product. All upstream tasks must have been rendered first; for that reason, this method is usually not called directly but via DAG.render(), which renders tasks in the right order.

run()

This is the only required method Task subclasses must implement

set_upstream(other[, group_name])

status([return_code_diff, sections])

Prints the current task status

build(force=False, catch_exceptions=True)

Build a single task

Although Tasks are primarily designed to execute via DAG.build(), it is possible to build them in isolation. However, this only works if the task does not have any unrendered upstream dependencies; if it does, call DAG.render() before calling Task.build()

Returns:

A dictionary with keys ‘run’ and ‘elapsed’

Return type:

dict

Raises:
  • TaskBuildError – If the task failed to build because it has upstream dependencies, the build itself failed, or the build succeeded but the on_finish hook failed

  • DAGBuildEarlyStop – If any task or on_finish hook raises a DAGBuildEarlyStop error

debug()

Debug task, only implemented in certain tasks

load(key=None, **kwargs)

Load task as pandas.DataFrame. Only implemented in certain tasks

render(force=False, outdated_by_code=True, remote=False)

Renders code and product. All upstream tasks must have been rendered first; for that reason, this method is usually not called directly but via DAG.render(), which renders tasks in the right order.

Render fully determines whether a task should run or not.

Parameters:
  • force (bool, default=False) – If True, mark status as WaitingExecution/WaitingUpstream even if the task is up-to-date (if there are any File(s) with clients, this also ignores the status of the remote copy). Otherwise, the normal process follows and only up-to-date tasks are marked as Skipped.

  • outdated_by_code (bool, default=True) – If True, source code changes are taken into account when determining whether Task.product is outdated. Otherwise, only the upstream timestamps are used.

  • remote (bool, default=False) – Use remote metadata to determine status

Notes

This method tries to avoid checking product status whenever possible, since checking a product’s metadata can be a slow operation (e.g., if metadata is stored in a remote database).

When passing force=True, product status checking is skipped altogether. This can be useful when we only want to quickly get a rendered DAG object to interact with it.
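The interplay between force and outdated_by_code can be summarized as a small decision function. The sketch below is a simplified, hypothetical model (ProductMetadata and render_status are not ploomber names): it ignores WaitingUpstream and remote metadata, and it treats a product as outdated when any upstream timestamp is newer than its own or, if outdated_by_code is True, when the stored source differs from the current one. Note how force returns before any metadata is consulted, which is what makes it cheap.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class ProductMetadata:
    """Hypothetical stand-in for a product's stored metadata."""
    timestamp: Optional[float]     # None if the product was never built
    stored_source: Optional[str]   # source code saved at the last build


def render_status(meta: ProductMetadata, current_source: str,
                  upstream_timestamps: Sequence[float],
                  force: bool = False, outdated_by_code: bool = True) -> str:
    """Simplified model of how render() might classify a single task."""
    if force:
        # No metadata lookups at all: the task is simply scheduled
        return "WaitingExecution"
    if meta.timestamp is None:
        # Never built before, so it must run
        return "WaitingExecution"
    outdated_by_data = any(ts > meta.timestamp for ts in upstream_timestamps)
    code_changed = outdated_by_code and meta.stored_source != current_source
    return "WaitingExecution" if outdated_by_data or code_changed else "Skipped"


meta = ProductMetadata(timestamp=100.0, stored_source="SELECT * FROM numbers")
print(render_status(meta, "SELECT * FROM numbers", [90.0]))  # Skipped
print(render_status(meta, "SELECT a FROM numbers", [90.0]))  # WaitingExecution
```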

run()

This is the only required method Task subclasses must implement

set_upstream(other, group_name=None)

status(return_code_diff=False, sections=None)

Prints the current task status

Parameters:

sections (list, optional) – Sections to include. Defaults to “name”, “last_run”, “outdated”, “product”, “doc”, “location”

Attributes

PRODUCT_CLASSES_ALLOWED

client

exec_status

name

A str that represents the name of the task; you can access tasks in a dag using dag['some_name']

on_failure

Callable to be executed if task fails (passes Task as first parameter and the exception as second parameter)

on_finish

Callable to be executed after this task is built successfully (passes Task as first parameter)

on_render

params

dict that holds the parameters that will be passed to the task upon execution.

product

The product this task will create upon execution

source

Source is used by the task to compute its output. In most cases this is source code; for example, PythonCallable takes a function as source and SQLScript takes a string with SQL code as source.

upstream

A mapping for upstream dependencies {task name} -> [task object]