Quickstart

Get up and running with an example.

If you are here just to see the code, you can find it on GitHub.

Overview

So you've signed up and installed the package, and now you probably just want to start creating pipelines - so let's get to it!

For our quickstart, we chose a public BigQuery table with census data of adult income levels. Here's a snapshot of the data:

age | workclass | functional_weight | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | income_bracket
-------+--------------+---------------------+-------------+-----------------+--------------------+-------------------+----------------+--------------------+--------+----------------+----------------+------------------+------------------+------------------
39 | Private | 297847 | 9th | 5 | Married-civ-spouse | Other-service | Wife | Black | Female | 3411 | 0 | 34 | United-States | <=50K
72 | Private | 74141 | 9th | 5 | Married-civ-spouse | Exec-managerial | Wife | Asian-Pac-Islander | Female | 0 | 0 | 48 | United-States | >50K
45 | Private | 178215 | 9th | 5 | Married-civ-spouse | Machine-op-inspct | Wife | White | Female | 0 | 0 | 40 | United-States | >50K

The rightmost column, income_bracket, is our label, i.e., what we want to infer based on all the other columns.

We'll aim for a validation accuracy of around 82%, quickly run multiple experiments on various pipelines, and see how caching can save up to 80% of preprocessing costs.

Authenticate

You can authenticate with the Core Engine as follows:

Python SDK
import cengine

client = cengine.Client(username='YOUREMAIL', password='YOURPASSWORD')
CLI
cengine auth login

Create a Google Cloud Platform provider

A provider links your own cloud infrastructure to your Core Engine account. Within the context of the quickstart, we will set up Google Cloud Platform (GCP) as a provider.

In order to set up a GCP provider, all you need to do is provide the path to a Google Cloud Storage bucket and a service account .json file.

Please make sure the service account has the required permissions to ensure we can orchestrate pipelines properly.

Python SDK
# PROVIDER_NAME: name of your provider
# SERVICE_ACCOUNT: path to your service_account json file
# BUCKET_NAME: path to your google cloud storage bucket, gs://your_bucket

# Create a provider
my_provider = client.create_provider(name=PROVIDER_NAME,
                                     provider_type='gcp',
                                     args={'service_account': SERVICE_ACCOUNT,
                                           'artifact_store': BUCKET_NAME})
CLI
# Create a provider
# BUCKET_NAME format: gs://bucket_name
# SERVICE_ACCOUNT format: /path/to/service_account.json
cengine provider create gcp PROVIDER_NAME \
    --artifact_store=BUCKET_NAME \
    --service_account=SERVICE_ACCOUNT

# List created providers
cengine provider list

Create a workspace

After you have created your provider, the next step is to set up a workspace that uses it:

Python SDK
# Create a workspace
active_workspace = client.create_workspace(name="HelloWorkspace",
                                           provider_id=my_provider.id)
print(active_workspace)
CLI
# Create a workspace using the created provider
cengine workspace create HelloWorkspace PROVIDER_ID
# List your workspaces
cengine workspace list

Expected output:

Python SDK
created_at: 2020-10-14 15:51:36.073345+00:00
id: 028a8dbf3b364869867ad16f6f6d7c6c
metadatastore_id: 99e9f53f578b4313998fd898d640da19
name: HelloWorkspace
organization_id: c252a61f5a084efe8f541e305b6bd87d
provider_id: 3bb8eda1e0ba47e8a84ef502a11a66a7
user_ids: null
CLI
Selection | ID | Name | Provider
-------------+----------+-----------------+------------
* | 74e7b20d | HelloWorkspace | e2c36c16

Oh, look at that! You now successfully created a workspace!

For those of you following along on the CLI, go ahead and make it active. On the Python SDK side, we already have our active_workspace.

CLI
# All subsequent actions will be performed in the context of the active workspace
cengine workspace set WORKSPACE_ID

Create a datasource

Of course, a pipeline needs a datasource to consume from. For the quickstart, we have decided to use a public BigQuery table with census data on adult income levels. To create this datasource in the Core Engine:

Python SDK
new_datasource = client.create_datasource(name='QuickstartDataset',
                                          provider_id=my_provider.id,
                                          source='bq',
                                          type='tabular',
                                          args={"dataset": "ml_datasets",
                                                "table": "census_adult_income",
                                                "project": "bigquery-public-data"})
print(new_datasource)
CLI
# Create the datasource
cengine datasource create QuickstartDataset tabular bq PROVIDER_ID \
    --project="bigquery-public-data" \
    --dataset="ml_datasets" \
    --table="census_adult_income"
# List your datasources
cengine datasource list

Expected output:

Python SDK
args:
  dataset: ml_datasets
  project: bigquery-public-data
  table: census_adult_income
created_at: 2020-10-14 15:52:03.655552+00:00
datasource_commits: []
id: 2dc50381002345849ead5ae524a7d710
metadatastore_id: cb085591789e459f9d8326bcc20fff02
name: QuickstartDataset
organization_id: c252a61f5a084efe8f541e305b6bd87d
origin_pipeline_id: 1cd04261dd934bf3ba7892a3ce729447
provider_id: 3bb8eda1e0ba47e8a84ef502a11a66a7
source: bq
type: tabular
CLI
Selection | ID | Name | Type | # Commits | Latest Commit Status | Latest Commit Date
-------------+---------------+-----------------------+---------+-------------+------------------------+----------------------
| DATASOURCE_ID | QuickstartDataset | tabular | 0 | No Commit | No commit

As can be seen in the output, we have created the source, but it has no commits yet. In order to use it in your pipeline, you need to commit it and create a snapshot of your data. Commits version control your datasources so that you never lose track of which data flows through your system at any moment in time.

Python SDK
# Create a commit of your datasource
new_datasource_commit = client.commit_datasource(
    datasource_id=new_datasource.id)
print(new_datasource_commit)
CLI
# Commit the datasource
cengine datasource commit DATASOURCE_ID
# List your commits
cengine datasource commits DATASOURCE_ID

Expected output:

Python SDK
created_at: 2020-10-14 15:52:16.606698+00:00
datasource_id: 2dc50381002345849ead5ae524a7d710
destination_args:
  dataset: betaorganizationgmbh
  project: maiot-pgo
  table: quickstartdataset_006d7bbe78fa47efb45c610edd889d5e
id: 006d7bbe78fa47efb45c610edd889d5e
message: Latest commit of QuickstartDataset
n_bytes: null
n_datapoints: null
n_features: null
used_schema: {}
user_id: 35bfe9a51f4a4073b6b2bf3d2ed8c9a8
CLI
Selection | ID | Created At | Status | Message | Bytes | # Datapoints | # Features
-------------+----------+---------------------+----------+------------------------------------+---------+----------------+--------------
| bedcfe1e | 2020-10-14 15:19:52 | Running | Latest commit of QuickstartDataset | | |

Committing the census dataset usually takes a few (5-6) minutes. You can check whether it is Running or Succeeded by running cengine datasource commits DATASOURCE_ID and checking the status column.
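
If you would rather not re-run the command by hand, you can put a watch on it, just like we will do later for the pipeline status (macOS users can install watch with brew install watch):

watch cengine datasource commits DATASOURCE_ID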

Similar to the workspace, you can set a commit as the active commit on the CLI. (Again, on the Python SDK, we already have new_datasource_commit):

CLI
cengine datasource set DATASOURCE_ID:COMMIT_ID

Moreover, you can also peek at the datasource commit as follows:

Python SDK
sample = client.peek_datasource_commit(new_datasource.id,
                                       new_datasource_commit.id)
print(sample)
CLI
cengine datasource peek DATASOURCE_ID[:COMMIT_ID]

This will return 10 randomly sampled data points from the BigQuery table.
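
If you want to eyeball the sample more comfortably, you can load it into a pandas DataFrame. This is a minimal sketch that assumes the returned sample is a list of dict-like records; adjust it to whatever peek_datasource_commit actually returns in your version:

import pandas as pd

# Assumption: `sample` is a list of dict-like rows returned by peek_datasource_commit
df = pd.DataFrame(sample)
print(df.head(10))
print(df.dtypes)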

Create a pipeline

The first thing you need when you create a pipeline is a configuration. This can come in the form of a simple .yaml file (to be used with the CLI) or an instance of cengine.PipelineConfig (to be used with the Python SDK).

In order to speed things up, the Core Engine allows you to create a template configuration based on a datasource.

Python SDK
from cengine import PipelineConfig

c = PipelineConfig.from_datasource(client=client,
                                   datasource_id=new_datasource.id,
                                   commit_id=new_datasource_commit.id)
CLI
# Generate the template.config.yaml
cengine pipeline template DATASOURCE_ID[:COMMIT_ID]

In terms of content, the configuration covers the entire flow of an end-to-end machine learning pipeline, from data splitting to model serving. For the sake of simplicity, we will briefly look at how to handle a configuration in one go.

Python SDK
from cengine import PipelineConfig

# Start with a template
c = PipelineConfig.from_datasource(client=client,
                                   datasource_id=new_datasource.id,
                                   commit_id=new_datasource_commit.id)

# Configure your dataset split
c.split.categorize.by = 'marital_status'
c.split.ratio = {'train': 0.8, 'eval': 0.2}

# Configure non-default preprocessing with a built-in method
c.features['education_num'].transform.add_methods(
    {'method': 'compute_and_apply_vocabulary'})

# Configure your labels
del c.features.income_bracket
c.labels.add(['income_bracket'])

# Configure your evaluation
del c.features.native_country
c.evaluator.slices = [['native_country']]
c.evaluator.metrics = ['binary_accuracy']

# Configure your training with a built-in model
c.trainer.fn = 'feedforward'
c.trainer.params = {'epochs': 25,
                    'input_units': 13,
                    'output_units': 1,
                    'loss': 'binary_crossentropy',
                    'metrics': ['binary_accuracy'],
                    'batch_size': 16,
                    'lr': 0.0005}
CLI
example_config.yaml
# Version
version: 1

# Dataset Split
split:
  categorize:
    by: marital_status
  ratio:
    eval: 0.2
    train: 0.8

# Feature Selection
features:
  age: {}
  capital_gain: {}
  capital_loss: {}
  education: {}
  education_num:
    transform:
      - method: compute_and_apply_vocabulary
        parameters: {}
  functional_weight: {}
  hours_per_week: {}
  marital_status: {}
  occupation: {}
  race: {}
  relationship: {}
  sex: {}
  workclass: {}

# Label Selection
labels:
  income_bracket: {}

# Training
trainer:
  fn: feedforward
  params:
    epochs: 15
    eval_steps: 10
    input_units: 13
    output_units: 1
    train_steps: 500

# Evaluation
evaluator:
  metrics:
    - binary_accuracy
  slices:
    - - native_country

# Default preprocessing behaviour based on dtype
preprocessing:
  boolean:
    filling:
      - method: max
        parameters: {}
    label_tuning:
      - method: no_tuning
        parameters: {}
    resampling:
      - method: mode
        parameters: {}
    transform:
      - method: no_transform
        parameters: {}
  float:
    filling:
      - method: max
        parameters: {}
    label_tuning:
      - method: no_tuning
        parameters: {}
    resampling:
      - method: mean
        parameters: {}
    transform:
      - method: scale_to_z_score
        parameters: {}
  integer:
    filling:
      - method: max
        parameters: {}
    label_tuning:
      - method: no_tuning
        parameters: {}
    resampling:
      - method: mean
        parameters: {}
    transform:
      - method: scale_to_z_score
        parameters: {}
  string:
    filling:
      - method: custom
        parameters:
          custom_value: ''
    label_tuning:
      - method: no_tuning
        parameters: {}
    resampling:
      - method: mode
        parameters: {}
    transform:
      - method: compute_and_apply_vocabulary
        parameters: {}
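
If you are editing the YAML by hand, it is worth a quick sanity check that it still parses before pushing it. A minimal sketch using PyYAML (the filename example_config.yaml is simply the one used above):

import yaml

# Quick sanity check: does the edited config still parse as valid YAML?
with open('example_config.yaml') as f:
    config = yaml.safe_load(f)
print(config['trainer'])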

Once you have your configuration ready, you can create your pipeline as follows:

Python SDK
first_pipeline = client.push_pipeline(name='QuickStartPipeline',
                                      config=c,
                                      workspace_id=active_workspace.id)
CLI
# CONFIG_PATH: path to your config.yaml
# PIPELINE_NAME: the name for your pipeline
cengine pipeline push CONFIG_PATH PIPELINE_NAME

Run your first pipeline

Now you are ready to run your first pipeline. Go ahead and run it!

Python SDK
first_pipeline_run = client.train_pipeline(
    pipeline_id=first_pipeline.id,
    datasource_commit_id=new_datasource_commit.id)
CLI
cengine pipeline run PIPELINE_ID

You should see a success message with your chosen configuration. Based on the size of the connected datasource, the Core Engine will select 1 worker with 1 CPU per worker for this pipeline. It will provision these resources in the cloud, connect to the datasource automatically, and create a machine learning pipeline to train a model.

Check pipeline status

You can always check the status of the pipelines in a workspace through:

Python SDK
client.get_pipeline_status(workspace_id=active_workspace.id)
CLI
cengine pipeline status

You should see something like this:

Python SDK
{'4f90403cc81f4c619298b3762ebd6b05': [{
    'RUN ID': 'c90817291193414386d5d336fe5c3a33',
    'TYPE': 'training',
    'STATUS': 'Running',
    'DATASOURCE': 'QuickstartDataset_bedcfe1ec7854f93a5a0e4bdecefb0dc',
    'DATAPOINTS': '26500',
    'START TIME': '2020-10-14 16:37:16',
    'DURATION': '00:04:03'}]}
CLI
Currently, the active workspace is:
Workspace Name: My workspace
Workspace ID: 61acbcbc
Fetching pipeline(s). This might take a few seconds..
PIPELINE NAME: DemoPipeline_14_10_2020_16_37_04 PIPELINE ID: 4f90403c
---------------------------------------------------------------------
RUN ID TYPE STATUS DATASOURCE DATAPOINTS START TIME DURATION
c9081729 training Running QuickstartDataset_bedcfe1e 26500 2020-10-14 16:37:16 00:04:03

You'll notice that while the pipeline is Running, all the costs will be set to 0. That's because only fully completed and successful pipelines are charged to your billing.

If you don't want to run the command again and again, you can put a watch on it. (macOS users can install watch with brew install watch.)

watch cengine pipeline status --pipeline_id PIPELINE_ID

As completion hits 100%, we will be able to see the results. However, as this might take 7-9 minutes or so, we can use the time to inspect the configuration that generated this pipeline.
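
One way to do that from the Python SDK is to pull the configuration of the pipeline we just pushed, using the same pull_pipeline call that we will use again in the Iterate section below:

# Pull the registered configuration back from the Core Engine and inspect it
pushed_config = client.pull_pipeline(pipeline_id=first_pipeline.id)
print(pushed_config.trainer.params)
print(pushed_config.split.ratio)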

Check the statistics

If you want to dive into the statistics of the datasource to get more of a feel for the results, you can simply run:

Python SDK
client.get_statistics(pipeline_id=first_pipeline.id,
                      pipeline_run_id=first_pipeline_run.id,
                      magic=True)
CLI
cengine pipeline statistics PIPELINE_ID

Wait a few seconds and your preferred browser will open and display the statistics of this datasource. You will notice that you can view both the train and eval splits. Statistics are tied to pipelines because each pipeline has exactly one datasource.

For more information about the statistics view, please click here.

See the results

Once your pipeline reaches a 100% completion (should take approximately 7-9 minutes), you can see the results of the model training by running:

Python SDK
client.evaluate_single_pipeline(pipeline_id=first_pipeline.id,
                                pipeline_run_id=first_pipeline_run.id,
                                magic=True)
CLI
cengine pipeline evaluate PIPELINE_ID

Woah, did your browser just open up to a Jupyter notebook? Trippy. If you run the pre-made notebook's cells, you'll see a few handy plugins that showcase the results.

You will be able to see TensorBoard logs and TensorFlow Model Analysis (TFMA) results. Both of these combined will give a great overview of how we did!

We are able to see not only the overall accuracy (~78% if all went well), but also the accuracy across a slicing_column of our choice. A slicing_column is simply a feature in your data across which you would like to see your metric (e.g. accuracy) in the final evaluation. In the example above, by slicing our metric by marital_status, we are able to identify that we did worst with married people with children! We should definitely try to improve our model for that segment of our data.

You can edit the last line of the TFMA notebook block to include your slicing metrics:

tfma.view.render_slicing_metrics(evaluation, slicing_column='marital_status')
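
Since the pipeline configuration above also slices the evaluation on native_country, you could equally render the metrics across that feature:

tfma.view.render_slicing_metrics(evaluation, slicing_column='native_country')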

There's a lot more to see here. Click here for more information regarding the evaluation results.

Download the model

Ok, so now you're happy with the results, and you want that trained model? Easy! Go ahead and execute:

Python SDK
import os

client.download_model(pipeline_id=first_pipeline.id,
                      pipeline_run_id=first_pipeline_run.id,
                      output_path=os.path.join(os.getcwd(), 'model'))
CLI
cengine pipeline model PIPELINE_ID --output_path /path/to/download/model/

This will download the model to the specified directory as a TensorFlow SavedModel. For more information about the model format, please click here.
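
As a quick sanity check, you can load the downloaded model with TensorFlow. This is a minimal sketch; the exact serving signature names and input feature spec depend on the exported model, so verify them against your own download:

import tensorflow as tf

# Load the SavedModel from the 'model' directory used above
model = tf.saved_model.load('model')

# List the exported serving signatures (commonly 'serving_default')
print(list(model.signatures.keys()))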

Iterate

Ok, so we got one result, but we shouldn't stop there, should we? Let's create more pipelines and iterate!

You can always edit an existing config.yaml file directly or modify your current cengine.PipelineConfig through the SDK to create a new pipeline. However, for this example, we will pull the configuration of an already registered pipeline and work on that configuration.

Python SDK
second_config = client.pull_pipeline(pipeline_id=first_pipeline.id)
second_config.trainer.params['batch_size'] = 32
config.yaml
trainer:
  fn: feedforward
  params:
    loss: binary_crossentropy
    metrics:
      - accuracy
    last_activation: sigmoid
    n_output_units: 1
    input_units: 12
    train_steps: 2500
    eval_steps: 1500
    batch_size: 32

As you can see, we increased the batch_size from 16 to 32. Now you can register a new pipeline by running:

Python SDK
second_pipeline = client.push_pipeline(name='SecondPipeline',
                                       config=second_config,
                                       workspace_id=active_workspace.id)
CLI
cengine pipeline push /path/to/modified_out.yaml "NewPipeline"

This will push another pipeline to your workspace with the name NewPipeline. The pipeline will automatically be assigned a new id. You can now run this new pipeline as follows:

Python SDK
second_pipeline_run = client.train_pipeline(
    pipeline_id=second_pipeline.id,
    datasource_commit_id=new_datasource_commit.id)
CLI
cengine pipeline train PIPELINE_ID

Depending on what you change in the configuration, you will notice a marked speed improvement in subsequent pipelines in the same workspace. This is because the Core Engine heavily reuses previously computed, cached intermediate results to ensure that you do not repeat the same processing steps. For example, if you only change something under the trainer key in the YAML, only the training part will run again, as sketched below. This way, you can quickly iterate over many different machine learning models and arrive at the best one.
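
Here is a minimal sketch of such an iteration, using only the SDK calls introduced above (the learning rate value is purely illustrative, not part of the quickstart): because only the trainer changes, the split and preprocessing results are served from the cache.

# Pull the last configuration, tweak only the trainer, and push a new pipeline
third_config = client.pull_pipeline(pipeline_id=second_pipeline.id)
third_config.trainer.params['lr'] = 0.001  # illustrative value

third_pipeline = client.push_pipeline(name='ThirdPipeline',
                                      config=third_config,
                                      workspace_id=active_workspace.id)

# Only the training step is recomputed; earlier preprocessing comes from the cache
third_pipeline_run = client.train_pipeline(
    pipeline_id=third_pipeline.id,
    datasource_commit_id=new_datasource_commit.id)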

This difference might seem small at first, but boy does it scale! With the Core Engine you don't have to worry about repeating the same steps over and over. You just use the resources you need to compute precisely what you want to. Hyper-parameter tuning, running fast iterations and model comparison are quick, cheap and easy.

Conclusion

That was easy! Now you're ready to start making your own pipelines. To start, find out how to create a workspace, or add your own datasource, and finally make your own pipeline.