Stream Dataset Online

Stream Dataset to Deployed Models

This notebook demonstrates how to upload data to an already registered dataset that has a model deployed on it.

Import necessary libraries

    import turboml as tb
    import pandas as pd
    import requests
    import time
    
    from tqdm.notebook import tqdm
    from utils.nb_utils import do_retry, simulate_realtime_stream

Inspecting Data

    transactions_df = pd.read_csv("data/transactions.csv")
    labels_df = pd.read_csv("data/labels.csv")

We will only use a subset of the dataset for initial model deployment.

    sub_transactions_df = transactions_df.iloc[0:20000]
    sub_transactions_df = sub_transactions_df.reset_index()
    sub_transactions_df
    sub_labels_df = labels_df.iloc[0:20000]
    sub_labels_df = sub_labels_df.reset_index()
    sub_labels_df
    sub_transactions_df.head()
    sub_labels_df.head()

Data Ingestion

    input_dataset_id = "transactions_stream_online"
    transactions = tb.PandasDataset(
        dataset_name=input_dataset_id,
        key_field="index",
        dataframe=sub_transactions_df,
        upload=True,
    )
    input_schema = transactions.schema
    label_dataset_id = "transaction_stream_labels"
    labels = tb.PandasDataset(
        dataset_name=label_dataset_id,
        key_field="index",
        dataframe=sub_labels_df,
        upload=True,
    )
    label_schema = labels.schema

Feature Engineering

Fetch data

    tb.get_features(dataset_id=input_dataset_id)

Add feature definitions

    transactions.feature_engineering.create_sql_features(
        sql_definition='"transactionAmount" + "localHour"',
        new_feature_name="my_sql_feat",
    )
    transactions.feature_engineering.get_local_features()
    tb.get_timestamp_formats()
    transactions.feature_engineering.register_timestamp(
        column_name="timestamp", format_type="epoch_seconds"
    )
    transactions.feature_engineering.create_aggregate_features(
        column_to_operate="transactionAmount",
        column_to_group="accountID",
        operation="SUM",
        new_feature_name="my_sum_feat",
        timestamp_column="timestamp",
        window_duration=24,
        window_unit="hours",
    )
    transactions.feature_engineering.get_local_features()

Submit feature definitions

    transactions.feature_engineering.materialize_features(["my_sql_feat", "my_sum_feat"])
    materialized_features = transactions.feature_engineering.get_materialized_features()
    materialized_features

Supervised Learning

    htc_model = tb.HoeffdingTreeClassifier(n_classes=2)
    numerical_fields = [
        "transactionAmount",
        "localHour",
        "my_sum_feat",
        "my_sql_feat",
    ]
    categorical_fields = [
        "digitalItemCount",
        "physicalItemCount",
        "isProxyIP",
    ]
    features = transactions.get_input_fields(
        numerical_fields=numerical_fields, categorical_fields=categorical_fields
    )
    label = labels.get_label_field(label_field="is_fraud")

Run Supervised ML jobs

We will deploy a HoeffdingTreeClassifier model trained on a subset of our dataset.

    deployed_model_htc = htc_model.deploy(
        "demo_classifier_htc_stream_model", input=features, labels=label
    )
    outputs = do_retry(
        deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
    )
    outputs[-1]
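
Here, do_retry is a small polling helper from utils/nb_utils.py that calls a function repeatedly until the return_on predicate is satisfied. A minimal sketch (the retry count and delay below are illustrative assumptions; the actual helper may differ):

    def do_retry(func, *args, return_on=lambda result: True, retries=10, delay=5, **kwargs):
        # Call `func` until `return_on(result)` is truthy, sleeping between attempts.
        result = func(*args, **kwargs)
        for _ in range(retries):
            if return_on(result):
                return result
            time.sleep(delay)
            result = func(*args, **kwargs)
        return result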

Supervised Model Endpoints

    model_endpoints = deployed_model_htc.get_endpoints()
    model_endpoints
    model_query_datapoint = transactions_df.iloc[765].to_dict()
    model_query_datapoint
    resp = requests.post(
        model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
    )
    resp.json()

Supervised Model Evaluation

    deployed_model_htc.add_metric("WindowedAUC")
    model_auc_scores = do_retry(
        deployed_model_htc.get_evaluation,
        "WindowedAUC",
        return_on=(lambda result: len(result) > 0),
    )
    import matplotlib.pyplot as plt
    
    plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])

Upload to dataset with online model

We will upload data to the registered dataset, which will be used for training and inference by the respective deployed model in realtime.

We use the simulate_realtime_stream helper function from utils/nb_utils.py to simulate realtime streaming data from a dataframe.
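
Since utils/nb_utils.py is not shown in this notebook, here is a minimal sketch of what such a generator could look like (an illustrative assumption; the actual helper may differ):

    def simulate_realtime_stream(df: pd.DataFrame, chunk_size: int, delay: float):
        # Yield successive chunk_size-row slices of the dataframe, sleeping
        # `delay` seconds between chunks to mimic a live stream.
        for start in range(0, len(df), chunk_size):
            yield df.iloc[start : start + chunk_size]
            time.sleep(delay)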

Upload using SDK

Here we use the upload_df method provided by the PandasDataset class to upload data to a registered dataset. This method internally uploads the data using the Arrow Flight Protocol over gRPC.

    sub_transactions_df = transactions_df.iloc[20000:100000]
    sub_transactions_df = sub_transactions_df.reset_index()
    sub_transactions_df
    sub_labels_df = labels_df.iloc[20000:100000]
    sub_labels_df = sub_labels_df.reset_index()
    sub_labels_df

Set the chunk size and delay for the simulate_realtime_stream helper function.

    chunk_size = 10 * 1024
    delay = 0.3

Here we zip the two stream generators to get batches of dataframes for the input and label datasets, and upload them.

    realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
    realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
    
    with tqdm(
        total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
    ) as pbar:
        for input_stream, label_stream in zip(
            realtime_input_stream, realtime_label_stream, strict=True
        ):
            start = time.perf_counter()
            transactions.upload_df(dataframe=input_stream)
            labels.upload_df(dataframe=label_stream)
            end = time.perf_counter()
    
            pbar.update(len(input_stream))
            print(
                f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
            )

Check Updated Dataset and Model

    tb.get_features(dataset_id=input_dataset_id)

We can use the sync_features method to sync the materialized streaming features to the PandasDataset object.

    time.sleep(1)
    transactions.sync_features()

Calling the get_materialized_features method will show that the newly uploaded data has been properly materialized.

    materialized_features = transactions.feature_engineering.get_materialized_features()
    materialized_features

The get_outputs method will return the latest processed output.

    outputs = do_retry(
        deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
    )
    outputs[-1]
    model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
    print(len(model_auc_scores))
    plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
    resp = requests.post(
        model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
    )
    resp.json()

Upload using REST API

Here we use the dataset/{dataset_id}/upload REST API endpoint to upload data to a registered dataset. This endpoint writes the data directly to the registered dataset's Kafka topic.
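
For reference, the same endpoint can also be called with the requests library directly. Below is a hypothetical sketch, not part of this notebook's flow: TURBOML_BACKEND_URL is an assumed placeholder for your backend's base URL, and the auth headers are reused from tb.common.api.

    # Hypothetical sketch: raw HTTP upload to the dataset endpoint.
    import json
    TURBOML_BACKEND_URL = "https://your-turboml-backend"  # assumed placeholder
    rows = json.loads(sub_transactions_df.head(3).to_json(orient="records"))
    resp = requests.post(
        f"{TURBOML_BACKEND_URL}/dataset/{input_dataset_id}/upload",
        json=rows,
        headers=tb.common.api.headers,
    )
    resp.raise_for_status()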

    sub_transactions_df = transactions_df.iloc[100000:170000]
    sub_transactions_df = sub_transactions_df.reset_index()
    sub_transactions_df
    sub_labels_df = labels_df.iloc[100000:170000]
    sub_labels_df = sub_labels_df.reset_index()
    sub_labels_df
    from turboml.common.api import api
    import json

We use the turboml api module to initiate the HTTP call, since auth is already configured for it.

    def rest_upload_df(dataset_id: str, df: pd.DataFrame):
        row_list = json.loads(df.to_json(orient="records"))
        api.post(f"dataset/{dataset_id}/upload", json=row_list)

    realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
    realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
    
    with tqdm(
        total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
    ) as pbar:
        for input_stream, label_stream in zip(
            realtime_input_stream, realtime_label_stream, strict=True
        ):
            start = time.perf_counter()
            rest_upload_df(input_dataset_id, input_stream)
            rest_upload_df(label_dataset_id, label_stream)
            end = time.perf_counter()
    
            pbar.update(len(input_stream))
            print(
                f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
            )

Check Updated Dataset and Model

    time.sleep(1)
    transactions.sync_features()
    materialized_features = transactions.feature_engineering.get_materialized_features()
    materialized_features
    outputs = do_retry(
        deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
    )
    outputs[-1]
    model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
    print(len(model_auc_scores))
    plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
    deployed_model_htc.get_inference(transactions_df.reset_index())

Upload using gRPC API

This example shows how to directly upload data to the registered dataset using Arrow Flight gRPC.

    sub_transactions_df = transactions_df.iloc[170000:]
    sub_transactions_df = sub_transactions_df.reset_index()
    sub_transactions_df
    sub_labels_df = labels_df.iloc[170000:]
    sub_labels_df = sub_labels_df.reset_index()
    sub_labels_df
    import pyarrow
    import struct
    import itertools
    from functools import partial
    from pyarrow.flight import FlightDescriptor
    
    from turboml.common.env import CONFIG as tb_config
    from turboml.common import get_protobuf_class, create_protobuf_from_row_tuple

Here we define a helper function write_batch that writes pyarrow record batches to a given pyarrow flight stream writer.

    def write_batch(writer, df, proto_gen_partial_func):
        row_iter = df.itertuples(index=False, name=None)
        batch_size = 1024
        while True:
            batch = list(
                map(
                    proto_gen_partial_func,
                    itertools.islice(row_iter, batch_size),
                )
            )
    
            if not batch:
                break
    
            batch = pyarrow.RecordBatch.from_arrays([batch], ["value"])
            writer.write(batch)

We initiate a pyarrow flight client connection to the TurboML Arrow server with the required configs.

    arrow_server_grpc_endpoint = tb_config.EXTERNAL_ENDPOINT_TURBOML_ARROW_SERVER
    
    # Note: SchemaId prefix is required for proper kafka protobuf serialization.
    input_proto_gen_func = partial(
        create_protobuf_from_row_tuple,
        fields=sub_transactions_df.columns.tolist(),
        proto_cls=get_protobuf_class(input_dataset_id, input_schema.schema_body),
        prefix=struct.pack("!xIx", input_schema.id),
    )
    
    label_proto_gen_func = partial(
        create_protobuf_from_row_tuple,
        fields=sub_labels_df.columns.tolist(),
        proto_cls=get_protobuf_class(label_dataset_id, label_schema.schema_body),
        prefix=struct.pack("!xIx", label_schema.id),
    )
    
    client = pyarrow.flight.connect(arrow_server_grpc_endpoint)
    # Note: Expected arrow schema is a column named 'value' with serialized protobuf binary message.
    pa_schema = pyarrow.schema([("value", pyarrow.binary())])
    
    input_stream_writer, _ = client.do_put(
        FlightDescriptor.for_command(f"produce:{input_dataset_id}"),
        pa_schema,
        options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
    )
    
    label_stream_writer, _ = client.do_put(
        FlightDescriptor.for_command(f"produce:{label_dataset_id}"),
        pa_schema,
        options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
    )

Now, we use the stream generators and pass the data to the write_batch function along with the corresponding pyarrow write handlers for the input and label data writes respectively.

    realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
    realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
    
    with tqdm(
        total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
    ) as pbar:
        for input_stream, label_stream in zip(
            realtime_input_stream, realtime_label_stream, strict=True
        ):
            start = time.perf_counter()
            write_batch(input_stream_writer, input_stream, input_proto_gen_func)
            write_batch(label_stream_writer, label_stream, label_proto_gen_func)
            end = time.perf_counter()
    
            pbar.update(len(input_stream))
            print(
                f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
            )

Close the pyarrow client write handlers.

    input_stream_writer.close()
    label_stream_writer.close()

Check Updated Dataset and Model

    time.sleep(1)
    transactions.sync_features()
    materialized_features = transactions.feature_engineering.get_materialized_features()
    materialized_features
    outputs = do_retry(
        deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
    )
    outputs[-1]
    model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
    print(len(model_auc_scores))
    plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])