Stream Dataset to Deployed Models

This notebook demonstrates how to upload data to an already registered dataset with a deployed model.

Import necessary libraries

import pandas as pd
import turboml as tb
import requests
import time
 
from tqdm.notebook import tqdm
from utils.nb_utils import do_retry, simulate_realtime_stream
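
Both do_retry and simulate_realtime_stream are small helpers from utils/nb_utils.py. For orientation, a minimal do_retry could look like the sketch below (the name, retry count, and wait time here are illustrative, not the actual implementation): it simply calls a function repeatedly until a predicate on the result passes.

def do_retry_sketch(func, *args, return_on=lambda result: True, retries=10, wait=2.0, **kwargs):
    # Call func until return_on(result) is truthy or we run out of retries
    for _ in range(retries):
        result = func(*args, **kwargs)
        if return_on(result):
            break
        time.sleep(wait)
    return result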

Inspecting Data

transactions_df = pd.read_csv("data/transactions.csv")
labels_df = pd.read_csv("data/labels.csv")

We will only use a subset of the dataset for initial model deployment.

sub_transactions_df = transactions_df.iloc[0:20000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[0:20000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
sub_transactions_df.head()
sub_labels_df.head()

Data Ingestion

input_dataset_id = "transactions_stream_online"
# Attempt to create and upload dataset
try:
    transactions = tb.PandasDataset(
        dataset_name=input_dataset_id,
        key_field="index",
        dataframe=sub_transactions_df,
        upload=True,
    )
except Exception:
    # If it already exists, just retrieve the existing dataset
    transactions = tb.PandasDataset(dataset_name=input_dataset_id)
input_schema = transactions.schema
label_dataset_id = "transaction_stream_labels"
try:
    labels = tb.PandasDataset(
        dataset_name=label_dataset_id,
        key_field="index",
        dataframe=sub_labels_df,
        upload=True,
    )
except Exception:
    labels = tb.PandasDataset(dataset_name=label_dataset_id)
label_schema = labels.schema

Feature Engineering

Fetch data

tb.get_features(dataset_id=input_dataset_id)

Add feature definitions

transactions.feature_engineering.create_sql_features(
    sql_definition='"transactionAmount" + "localHour"',
    new_feature_name="my_sql_feat",
)
transactions.feature_engineering.get_local_features()
tb.get_timestamp_formats()
transactions.feature_engineering.register_timestamp(
    column_name="timestamp", format_type="epoch_seconds"
)
transactions.feature_engineering.create_aggregate_features(
    column_to_operate="transactionAmount",
    column_to_group="accountID",
    operation="SUM",
    new_feature_name="my_sum_feat",
    timestamp_column="timestamp",
    window_duration=24,
    window_unit="hours",
)
transactions.feature_engineering.get_local_features()

Submit feature definitions

transactions.feature_engineering.materialize_features(["my_sql_feat", "my_sum_feat"])
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features

Supervised Learning

htc_model = tb.HoeffdingTreeClassifier(n_classes=2)
numerical_fields = [
    "transactionAmount",
    "localHour",
    "my_sum_feat",
    "my_sql_feat",
]
categorical_fields = [
    "digitalItemCount",
    "physicalItemCount",
    "isProxyIP",
]
features = transactions.get_input_fields(
    numerical_fields=numerical_fields, categorical_fields=categorical_fields
)
label = labels.get_label_field(label_field="is_fraud")

Run Supervised ML jobs

We will deploy a HoeffdingTreeClassifier model trained on a subset of our dataset.

deployed_model_htc = htc_model.deploy(
    "demo_classifier_htc_stream_model", input=features, labels=label
)
outputs = do_retry(
    deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]

Supervised Model Endpoints

model_endpoints = deployed_model_htc.get_endpoints()
model_endpoints
model_query_datapoint = transactions_df.iloc[765].to_dict()
model_query_datapoint
resp = requests.post(
    model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()

Supervised Model Evaluation

deployed_model_htc.add_metric("WindowedAUC")
model_auc_scores = do_retry(
    deployed_model_htc.get_evaluation,
    "WindowedAUC",
    return_on=(lambda result: len(result) > 0),
)
import matplotlib.pyplot as plt
 
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])

Upload to dataset with online model

We will upload data to the registered dataset, which will be used for training and inference by the deployed model in real time.

We use the helper function simulate_realtime_stream from utils/nb_utils.py to simulate realtime streaming data from a dataframe.
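
For reference, a simplified sketch of such a generator is shown below (assuming chunk_size is the number of rows per yielded chunk and delay is the pause in seconds between chunks; the actual helper in utils/nb_utils.py may differ):

def simulate_realtime_stream_sketch(df: pd.DataFrame, chunk_size: int, delay: float):
    # Yield the dataframe in fixed-size row chunks, sleeping between chunks to mimic a live stream
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start : start + chunk_size]
        time.sleep(delay)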

Upload using SDK

Here we use the upload_df method provided by the PandasDataset class to upload data to a registered dataset. This method internally uploads the data using the Arrow Flight Protocol over gRPC.

sub_transactions_df = transactions_df.iloc[20000:100000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[20000:100000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df

Set the chunk size and delay for the simulate_realtime_stream helper function

chunk_size = 10 * 1024
delay = 0.1

Here we zip the two stream generators to get matching batches of input and label dataframes, and upload each pair.

realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
 
with tqdm(
    total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
    for input_stream, label_stream in zip(
        realtime_input_stream, realtime_label_stream, strict=True
    ):
        start = time.perf_counter()
        transactions.upload_df(dataframe=input_stream)
        labels.upload_df(dataframe=label_stream)
        end = time.perf_counter()
 
        pbar.update(len(input_stream))
        print(
            f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
        )

Check Updated Dataset and Model

tb.get_features(dataset_id=input_dataset_id)

We can use the sync_features method to sync the materialized streaming features to the PandasDataset object.

time.sleep(1)
transactions.sync_features()

Calling the get_materialized_features method will show that the newly uploaded data has been properly materialized.

materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features

The get_outputs method will return the latest processed output.

outputs = do_retry(
    deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
resp = requests.post(
    model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()

Upload using REST API

Here we use the dataset/{dataset_id}/upload REST API endpoint to upload data to a registered dataset. This endpoint writes the data directly to the registered dataset's Kafka topic.

sub_transactions_df = transactions_df.iloc[100000:170000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[100000:170000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
from turboml.common.api import api
import json

We use the turboml api module to make the HTTP call, since authentication is already configured for it.

def rest_upload_df(dataset_id: str, df: pd.DataFrame):
    row_list = json.loads(df.to_json(orient="records"))
    api.post(f"dataset/{dataset_id}/upload", json=row_list)
realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
 
with tqdm(
    total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
    for input_stream, label_stream in zip(
        realtime_input_stream, realtime_label_stream, strict=True
    ):
        start = time.perf_counter()
        rest_upload_df(input_dataset_id, input_stream)
        rest_upload_df(label_dataset_id, label_stream)
        end = time.perf_counter()
 
        pbar.update(len(input_stream))
        print(
            f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
        )

Check Updated Dataset and Model

time.sleep(1)
transactions.sync_features()
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
outputs = do_retry(
    deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
deployed_model_htc.get_inference(transactions_df.reset_index())

Upload using gRPC API

This example shows how to directly upload data to the registered dataset using Arrow Flight gRPC.

sub_transactions_df = transactions_df.iloc[170000:]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[170000:]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
import pyarrow
import struct
import itertools
from functools import partial
from pyarrow.flight import FlightDescriptor
 
from turboml.common.env import CONFIG as tb_config
from turboml.common import get_protobuf_class, create_protobuf_from_row_tuple

Here we define a helper function write_batch that serializes rows into protobuf messages and writes them as pyarrow record batches to a given pyarrow flight stream writer.

def write_batch(writer, df, proto_gen_partial_func):
    # Iterate over rows as plain tuples (no index, no named fields)
    row_iter = df.itertuples(index=False, name=None)
    batch_size = 1024
    while True:
        # Serialize up to batch_size rows into protobuf-encoded messages
        batch = list(
            map(
                proto_gen_partial_func,
                itertools.islice(row_iter, batch_size),
            )
        )

        if not batch:
            break

        # Wrap the serialized messages into a single 'value' column and send the record batch
        batch = pyarrow.RecordBatch.from_arrays([batch], ["value"])
        writer.write(batch)

We connect a pyarrow flight client to the TurboML Arrow server with the required configuration.

arrow_server_grpc_endpoint = tb_config.ARROW_SERVER_ADDRESS
 
# Note: SchemaId prefix is required for proper kafka protobuf serialization.
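# struct format "!xIx" packs a pad byte, a big-endian unsigned 32-bit schema id, and a trailing pad byte.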
input_proto_gen_func = partial(
    create_protobuf_from_row_tuple,
    fields=sub_transactions_df.columns.tolist(),
    proto_cls=get_protobuf_class(input_dataset_id, input_schema.schema_body),
    prefix=struct.pack("!xIx", input_schema.id),
)
 
label_proto_gen_func = partial(
    create_protobuf_from_row_tuple,
    fields=sub_labels_df.columns.tolist(),
    proto_cls=get_protobuf_class(label_dataset_id, label_schema.schema_body),
    prefix=struct.pack("!xIx", label_schema.id),
)
 
client = pyarrow.flight.connect(arrow_server_grpc_endpoint)
# Note: Expected arrow schema is a column named 'value' with serialized protobuf binary message.
pa_schema = pyarrow.schema([("value", pyarrow.binary())])
 
input_stream_writer, _ = client.do_put(
    FlightDescriptor.for_command(f"produce:{input_dataset_id}"),
    pa_schema,
    options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
)
 
label_stream_writer, _ = client.do_put(
    FlightDescriptor.for_command(f"produce:{label_dataset_id}"),
    pa_schema,
    options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
)

Now we use the stream generators and pass each batch to the write_batch function, along with the corresponding pyarrow write handler, for the input and label data writes respectively.

realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
 
with tqdm(
    total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
    for input_stream, label_stream in zip(
        realtime_input_stream, realtime_label_stream, strict=True
    ):
        start = time.perf_counter()
        write_batch(input_stream_writer, input_stream, input_proto_gen_func)
        write_batch(label_stream_writer, label_stream, label_proto_gen_func)
        end = time.perf_counter()
 
        pbar.update(len(input_stream))
        print(
            f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
        )

Close the pyarrow flight stream writers.

input_stream_writer.close()
label_stream_writer.close()
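
Note that if an upload raises partway through, the writers above are left open. One pattern (a sketch, not something required by the API) is to wrap the upload loop in try/finally so the writers are always closed:

# Sketch: guarantee the flight writers get closed even if a write fails
try:
    for input_stream, label_stream in zip(
        realtime_input_stream, realtime_label_stream, strict=True
    ):
        write_batch(input_stream_writer, input_stream, input_proto_gen_func)
        write_batch(label_stream_writer, label_stream, label_proto_gen_func)
finally:
    input_stream_writer.close()
    label_stream_writer.close()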

Check Updated Dataset and Model

time.sleep(1)
transactions.sync_features()
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
outputs = do_retry(
    deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])