The two most common ways to deploy data pipelines are batch and online. Ploomber supports both deployment options.

In batch processing, you obtain new data, make predictions, and store them for later use. This process usually happens on a schedule. For example, you may develop a Machine Learning pipeline that runs every morning, predicts the probability of user churn, and stores those probabilities in a database table.

Alternatively, you may deploy a pipeline as an online service and expose your model as a REST API; users request predictions at any time by sending input data.

Pipeline composition

Before diving into deployment details, let’s introduce the concept of pipeline composition.

The only difference between a Machine Learning training pipeline and its serving counterpart is what happens at the beginning and the end.

At training time, we obtain historical data, generate features, and train a model:

    graph LR
        A[Get historical data] --> B1[A feature] --> C[Train model]
        A --> B2[Another feature] --> C

        subgraph Feature engineering
            B1
            B2
        end

At serving time, we obtain new data, generate features and make predictions using a trained model:

    graph LR
        A[Get new data] --> B1[A feature] --> C[Predict]
        A --> B2[Another feature] --> C

        subgraph Feature engineering
            B1
            B2
        end

When the feature engineering code at training and serving time does not match, training-serving skew arises. Training-serving skew is one of the most common problems when deploying ML models. To fix it, Ploomber allows you to compose pipelines: write your feature generation code once and re-use it in your training and serving pipelines; this ensures that the feature engineering code matches exactly.

Batch processing

Ploomber pipelines can export to production-grade schedulers for batch processing. Check out our package Soopervisor, which allows you to export to Kubernetes (via Argo workflows), AWS Batch, and Airflow.

Composing batch pipelines

To compose a batch pipeline, use the import_tasks_from directive in your pipeline.yaml file.

For example, define your feature generation tasks in a features.yaml file:

# generate one feature...
- source: features.a_feature
  product: features/a_feature.csv

# another feature...
- source: features.another_feature
  product: features/another_feature.csv

# join the two previous features...
- source: features.join
  product: features/all.csv

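For reference, here is a minimal sketch of what the features module referenced above might contain. Everything in it is illustrative: the transformations are placeholders, and the sketch assumes the raw-data task in each pipeline is given the common name get (as in the online example later in this guide), which is what lets the same feature code run at training and serving time:

import pandas as pd

def a_feature(product, upstream):
    # load the raw data produced by the upstream task (assumed to be named "get")
    raw = pd.read_csv(str(upstream['get']))
    # compute a feature (placeholder transformation) and store it in the product
    df = pd.DataFrame({'a_feature': raw['some_column'] ** 2})
    df.to_csv(str(product), index=False)

def another_feature(product, upstream):
    raw = pd.read_csv(str(upstream['get']))
    df = pd.DataFrame({'another_feature': raw['some_column'] + 1})
    df.to_csv(str(product), index=False)

def join(product, upstream):
    # join the two features generated above into a single table
    df = pd.read_csv(str(upstream['a_feature'])).join(
        pd.read_csv(str(upstream['another_feature'])))
    df.to_csv(str(product), index=False)
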
Then import those tasks in your training pipeline, pipeline.yaml:

    # import feature generation tasks
    meta:
        import_tasks_from: features.yaml

    tasks:
        # Get raw data for training
        - source: train.get_historical_data
          product: raw/get.csv

        # import_tasks_from injects your feature generation tasks here

        # Train a model
        - source: train.train_model
          product: model/model.pickle

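A corresponding sketch of the train module might look like the following; the data source and the estimator are illustrative, and the sketch assumes the joined features table includes the target column:

import pickle

import pandas as pd
from sklearn.linear_model import LogisticRegression

def get_historical_data(product):
    # illustrative source; in a real pipeline this would query a database
    df = pd.read_csv('path/to/historical-data.csv')
    df.to_csv(str(product), index=False)

def train_model(product, upstream):
    # load the joined features produced by features.join
    df = pd.read_csv(str(upstream['join']))
    X, y = df.drop('target', axis=1), df['target']
    model = LogisticRegression().fit(X, y)
    # store the trained model in the declared product
    with open(str(product), 'wb') as f:
        pickle.dump(model, f)
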
Your serving pipeline, pipeline-serve.yaml, would look like this:

    # import feature generation tasks
    meta:
        import_tasks_from: features.yaml

    tasks:
        # Get new data for predictions
        - source: serve.get_new_data
          product: serve/get.parquet

        # import_tasks_from injects your feature generation tasks here

        # Make predictions using a trained model
        - source: serve.predict
          product: serve/predictions.csv
          params:
            path_to_model: model.pickle

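Similarly, a sketch of the serve module might look like this (the data source is illustrative; note how path_to_model arrives as a parameter, matching the params entry above):

import pickle

import pandas as pd

def get_new_data(product):
    # illustrative source; in a real pipeline this would pull the latest unscored records
    df = pd.read_parquet('path/to/new-data.parquet')
    df.to_parquet(str(product))

def predict(product, upstream, path_to_model):
    # load the model produced by the training pipeline
    with open(path_to_model, 'rb') as f:
        model = pickle.load(f)
    # score the joined features produced by features.join
    X = pd.read_csv(str(upstream['join']))
    out = pd.DataFrame({'prediction': model.predict(X)})
    out.to_csv(str(product), index=False)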

Here’s an example project showing how to use import_tasks_from to create a training (pipeline.yaml) and serving (pipeline-serve.yaml) pipeline.

Online service (API)

To encapsulate all your pipeline’s logic for online predictions, use ploomber.OnlineDAG. Once implemented, you can generate predictions like this:

from my_project import MyOnlineDAG

# MyOnlineDAG is a subclass of OnlineDAG
dag = MyOnlineDAG()

# pass one argument per input task to get a prediction
prediction = dag.predict(get=input_data)

You can easily integrate an online DAG with any library such as Flask or gRPC.

The only requirement is that your feature generation code must consist entirely of Python function tasks (i.e., ploomber.tasks.PythonCallable) with a configured serializer and unserializer.

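For reference, a serializer receives the object returned by a task along with its product, and an unserializer receives the product and returns the object. A minimal pickle-based sketch might look like this (the function names are illustrative; you reference your own functions via the serializer and unserializer keys in your YAML spec):

import pickle
from pathlib import Path

def my_serializer(obj, product):
    # store the object returned by a task in its product path
    Path(str(product)).write_bytes(pickle.dumps(obj))

def my_unserializer(product):
    # load the object back when a downstream task requests it
    return pickle.loads(Path(str(product)).read_bytes())
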
Composing online pipelines

To create an online DAG, list your feature tasks in a features.yaml and use import_tasks_from in your training pipeline (pipeline.yaml). Subclass ploomber.OnlineDAG to create a serving pipeline.

OnlineDAG will take your tasks from features.yaml and create new “input tasks” based on upstream references in your feature tasks.

For example, if features.yaml has the tasks a_feature and another_feature (see the diagram in the first section), and both obtain their input from a task named get, the source code may look like this:

def a_feature(upstream):
    raw_data = upstream['get']
    # process raw_data to generate features...
    # return a_feature
    return df_a_feature

def another_feature(upstream):
    raw_data = upstream['get']
    # process raw_data to generate features...
    # return another_feature
    return df_another_feature

Since features.yaml does not contain a task named get, OnlineDAG automatically identifies it as an “input task”. Finally, you must provide a “terminal task”, which is the last task in your online pipeline:

    graph LR
        A[Input] --> B1[A feature] --> C[Terminal task]
        A --> B2[Another feature] --> C

        subgraph Feature engineering
            B1
            B2
        end

To implement this, create a subclass of OnlineDAG and provide the path to your features.yaml, the parameters for your terminal task, and the terminal task itself:

import pickle
from importlib import resources

import pandas as pd

from ploomber import OnlineDAG

import my_project  # your project's package, where model.pickle is stored as package data

# subclass OnlineDAG...
class MyOnlineDAG(OnlineDAG):
    # and provide these three static methods...

    # get_partial: returns a path to your feature tasks
    @staticmethod
    def get_partial():
        return 'features.yaml'

    # terminal_params: returns a dictionary with parameters for the terminal task
    @staticmethod
    def terminal_params():
        model = pickle.loads(resources.read_binary(my_project, 'model.pickle'))
        return dict(model=model)

    # terminal_task: implementation of your terminal task
    @staticmethod
    def terminal_task(upstream, model):
        # receives all tasks with no downstream dependencies in features.yaml
        a_feature = upstream['a_feature']
        another_feature = upstream['another_feature']
        X = pd.DataFrame({'a_feature': a_feature,
                          'another_feature': another_feature})
        return model.predict(X)

To call MyOnlineDAG:

from my_project import MyOnlineDAG

dag = MyOnlineDAG()

# pass parameters (one per input)
prediction = dag.predict(get=input_data)

You can import and call MyOnlineDAG in any framework (e.g., Flask) to expose your pipeline as an online service.

from flask import Flask, request, jsonify
import pandas as pd

from my_project import MyOnlineDAG

# instantiate the online DAG
dag = MyOnlineDAG()
app = Flask(__name__)

@app.route('/', methods=['POST'])
def predict():
    request_data = request.get_json()
    # get JSON data and create a data frame with a single row
    input_data = pd.DataFrame(request_data, index=[0])
    # pass the input data, one argument per root node
    out = dag.predict(get=input_data)
    # return output from the terminal task
    return jsonify({'prediction': int(out['terminal'])})

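Once the Flask app is running (for example, locally on port 5000), a client can request a prediction by sending a JSON payload; the column names below are illustrative and must match whatever your feature code expects:

import requests

# send a single record as JSON and read back the predicted value
response = requests.post('http://localhost:5000/',
                         json={'a_column': 1.0, 'another_column': 2.0})
print(response.json())  # e.g., {'prediction': 0}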

Click here to see a deployment example using AWS Lambda.

Click here to see a complete sample project that trains a model and exposes an API via Flask.