Stream Dataset to Deployed Models
This notebook demonstrates how to upload data to an already registered dataset that has a model deployed on it.
Import necessary libraries
import turboml as tb
import pandas as pd
import requests
import time
from tqdm.notebook import tqdm
from utils.nb_utils import do_retry, simulate_realtime_stream
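The do_retry and simulate_realtime_stream helpers come from the notebook utilities in utils/nb_utils.py, not from the TurboML SDK. As a rough sketch (an assumed shape, not the actual implementation), do_retry repeatedly calls a function until a predicate on its result is satisfied:
def do_retry(func, *args, return_on=lambda result: True, max_retries=20, wait_seconds=3, **kwargs):
    # Illustrative sketch only; the bundled helper may use different defaults.
    # Call func until return_on(result) is truthy, sleeping between attempts.
    result = None
    for _ in range(max_retries):
        result = func(*args, **kwargs)
        if return_on(result):
            break
        time.sleep(wait_seconds)
    return result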
Inspecting Data
transactions_df = pd.read_csv("data/transactions.csv")
labels_df = pd.read_csv("data/labels.csv")
We will only use a subset of the dataset for initial model deployment.
sub_transactions_df = transactions_df.iloc[0:20000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[0:20000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
sub_transactions_df.head()
sub_labels_df.head()
Data Ingestion
input_dataset_id = "transactions_stream_online"
transactions = tb.PandasDataset(
dataset_name=input_dataset_id,
key_field="index",
dataframe=sub_transactions_df,
upload=True,
)
input_schema = transactions.schema
label_dataset_id = "transaction_stream_labels"
labels = tb.PandasDataset(
dataset_name=label_dataset_id,
key_field="index",
dataframe=sub_labels_df,
upload=True,
)
label_schema = labels.schema
Feature Engineering
Fetch data
tb.get_features(dataset_id=input_dataset_id)
Add feature definitions
transactions.feature_engineering.create_sql_features(
sql_definition='"transactionAmount" + "localHour"',
new_feature_name="my_sql_feat",
)
transactions.feature_engineering.get_local_features()
tb.get_timestamp_formats()
transactions.feature_engineering.register_timestamp(
column_name="timestamp", format_type="epoch_seconds"
)
transactions.feature_engineering.create_aggregate_features(
column_to_operate="transactionAmount",
column_to_group="accountID",
operation="SUM",
new_feature_name="my_sum_feat",
timestamp_column="timestamp",
window_duration=24,
window_unit="hours",
)
transactions.feature_engineering.get_local_features()
Submit feature definitions
transactions.feature_engineering.materialize_features(["my_sql_feat", "my_sum_feat"])
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
Supervised Learning
htc_model = tb.HoeffdingTreeClassifier(n_classes=2)
numerical_fields = [
"transactionAmount",
"localHour",
"my_sum_feat",
"my_sql_feat",
]
categorical_fields = [
"digitalItemCount",
"physicalItemCount",
"isProxyIP",
]
features = transactions.get_input_fields(
numerical_fields=numerical_fields, categorical_fields=categorical_fields
)
label = labels.get_label_field(label_field="is_fraud")
Run Supervised ML jobs
We will deploy a HoeffdingTreeClassifier model trained on this subset of our dataset.
deployed_model_htc = htc_model.deploy(
"demo_classifier_htc_stream_model", input=features, labels=label
)
outputs = do_retry(
deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
Supervised Model Endpoints
model_endpoints = deployed_model_htc.get_endpoints()
model_endpoints
model_query_datapoint = transactions_df.iloc[765].to_dict()
model_query_datapoint
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
Supervised Model Evaluation
deployed_model_htc.add_metric("WindowedAUC")
model_auc_scores = do_retry(
deployed_model_htc.get_evaluation,
"WindowedAUC",
return_on=(lambda result: len(result) > 0),
)
import matplotlib.pyplot as plt
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
Upload to dataset with online model
We will upload data to the registered datasets, which will be used for training and inference by the deployed model in realtime.
We use the helper function simulate_realtime_stream from utils/nb_utils.py to simulate realtime streaming data from a dataframe.
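As a rough sketch (again an assumed shape, not the bundled implementation), simulate_realtime_stream yields fixed-size chunks of a dataframe with a delay between them:
def simulate_realtime_stream(df: pd.DataFrame, chunk_size: int, delay: float):
    # Illustrative sketch only; the notebook uses the version imported from utils.nb_utils.
    # Yield successive chunks of the dataframe, sleeping between chunks to
    # mimic data arriving over time.
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start : start + chunk_size]
        time.sleep(delay)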
Upload using SDK
Here we use the upload_df method provided by the PandasDataset class to upload data to a registered dataset. This method internally uploads the data using the Arrow Flight Protocol over gRPC.
sub_transactions_df = transactions_df.iloc[20000:100000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[20000:100000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
Set the chunk size and delay for the simulate_realtime_stream helper function.
chunk_size = 10 * 1024
delay = 0.3
Here we zip the two stream generators to get matching batches of the input and label dataframes, and upload each pair.
realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
with tqdm(
total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
for input_stream, label_stream in zip(
realtime_input_stream, realtime_label_stream, strict=True
):
start = time.perf_counter()
transactions.upload_df(dataframe=input_stream)
labels.upload_df(dataframe=label_stream)
end = time.perf_counter()
pbar.update(len(input_stream))
print(
f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
)
Check Updated Dataset and Model
tb.get_features(dataset_id=input_dataset_id)
We can use the sync_features method to sync the materialized streaming features to the PandasDataset object.
time.sleep(1)
transactions.sync_features()
Calling the get_materialized_features method will show that the newly uploaded data has been properly materialized.
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
The get_outputs method will return the latest processed outputs.
outputs = do_retry(
deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
Upload using REST API
Here we use the dataset/{dataset_id}/upload REST API endpoint to upload data to a registered dataset. This endpoint writes the data directly to the registered dataset's Kafka topic.
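The request body is a JSON list of row objects keyed by column name. The sketch below illustrates the call shape with plain requests; the base URL is a hypothetical placeholder, and in the cells that follow we let the turboml api helper handle the base URL and auth headers for us instead.
# Illustration only: TURBOML_BASE_URL is a hypothetical placeholder for your
# TurboML backend address; the api helper used below handles this for us.
TURBOML_BASE_URL = "https://your-turboml-backend.example.com"
sample_rows = sub_transactions_df.head(5).to_dict(orient="records")
resp = requests.post(
    f"{TURBOML_BASE_URL}/dataset/{input_dataset_id}/upload",
    json=sample_rows,
    headers=tb.common.api.headers,
)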
sub_transactions_df = transactions_df.iloc[100000:170000]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[100000:170000]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
from turboml.common.api import api
import json
We use the turboml api module to make the HTTP call, since authentication is already configured for it.
def rest_upload_df(dataset_id: str, df: pd.DataFrame):
row_list = json.loads(df.to_json(orient="records"))
api.post(f"dataset/{dataset_id}/upload", json=row_list)
realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
with tqdm(
total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
for input_stream, label_stream in zip(
realtime_input_stream, realtime_label_stream, strict=True
):
start = time.perf_counter()
rest_upload_df(input_dataset_id, input_stream)
rest_upload_df(label_dataset_id, label_stream)
end = time.perf_counter()
pbar.update(len(input_stream))
print(
f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
)
Check Updated Dataset and Model
time.sleep(1)
transactions.sync_features()
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
outputs = do_retry(
deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
deployed_model_htc.get_inference(transactions_df.reset_index())
Upload using gRPC API
This example shows how to directly upload data to the registered dataset using Arrow Flight gRPC.
sub_transactions_df = transactions_df.iloc[170000:]
sub_transactions_df = sub_transactions_df.reset_index()
sub_transactions_df
sub_labels_df = labels_df.iloc[170000:]
sub_labels_df = sub_labels_df.reset_index()
sub_labels_df
import pyarrow
import struct
import itertools
from functools import partial
from pyarrow.flight import FlightDescriptor
from turboml.common.env import CONFIG as tb_config
from turboml.common import get_protobuf_class, create_protobuf_from_row_tuple
Here we define a helper function write_batch that serializes dataframe rows into protobuf messages and writes them as PyArrow record batches through the given Flight stream writer.
def write_batch(writer, df, proto_gen_partial_func):
    # Iterate over the dataframe rows as plain tuples.
    row_iter = df.itertuples(index=False, name=None)
    batch_size = 1024
    while True:
        # Serialize up to batch_size rows into protobuf messages.
        batch = list(
            map(
                proto_gen_partial_func,
                itertools.islice(row_iter, batch_size),
            )
        )
        if not batch:
            break
        # Wrap the serialized messages in a single-column record batch and send it.
        batch = pyarrow.RecordBatch.from_arrays([batch], ["value"])
        writer.write(batch)
We initiate a connection from the PyArrow Flight client to the TurboML Arrow server with the required configuration.
arrow_server_grpc_endpoint = tb_config.EXTERNAL_ENDPOINT_TURBOML_ARROW_SERVER
# Note: SchemaId prefix is required for proper kafka protobuf serialization.
input_proto_gen_func = partial(
create_protobuf_from_row_tuple,
fields=sub_transactions_df.columns.tolist(),
proto_cls=get_protobuf_class(input_dataset_id, input_schema.schema_body),
prefix=struct.pack("!xIx", input_schema.id),
)
label_proto_gen_func = partial(
create_protobuf_from_row_tuple,
fields=sub_labels_df.columns.tolist(),
proto_cls=get_protobuf_class(label_dataset_id, label_schema.schema_body),
prefix=struct.pack("!xIx", label_schema.id),
)
client = pyarrow.flight.connect(arrow_server_grpc_endpoint)
# Note: Expected arrow schema is a column named 'value' with serialized protobuf binary message.
pa_schema = pyarrow.schema([("value", pyarrow.binary())])
input_stream_writer, _ = client.do_put(
FlightDescriptor.for_command(f"produce:{input_dataset_id}"),
pa_schema,
options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
)
label_stream_writer, _ = client.do_put(
FlightDescriptor.for_command(f"produce:{label_dataset_id}"),
pa_schema,
options=pyarrow.flight.FlightCallOptions(headers=api.arrow_headers),
)
Now we use the stream generators and pass the data to the write_batch function, along with the corresponding PyArrow Flight stream writers, for the input and label data respectively.
realtime_input_stream = simulate_realtime_stream(sub_transactions_df, chunk_size, delay)
realtime_label_stream = simulate_realtime_stream(sub_labels_df, chunk_size, delay)
with tqdm(
total=len(sub_transactions_df), desc="Progress", unit="rows", unit_scale=True
) as pbar:
for input_stream, label_stream in zip(
realtime_input_stream, realtime_label_stream, strict=True
):
start = time.perf_counter()
write_batch(input_stream_writer, input_stream, input_proto_gen_func)
write_batch(label_stream_writer, label_stream, label_proto_gen_func)
end = time.perf_counter()
pbar.update(len(input_stream))
print(
f"# Uploaded {len(input_stream)} input, label rows for processing in {end - start:.6f} seconds."
)
Close the PyArrow Flight stream writers.
input_stream_writer.close()
label_stream_writer.close()
Check Updated Dataset and Model
time.sleep(1)
transactions.sync_features()
materialized_features = transactions.feature_engineering.get_materialized_features()
materialized_features
outputs = do_retry(
deployed_model_htc.get_outputs, return_on=(lambda result: len(result) > 0)
)
outputs[-1]
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
print(len(model_auc_scores))
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])