orchestration#
Pipeline Orchestration with Prefect
Divina pipelines have built-in orchestration via Prefect, meaning that by only setting a keyword argument to True, fitting and predicting with your Divina pipelines store, track and visualize each incremental task and artifact passed between tasks. Combined with the easy configuration of the underlying Dask cluster, this means your pipelines can be taken into production extremely easily.
Below is an example of fitting and predicting with a Divina pipeline using the built-in Prefect orchestration (Please be sure to use the appropriate environment variables to point and authenticate to your Prefect service).
import dask.dataframe as dd
import pandas as pd
from prefect import flow
from divina import Divina
@flow(name="divina_example_pipeline", persist_result=True)
def run_pipeline(pipeline, data):
pipeline.fit(data, prefect=True)
return example_pipeline.predict(
data.drop(columns=pipeline.target), prefect=True
)
example_data = pd.DataFrame(
data=[
["2011-01-01", 3, 6],
["2011-01-02", 2, 4],
["2011-01-03", 8, 6],
["2011-01-04", 1, 1],
["2011-01-05", 2, 3],
],
columns=["a", "b", "c"],
)
example_data_dask = dd.from_pandas(example_data, npartitions=1)
example_pipeline = Divina(target="c", time_index="a", frequency="D")
print(run_pipeline(example_pipeline, example_data_dask))
Artifact Persistence
In order to persist pipeline artifacts (datasets, models and metrics), one must only set the test_pipeline_root attribute of their Divina pipeline.
Below is an example of artifact persistence to a local path:
import dask.dataframe as dd
import pandas as pd
from prefect import flow
from divina import Divina
@flow(name="divina_example_pipeline", persist_result=True)
def run_pipeline(pipeline, data):
pipeline.fit(data, prefect=True)
return example_pipeline.predict(
data.drop(columns=pipeline.target), prefect=True
)
example_data = pd.DataFrame(
data=[
["2011-01-01", 3, 6],
["2011-01-02", 2, 4],
["2011-01-03", 8, 6],
["2011-01-04", 1, 1],
["2011-01-05", 2, 3],
],
columns=["a", "b", "c"],
)
example_data_dask = dd.from_pandas(example_data, npartitions=1)
example_pipeline = Divina(
target="c",
time_index="a",
frequency="D",
pipeline_root="divina_example/example_pipeline",
)
print(run_pipeline(example_pipeline, example_data_dask))
Below is an example of artifact persistence to S3 (be sure to set the appropriate credentials via environment variable):
import dask.dataframe as dd
import pandas as pd
from prefect import flow
from divina import Divina
@flow(name="divina_example_pipeline", persist_result=True)
def run_pipeline(pipeline, data):
pipeline.fit(data, prefect=True)
return example_pipeline.predict(
data.drop(columns=pipeline.target), prefect=True
)
example_data = pd.DataFrame(
data=[
["2011-01-01", 3, 6],
["2011-01-02", 2, 4],
["2011-01-03", 8, 6],
["2011-01-04", 1, 1],
["2011-01-05", 2, 3],
],
columns=["a", "b", "c"],
)
example_data_dask = dd.from_pandas(example_data, npartitions=1)
example_pipeline = Divina(
target="c",
time_index="a",
frequency="D",
pipeline_root="s3://divina-example-bucket/example_pipeline",
)
print(run_pipeline(example_pipeline, example_data_dask))