Introduction

What is TurboML?

TurboML is a machine learning platform reinvented for real-time. What does that mean? Every step of the ML lifecycle, from data ingestion to feature engineering, ML modelling, and post-deployment steps like monitoring, is designed to handle real-time data in addition to batch data.

Data Ingestion

The first step is to bring your data to the TurboML platform. There are two major ways to ingest your data: pull-based and push-based.

Pull-based ingestion

With this approach, you use TurboML’s prebuilt connectors to connect to your data source. The connectors will continuously pull data from your data source, and ingest it into TurboML.
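For illustration, registering a pull-based source might look like the sketch below. The DataSource and PostgresSource names and their parameters are hypothetical placeholders, since the exact connector API depends on your data source.

import turboml as tb

# Hypothetical sketch of a pull-based connector for a Postgres table;
# the class names and parameters are illustrative placeholders.
source = tb.DataSource(
    name="transactions_source",
    postgres_source=tb.PostgresSource(
        host="db.example.com",
        port=5432,
        username="user",
        password="password",
        database="payments",
        table="transactions",
    ),
)
tb.register_source(source)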

Push-based ingestion

Sometimes, you might not want to send data via an intermediate data source, but rather send it directly. Push-based ingestion can be used for this, where data can be sent either via REST API calls or using the more performant client SDKs. Here’s an example with a Pandas DataFrame:

# Upload a local DataFrame as a dataset, with "index" as the key field
transactions = tb.PandasDataset(dataset_name="transactions", dataframe=df, upload=True)
transactions.configure_dataset(key_field="index")

Feature Engineering

Feature engineering is perhaps the most important step for data scientists. TurboML provides several different interfaces to define features. We’ve designed the feature engineering experience so that after you’ve defined a feature, you can see it computed on your local data, which helps you debug and iterate faster. Once you’re confident about a feature definition, you can deploy it, after which it is continuously computed on the real-time stream. Retrieval APIs are also available to compute features for ad-hoc queries.

SQL Features

Writing SQL queries is one of the most common ways to define ML features. TurboML supports arbitrary SQL expressions for such features. Here’s an example of a simple SQL feature:

# Add two raw columns together to produce a new feature
transactions.feature_engineering.create_sql_features(
    sql_definition='"transactionAmount" + "localHour"',
    new_feature_name="my_sql_feat",
)

Notice that the column names are enclosed in double quotes.

And here’s a more complicated example:

transactions.feature_engineering.create_sql_features(
    sql_definition='CASE WHEN "paymentBillingCountryCode" <> "ipCountryCode" THEN 1 ELSE 0 END',
    new_feature_name="country_code_match",
)

Aggregate Features

A common template for real-time features is aggregating some value over a time window. To define such time-windowed aggregations, you first need to register a timestamp column for your dataset. This can be done as follows:

transactions.feature_engineering.register_timestamp(column_name="timestamp", format_type="epoch_seconds")

The supported formats can be listed using:

tb.get_timestamp_formats()

Once the timestamp is registered, we can create a time-windowed aggregate feature using:

# 24-hour rolling sum of transactionAmount, grouped per accountID
transactions.feature_engineering.create_aggregate_features(
    column_to_operate="transactionAmount",
    column_to_group="accountID",
    operation="SUM",
    new_feature_name="my_sum_feat",
    timestamp_column="timestamp",
    window_duration=24,
    window_unit="hours",
)

User Defined Features

We understand why data scientists love Python: the simplicity and the ecosystem are unmatched. Guess what? You can use native Python, importing any library, to define features!
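As a sketch, a scalar Python feature might be defined like this. The create_udf_features call and its parameters are modelled on the SQL and aggregate APIs above, so treat the exact signature as an assumption; the function body itself is plain Python.

# Sketch of a user-defined feature in native Python; the
# create_udf_features signature is an assumption modelled on the
# SQL/aggregate APIs above.
function_contents = """
import numpy as np

def amount_sine(transactionAmount):
    return np.sin(float(transactionAmount))
"""

transactions.feature_engineering.create_udf_features(
    new_feature_name="sine_of_amount",
    argument_names=["transactionAmount"],
    function_name="amount_sine",
    function_file_contents=function_contents,
    libraries=["numpy"],
)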

IBIS Features

Streaming features that are more complex than windowed aggregations can be defined using the Ibis interface. They can then be executed using Apache Flink or RisingWave.
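For illustration, such a feature might be built like the sketch below. The IbisFeatureEngineering entry point, get_ibis_table, and materialize_features calls are assumptions about the TurboML side; the expression itself is standard Ibis.

# Sketch of a streaming feature defined with Ibis; the TurboML entry
# points here are assumptions, while the expression is standard Ibis.
fe = tb.IbisFeatureEngineering()
transactions_t = fe.get_ibis_table("transactions")

# Flag high-value transactions with an ordinary Ibis expression
with_flag = transactions_t.mutate(
    is_high_value=(transactions_t.transactionAmount > 1000).ifelse(1, 0)
)

fe.materialize_features(with_flag, feature_group_name="txn_ibis_feats")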

Feature Retrieval

As mentioned before, once deployed, the feature computation is automatically added to the real-time streaming pipeline. However, feature values can also be retrieved on ad-hoc data using the retrieval API. Here’s an example:

features = tb.retrieve_features("transactions", query_df)

ML Modelling - Basic concepts

Inputs and Labels

For each model, we need to specify the Inputs and the Labels.

Types of fields

Different models accept different types of input fields. The supported field types are numeric, categorical, time series, text, and image.
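Concretely, specifying inputs and labels might look like the sketch below. The get_model_inputs and get_model_labels helper names are assumptions, as is the separately ingested labels dataset; the field names follow the transactions example above.

# Sketch of specifying inputs and labels; the helper method names and
# the labels dataset are assumptions.
features = transactions.get_model_inputs(
    numerical_fields=["transactionAmount", "localHour", "my_sum_feat"],
    categorical_fields=["paymentBillingCountryCode"],
)
label = labels.get_model_labels(label_field="is_fraud")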

TurboML algorithms

TurboML provides several algorithms out of the box. These algorithms are optimized for online prediction and learning, and have been tested in real-world settings.

# An online decision-tree classifier for binary classification
model = tb.HoeffdingTreeClassifier(n_classes=2)

PyTorch/TensorFlow/Scikit-learn

We use ONNX to deploy trained models from PyTorch, TensorFlow, Scikit-learn, or other ONNX-compatible frameworks. Examples for these three frameworks can be found in the following notebooks.

Note: These models are static, and are not updated automatically.
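As a sketch, bringing a scikit-learn model in via ONNX could look like this. The skl2onnx conversion is that library’s standard API; the tb.set_onnx_model and tb.ONNX calls are assumptions about the TurboML side, and X_train, y_train are assumed training arrays.

# Sketch: convert a trained scikit-learn model to ONNX and register it.
# The skl2onnx calls are standard; tb.set_onnx_model / tb.ONNX are
# assumptions about the TurboML side.
from sklearn.ensemble import RandomForestClassifier
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

clf = RandomForestClassifier().fit(X_train, y_train)  # X_train, y_train assumed
onx = convert_sklearn(
    clf, initial_types=[("input", FloatTensorType([None, X_train.shape[1]]))]
)

tb.set_onnx_model("randomforest", onx.SerializeToString())
model = tb.ONNX(model_save_name="randomforest")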

Python

TurboML also supports writing arbitrary Python code to define your own algorithms, including the use of any libraries. To add your own algorithm, you need to define a Python class with two methods with the following signatures:

class Model:
    def learn_one(self, features, label):
        # update the model with a single labelled example
        pass

    def predict_one(self, features, output_data):
        # score a single example and write the result to output_data
        pass

Examples of using an incremental learning algorithm, as well as a batch-like algorithm, from the river library can be found here.
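For instance, wrapping an incremental model from river in this interface might look like the sketch below. The conversion of features to a dict and the output_data.set_score call are assumptions about the interface; learn_one and predict_proba_one are river’s real API.

# Sketch: wrapping river's incremental logistic regression in the
# interface above. The dict(features) conversion and
# output_data.set_score call are assumptions.
from river import linear_model

class RiverLogReg:
    def __init__(self):
        self.model = linear_model.LogisticRegression()

    def learn_one(self, features, label):
        # river expects a mapping of feature name -> value
        self.model.learn_one(dict(features), label)

    def predict_one(self, features, output_data):
        proba = self.model.predict_proba_one(dict(features))
        output_data.set_score(proba.get(True, 0.0))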

Combining models

Models can also be combined to create other models, e.g. ensembles. Here’s an example of an ensemble model:

model = tb.LeveragingBaggingClassifier(n_classes=2, base_model=tb.HoeffdingTreeClassifier(n_classes=2))

Preprocessors can also be chained and applied in a similar manner. For example:

model = tb.MinMaxPreProcessor(base_model=model)

Model Training

Once we’ve defined a model, it can be trained in different ways.

Batch way

The simplest way is to train the model in a batch way, similar to sklearn’s fit() method. Internally, however, the training is performed incrementally, so you can also update an already-trained model on new data. Here’s an example:

old_trained_model = model.learn(old_features, old_label)
new_trained_model = old_trained_model.learn(new_features, new_label)

Any trained copy of the model can then be deployed to production:

deployed_model = new_trained_model.deploy(name="deployment_name", input=features, labels=label, predict_only=True)

Since this is a trained model, we can also invoke it in a batch way to get predictions without actually deploying the model:

outputs = new_trained_model.predict(query_features)

Streaming way

This is where the model, after deployment, is continuously trained on new data. You can choose how the model is updated: online updates (where the model is updated on every new datapoint), or trigger-based updates, which can be volume-based, time-based, performance-based, or drift-based. The default is online updates.

deployed_model = model.deploy(name="deployment_name", input=features, labels=label)

Deployment and MLOps

Inference

Once you’ve deployed a model, there are several different ways to perform inference.

Async

The first one is the async method. The data streamed from the input source is continuously fed to the model, and the outputs are streamed to another source. This output stream can either be subscribed to directly by the end-user application, or written to a sink such as a database or another data source.

outputs = deployed_model.get_outputs()

API

A request-response model is used for synchronous inference on a single data point. A /model_name/predict endpoint is exposed for each deployed model, and a REST API call to this endpoint returns the outputs.
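For example, using the requests library; the host, port, and payload shape here are placeholders:

# Sketch of a synchronous REST call to a deployed model; the host and
# payload shape are placeholders.
import requests

resp = requests.post(
    "http://<turboml-host>/deployment_name/predict",
    json={"transactionAmount": 450.0, "localHour": 13},
)
print(resp.json())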

Batch

When you have multiple records you’d like to perform inference on, you can use the get_inference method as follows:

outputs = deployed_model.get_inference(query_df)

Evaluation

TurboML provides standard ML metrics out of the box for model evaluation. Multiple metrics can be registered for any deployed model. The metrics pipeline reuses the labels extracted for model training.

deployed_model.add_metric("WindowedAUC")
model_auc_scores = deployed_model.get_evaluation("WindowedAUC")
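The returned scores can then be plotted to track model quality over time. In the sketch below, the .metric attribute on each returned entry is an assumption about the returned objects.

# Sketch: plot the windowed AUC over time; the .metric attribute on
# each returned entry is an assumption.
import matplotlib.pyplot as plt

plt.plot([score.metric for score in model_auc_scores])
plt.xlabel("Window")
plt.ylabel("Windowed AUC")
plt.show()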