TurboML Quickstart
import pandas as pd
import turboml as tb
Inspecting Data
TurboML is built for real-time machine learning, and as such, deals with streams of data. This can be achieved by using connectors to continuously pull data from your data source (like S3 or Postgres), or by pushing data via the REST API or Client SDKs.
For the purpose of this tutorial, we can simulate real-time data generation in a batch-like setting using pandas dataframes. Let's first load some pandas dataframes. In this example, we're using a credit card fraud detection dataset.
transactions_df = pd.read_csv("data/transactions.csv")
labels_df = pd.read_csv("data/labels.csv")
transactions_df
labels_df
Our dataset has 201406 datapoints, each with a corresponding label. Since we don't have a natural primary key in the dataset that can uniquely identify each row, we'll use the inbuilt index that pandas provides.
transactions_df = transactions_df.reset_index()
labels_df = labels_df.reset_index()
transactions_df.head()
labels_df.head()
Data Ingestion
We can now upload these dataframes to the TurboML platform. The PandasDataset class can be used here. It takes in the dataframe, the primary key, and the name of the dataset that is to be created for the given dataframe as input.
# Attempt to create and upload dataset
try:
    transactions = tb.PandasDataset(
        dataset_name="transactions",
        key_field="index",
        dataframe=transactions_df,
        upload=True,
    )
except:
    # If it already exists, just retrieve the existing dataset
    transactions = tb.PandasDataset(dataset_name="transactions")

try:
    labels = tb.PandasDataset(
        dataset_name="transaction_labels",
        key_field="index",
        dataframe=labels_df,
        upload=True,
    )
except:
    labels = tb.PandasDataset(dataset_name="transaction_labels")
Feature Engineering
The TurboML platform facilitates transformations on raw data to produce new features. You can use the Jupyter notebook as a "playground" to explore different features. This involves 3 steps.
- fetch data: Experimentation is easier on static data. Since TurboML works with continuous data streams, to enable experimentation we fetch a snapshot or a subset of the data in the Jupyter notebook.
- add feature definitions: Now that we have a static dataset, we can define multiple different features, and see their values on this dataset. Since we can observe their values, we can perform simple experiments and validations like correlations, plots and other exploratory analysis.
- submit feature definitions: Once we're confident about the features we've defined, we can now submit the ones we want TurboML to compute continuously for the actual data stream.
Fetch data
We can use the get_features function to get a snapshot or subset of the data stream.
Note: The size of the dataset returned by this function can change on each invocation. Also, the dataset is not guaranteed to be in the same order.
df_transactions = tb.get_features(dataset_id="transactions")
df_transactions
Add feature definitions
To add feature definitions, we have a class from the turboml package called FeatureEngineering. This allows us to define SQL-based and dynamic aggregation-based features.
The following cell shows how to define an SQL-based feature. The sql_definition parameter in the create_sql_features function takes in the SQL expression to be used to prepare the feature. It returns a dataframe with all the original columns, and another column which, at a high level, is defined as SELECT sql_definition AS new_feature_name FROM dataframe.
transactions.feature_engineering.create_sql_features(
sql_definition='"transactionAmount" + "localHour"',
new_feature_name="my_sql_feat",
)
transactions.feature_engineering.get_local_features()
tb.get_timestamp_formats()
transactions.feature_engineering.register_timestamp(
column_name="timestamp", format_type="epoch_seconds"
)
The following cell shows how to define an aggregation-based feature using the create_aggregate_features function. It returns a dataframe with all the original columns, and another column which, at a high level, is defined as SELECT operation(column_to_operate) OVER (PARTITION BY column_to_group ORDER BY time_column RANGE BETWEEN INTERVAL window_duration PRECEDING AND CURRENT ROW) AS new_feature_name FROM dataframe.
transactions.feature_engineering.create_aggregate_features(
column_to_operate="transactionAmount",
column_to_group="accountID",
operation="SUM",
new_feature_name="my_sum_feat",
timestamp_column="timestamp",
window_duration=24,
window_unit="hours",
)
transactions.feature_engineering.get_local_features()
Submit feature definitions
Now that we've seen the newly created features and everything looks good, we can submit these feature definitions to the TurboML platform so that they can be computed continuously for the input data stream.
We need to tell the platform to start computations for all pending features for the given dataset. This can be done by calling the materialize_features function.
transactions.feature_engineering.materialize_features(["my_sql_feat", "my_sum_feat"])
df_transactions = transactions.feature_engineering.get_materialized_features()
df_transactions
Machine Learning Modelling
TurboML provides out-of-the-box algorithms optimized for real-time ML, and also supports bringing your own models and algorithms. In this tutorial, we'll use the algorithms provided by TurboML.
Check the available algorithms
You can check the available ML algorithms with tb.ml_algorithms(have_labels=True/False), depending on whether you need supervised or unsupervised learning.
tb.ml_algorithms(have_labels=False)
Let's use the RandomCutForest (RCF) algorithm.
Create model
Now that we've chosen an algorithm, we need to create a model.
model = tb.RCF(number_of_trees=50)
Run Streaming ML jobs
Now that we've instantiated the model, we can deploy it using the deploy function. For an unsupervised ML job, we need to provide a dataset from which the model can consume inputs. For each record in this dataset, the model will make a prediction, produce the prediction to an output dataset, and then perform unsupervised updates using this record.
There are four types of fields that can be used by any ML algorithm:
- numerical_fields: This represents fields that we want our algorithm to treat as real-valued fields.
- categorical_fields: This represents fields that we want our algorithm to treat as categorical fields.
- time_field: This is used for time-series applications to capture the timestamp field.
- textual_fields: This represents fields that we want our algorithm to treat as text fields.
The input values from any of these fields are suitably converted to the desired type. String values are converted using the hashing trick.
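To make the hashing trick concrete, here's a minimal sketch of how a string value can be mapped to a numeric bucket. This is a generic illustration only, not TurboML's internal implementation, and the bucket count is an arbitrary choice.
import hashlib

def hash_string_to_bucket(value: str, num_buckets: int = 2**20) -> int:
    # Hash the string deterministically and fold it into a fixed number of buckets.
    # Generic sketch of the hashing trick; not TurboML's internal code.
    digest = hashlib.md5(value.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets

hash_string_to_bucket("some_category_value")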
Let's construct a model config using the following numerical fields, with no categorical or time fields.
numerical_fields = [
"transactionAmount",
"localHour",
"my_sum_feat",
"my_sql_feat",
]
features = transactions.get_input_fields(numerical_fields=numerical_fields)
label = labels.get_label_field(label_field="is_fraud")
deployed_model_rcf = model.deploy(name="demo_model_rcf", input=features, labels=label)
Inspect model outputs
We can now fetch the outputs that the model produced by calling the get_outputs function.
Note: The size of the outputs returned by this function can change on each invocation, since the model is continuously producing outputs.
outputs = deployed_model_rcf.get_outputs()
len(outputs)
sample_output = outputs[-1]
sample_output
The above output corresponds to an input with the key, or index, sample_output.key. Along with the anomaly score, the output also contains attributions to different features. We can see that the first numerical feature, i.e. 'transactionAmount', is around sample_output.feature_score[0]*100% responsible for the anomaly score.
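As a quick sanity check, we can pair each attribution with its input field. This assumes feature_score is ordered the same way as numerical_fields, as described above.
# Pair each numerical field with its attribution for the sampled output.
# Assumes feature_score follows the order of numerical_fields.
for field, score in zip(numerical_fields, sample_output.feature_score):
    print(f"{field}: {score:.2%}")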
import matplotlib.pyplot as plt
plt.plot([output["record"].score for output in outputs])
Model Endpoints
The above method of interacting with the model was asynchronous. We were adding our datapoints to an input dataset, and getting the corresponding model outputs in an output dataset. In some scenarios, we need a synchronous method to query the model. This is where we can use the model endpoints that TurboML exposes.
model_endpoints = deployed_model_rcf.get_endpoints()
model_endpoints
Now that we know which endpoint to send the request to, we need to figure out the right format. Let's try to make a prediction on the last row from our input dataset.
model_query_datapoint = transactions_df.iloc[-1].to_dict()
model_query_datapoint
import requests
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
Batch Inference on Models
While the above method is more suited for individual requests, we can also perform batch inference on the models. We use the get_inference function for this purpose.
outputs = deployed_model_rcf.get_inference(transactions_df)
outputs
Model Evaluation
As with ML algorithms, TurboML provides built-in metrics, and supports defining your own metrics. Let's see the available metrics.
tb.evaluation_metrics()
We can select the AreaUnderCurve (AUC) metric to evaluate our anomaly detection model. The windowed prefix means we're evaluating these metrics over a rolling window. By default, the window size is 1000.
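As a rough mental model of what a windowed metric computes (this is not TurboML's implementation), you can think of the AUC being recomputed over only the most recent predictions:
# Illustrative sketch of a rolling-window AUC, assuming aligned lists of true
# labels and model scores. Not TurboML's implementation.
from sklearn.metrics import roc_auc_score

def windowed_auc(y_true, y_score, window_size=1000):
    return roc_auc_score(y_true[-window_size:], y_score[-window_size:])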
deployed_model_rcf.add_metric("WindowedAUC")
Similar to steps like feature engineering and ML modelling, model evaluation is also a continuously running job. We can look at a snapshot of the model metrics at any given instant by using the get_evaluation function.
Note: The size of the outputs returned by this function can change on each invocation, since we're continuously evaluating the model.
model_auc_scores = deployed_model_rcf.get_evaluation("WindowedAUC")
model_auc_scores[-1]
import matplotlib.pyplot as plt
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
Supervised Learning
Let's now take an example with a supervised learning algorithm. First, let's see what algorithms are supported out of the box.
tb.ml_algorithms(have_labels=True)
We can use HoeffdingTreeClassifier to try to classify fraudulent and normal activity on the same dataset. First, we need to instantiate a model.
htc_model = tb.HoeffdingTreeClassifier(n_classes=2)
We can use the same numerical fields in this model. This time, however, let's also add some categorical fields.
categorical_fields = [
"digitalItemCount",
"physicalItemCount",
"isProxyIP",
]
features = transactions.get_input_fields(
numerical_fields=numerical_fields, categorical_fields=categorical_fields
)
label = labels.get_label_field(label_field="is_fraud")
Run Supervised ML jobs
Same as before, we can deploy this model with the deploy function.
deployed_model_htc = htc_model.deploy("demo_classifier", input=features, labels=label)
We can now inspect the outputs.
outputs = deployed_model_htc.get_outputs()
len(outputs)
sample_output = outputs[-1]
sample_output
We notice that since this is a classification model, we have some new attributes in the output, specifically class_probabilities and predicted_class. We also have the score attribute which, for classification, just shows us the probability for the last class.
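For instance, we can inspect these attributes directly on the sampled output. The attribute access below mirrors the field names described above; the exact output structure may differ.
# Inspect the classification-specific attributes of the sampled output.
print(sample_output.predicted_class)
print(sample_output.class_probabilities)
print(sample_output.score)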
Supervised Model Endpoints
The Predict API for supervised models is exactly the same as for unsupervised models.
model_endpoints = deployed_model_htc.get_endpoints()
model_endpoints
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
Supervised Model Evaluation
Let's now evaluate our supervised ML model. The process is exactly the same as for unsupervised model evaluation.
deployed_model_htc.add_metric("WindowedAUC")
We can use the same get_evaluation function to fetch the metrics for this model as well. Remember, this function retrieves the metric values present at that moment in time. So, if the number of records received seems low, just re-run this function.
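If you'd rather wait until enough evaluation records have accumulated before plotting, a simple polling loop like the following works. The record threshold and sleep interval here are arbitrary choices.
import time

# Poll until the evaluation stream has accumulated some records.
while len(deployed_model_htc.get_evaluation("WindowedAUC")) < 100:
    time.sleep(5)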
model_auc_scores = deployed_model_htc.get_evaluation("WindowedAUC")
model_auc_scores[-1]
plt.plot([model_auc_score.metric for model_auc_score in model_auc_scores])
Model Comparison
Now that we have 2 models deployed, and we've registered metrics for both of them, we can compare them on real-time data. On each invocation, the following function will fetch the latest evaluations of the models and plot them.
tb.compare_model_metrics(
models=[deployed_model_rcf, deployed_model_htc], metric="WindowedAUC"
)
Model Deletion
We can delete the models like this. By default, the generated output is also deleted. If you want to retain the output generated by the model, use delete_output_topic=False.
deployed_model_rcf.delete()
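If you instead want to retain a model's generated output while deleting the model, pass the flag mentioned above, for example:
# Delete the classifier but keep its generated output.
deployed_model_htc.delete(delete_output_topic=False)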