Local Model
LocalModel is our Python interface that gives direct, in-process access to TurboML's machine learning models, which makes it well suited to local experimentation before deployment.
We will use the transactions.csv and labels.csv datasets for our experiments.
import turboml as tb
from turboml import LocalModel
from turboml.common.models import InputSpec
from turboml.common.dataloader import Inputs, Labels, PandasDataset
import pandas as pd
import numpy as np
from sklearn import metrics
import time
import base64
Load Datasets
# Load datasets
transactions_df = pd.read_csv("data/transactions.csv").reset_index()
labels_df = pd.read_csv("data/labels.csv").reset_index()
# Wrap the first 100,000 records as local (non-streaming) datasets for training
transactions_100k = PandasDataset(
dataframe=transactions_df[:100000], key_field="index", streaming=False
)
labels_100k = PandasDataset(
dataframe=labels_df[:100000], key_field="index", streaming=False
)
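A quick peek at the loaded frames confirms the index key column added by reset_index() and the expected row counts (an optional sanity check, not required by the API):
# Optional: inspect the data before wiring it into TurboML
print(transactions_df.shape, labels_df.shape)
print(transactions_df.head(3))
print(labels_df.head(3))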
Define Input Specification
numerical_fields = [
"transactionAmount",
"localHour",
]
categorical_fields = [
"digitalItemCount",
"physicalItemCount",
"isProxyIP",
]
input_spec = InputSpec(
key_field="index",
numerical_fields=numerical_fields,
categorical_fields=categorical_fields,
textual_fields=[],
imaginal_fields=[],
time_field="",
label_field="is_fraud",
)
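InputSpec only names columns, so it is worth checking that they actually exist in the dataframes before training (a defensive check, not something the API requires):
# Optional: verify every referenced column is present
expected = set(numerical_fields + categorical_fields) | {"index"}
missing = expected - set(transactions_df.columns)
assert not missing, f"Missing columns: {missing}"
assert "is_fraud" in labels_df.columns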
Prepare Input and Label Data
input_data = Inputs(
dataset_id="transactions_topic",
dataframe=transactions_df[:100000],
key_field="index",
numerical_fields=numerical_fields,
categorical_fields=categorical_fields,
)
label_data = Labels(
dataset_id="labels_topic",
dataframe=labels_df[:100000],
key_field="index",
label_field="is_fraud",
)
test_input_data = Inputs(
dataset_id="test_transactions_topic",
dataframe=transactions_df[100000:120000],
key_field="index",
numerical_fields=numerical_fields,
categorical_fields=categorical_fields,
)
test_label_data = Labels(
dataset_id="test_labels_topic",
dataframe=labels_df[100000:120000],
key_field="index",
label_field="is_fraud",
)
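The feature and label slices must line up row-for-row on the shared key field; a quick assertion catches accidental misalignment:
# Optional: train/test slices of features and labels should share keys
assert transactions_df[:100000]["index"].equals(labels_df[:100000]["index"])
assert transactions_df[100000:120000]["index"].equals(labels_df[100000:120000]["index"])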
Define Model Configurations
model_configs = {
"HoeffdingTree": [
{
"algorithm": "HoeffdingTreeClassifier",
"hoeffding_classifier_config": {
"delta": 1e-7,
"tau": 0.05,
"grace_period": 200,
"n_classes": 2,
"leaf_pred_method": "mc",
"split_method": "gini",
},
}
],
"AMF": [
{
"algorithm": "AMFClassifier",
"amf_classifier_config": {
"n_classes": 2,
"n_estimators": 10,
"step": 1,
"use_aggregation": True,
"dirichlet": 0.5,
"split_pure": False,
},
}
],
"MultinomialNB": [
{
"algorithm": "MultinomialNB",
"multinomial_config": {"n_classes": 2, "alpha": 1.0},
}
],
}
Training and Evaluation Function
def train_and_evaluate(name, config):
try:
# Create LocalModel
model = LocalModel(
model_configs=config,
input_spec=input_spec,
)
print(f"Training {name} model on first 100K records...")
model.learn(input_data, label_data)
# Make predictions on test data
predictions = model.predict(test_input_data)
# Evaluate model performance
roc_auc = metrics.roc_auc_score(
test_label_data.dataframe["is_fraud"], predictions["score"]
)
accuracy = metrics.accuracy_score(
test_label_data.dataframe["is_fraud"], predictions["predicted_class"]
)
print(f"{name} Model Results:")
print(f"ROC AUC Score: {roc_auc:.4f}")
print(f"Accuracy Score: {accuracy:.4f}")
return model, predictions
except Exception as e:
print(f"Error with {name} model: {str(e)}")
return None, None
model_trained_100K = {}
initial_results = {}
for name, config in model_configs.items():
model, predictions = train_and_evaluate(name, config)
if model is not None:
model_trained_100K[name] = model
initial_results[name] = predictions
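With all three models trained on the same 100K records, a side-by-side summary makes comparison easier; here we simply recompute ROC AUC from the stored predictions:
# Summarize held-out ROC AUC for every model that trained successfully
for name, preds in initial_results.items():
    auc = metrics.roc_auc_score(test_label_data.dataframe["is_fraud"], preds["score"])
    print(f"{name}: ROC AUC = {auc:.4f}")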
Further Training in Batches
We will continue training the Hoeffding Tree model with additional data in batches.
# Get the trained Hoeffding Tree model
model_hoeffding_tree = model_trained_100K.get("HoeffdingTree")
if model_hoeffding_tree is not None:
# Split the dataset into 10 parts for batch training
data_parts = np.array_split(transactions_df[100000:], 10)
label_parts = np.array_split(labels_df[100000:], 10)
for i, (data_part, label_part) in enumerate(
zip(data_parts, label_parts, strict=False)
):
print(f"\nPreparing batch {i + 1}...")
        df_train_tb = PandasDataset(
            dataframe=data_part, key_field="index", streaming=False
        )
        df_y_train_tb = PandasDataset(
            dataframe=label_part, key_field="index", streaming=False
        )
features = df_train_tb.get_input_fields(
numerical_fields=numerical_fields,
categorical_fields=categorical_fields,
)
labels = df_y_train_tb.get_label_field(label_field="is_fraud")
print(f"Training batch {i + 1}...")
start_time = time.time()
model_hoeffding_tree.learn(features, labels)
end_time = time.time()
print(
f"Batch {i + 1} training completed in {end_time - start_time:.2f} seconds."
)
else:
print("Hoeffding Tree model not found in trained models.")
ONNX Model
LocalModel can also serve models trained in other frameworks. Below we train a scikit-learn RandomForestClassifier, convert it to ONNX, and load the serialized model through the ONNX model config.
!pip install onnx==1.14.1 scikit-learn skl2onnx
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
# Prepare features and target
X = transactions_df[numerical_fields + categorical_fields]
y = labels_df["is_fraud"]
# Split the data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Train sklearn model
clf = RandomForestClassifier()
clf.fit(X_train, y_train)
# Convert to ONNX format
initial_type = [("float_input", FloatTensorType([None, X_train.shape[1]]))]
onx = convert_sklearn(
clf, initial_types=initial_type, options={type(clf): {"zipmap": False}}
)
# Get the serialized ONNX model
onnx_model_data = onx.SerializeToString()
# Base64-encode the ONNX model data
model_data_base64 = base64.b64encode(onnx_model_data).decode("utf-8")
# Create ONNX model config with the encoded model data
onnx_model_config = [
{
"algorithm": "ONNX",
"onnx_config": {
"model_save_name": "randomforest",
"model_data": model_data_base64,
},
}
]
onnx_input_spec = InputSpec(
key_field="index",
numerical_fields=numerical_fields + categorical_fields,
categorical_fields=[],
textual_fields=[],
imaginal_fields=[],
time_field="",
label_field="is_fraud",
)
local_onnx_model = LocalModel(
model_configs=onnx_model_config,
input_spec=onnx_input_spec,
)
# Create test input data for the ONNX model. Use distinct variable names so the
# test_input_data/test_label_data built earlier (and reused by the sections
# below) are not overwritten.
onnx_test_input_data = Inputs(
    dataset_id="onnx_test_transactions_topic",
    dataframe=X_test.reset_index(),
    key_field="index",
    numerical_fields=numerical_fields + categorical_fields,
)
onnx_test_label_data = Labels(
    dataset_id="onnx_test_labels_topic",
    dataframe=pd.DataFrame({"index": X_test.index, "is_fraud": y_test}).reset_index(
        drop=True
    ),
    key_field="index",
    label_field="is_fraud",
)
def evaluate_onnx_model():
    try:
        # Get predictions from the ONNX model
        predictions = local_onnx_model.predict(onnx_test_input_data)
# Calculate metrics
        roc_auc = metrics.roc_auc_score(
            onnx_test_label_data.dataframe["is_fraud"],
            predictions["score"],
        )
        accuracy = metrics.accuracy_score(
            onnx_test_label_data.dataframe["is_fraud"],
            predictions["predicted_class"],
        )
print("ONNX Model Results:")
print(f"ROC AUC Score: {roc_auc:.4f}")
print(f"Accuracy Score: {accuracy:.4f}")
return predictions
except Exception as e:
print(f"Error testing ONNX model: {str(e)}")
return None
# Run the evaluation
predictions = evaluate_onnx_model()
if predictions is not None:
sklearn_preds = clf.predict(X_test)
onnx_preds = predictions["predicted_class"]
match_rate = (sklearn_preds == onnx_preds).mean()
print("\nPrediction Comparison:")
print(f"Sklearn vs ONNX prediction match rate: {match_rate:.4f}")
Python Model Testing
LocalModel also accepts custom models written in Python. The class implements learn_one and predict_one (plus init_imports for its dependencies); here we wrap river's LogisticRegression.
python_model_code = """
from river import linear_model
import turboml.common.pytypes as types
class MyLogisticRegression:
def init_imports(self):
from river import linear_model
import turboml.common.pytypes as types
def __init__(self):
self.model = linear_model.LogisticRegression()
def learn_one(self, input):
# Combine numerical and categorical features into a dictionary
features = {}
features.update({f'num_{i}': val for i, val in enumerate(input.numeric)})
features.update({f'cat_{i}': val for i, val in enumerate(input.categ)})
self.model.learn_one(features, input.label)
def predict_one(self, input, output):
# Combine numerical and categorical features into a dictionary
features = {}
features.update({f'num_{i}': val for i, val in enumerate(input.numeric)})
features.update({f'cat_{i}': val for i, val in enumerate(input.categ)})
proba = self.model.predict_proba_one(features)
score = float(proba.get(True, 0))
output.set_score(score)
output.set_predicted_class(int(score >= 0.5))
"""
# Define the model configuration
python_model_config = {
"algorithm": "Python",
"python_config": {
"class_name": "MyLogisticRegression",
"code": python_model_code,
},
}
# Create the LocalModel instance
local_python_model = LocalModel(
model_configs=[python_model_config],
input_spec=input_spec,
)
# Train the model
local_python_model.learn(input_data, label_data)
# Make predictions
predictions = local_python_model.predict(test_input_data)
# Evaluate the model
roc_auc = metrics.roc_auc_score(
test_label_data.dataframe["is_fraud"], predictions["score"]
)
accuracy = metrics.accuracy_score(
test_label_data.dataframe["is_fraud"], predictions["predicted_class"]
)
print(f"Python Model ROC AUC Score: {roc_auc:.4f}")
print(f"Python Model Accuracy Score: {accuracy:.4f}")
Python Ensemble Model
Base models can also be combined through a custom Python ensemble class, which receives the instantiated base models and aggregates their outputs; here we average the scores.
# Base models (already defined and trained)
hoeffding_tree_model = model_trained_100K["HoeffdingTree"]
amf_classifier_model = model_trained_100K["AMF"]
multinomial_nb_model = model_trained_100K["MultinomialNB"]
# Extract base model configurations
base_model_configs = [
hoeffding_tree_model.model_configs[0],
amf_classifier_model.model_configs[0],
multinomial_nb_model.model_configs[0],
]
# Prepare ensemble model code
ensemble_model_code = """
import turboml.common.pymodel as model
from typing import List
class MyEnsembleModel:
def __init__(self, base_models: List[model.Model]):
if not base_models:
raise ValueError("PythonEnsembleModel requires at least one base model.")
self.base_models = base_models
def init_imports(self):
import turboml.common.pytypes as types
from typing import List
def learn_one(self, input):
for model in self.base_models:
model.learn_one(input)
def predict_one(self, input, output):
total_score = 0.0
for model in self.base_models:
model_output = model.predict_one(input)
total_score += model_output.score()
average_score = total_score / len(self.base_models)
output.set_score(average_score)
output.set_predicted_class(int(average_score >= 0.5))
"""
# Define the ensemble model configuration
ensemble_model_config = {
"algorithm": "PythonEnsembleModel",
"python_ensemble_config": {
"class_name": "MyEnsembleModel",
"code": ensemble_model_code,
},
}
# Combine the ensemble config with its base model configs
# (the ensemble config comes first, followed by the base models).
# Use a fresh name so the model_configs dict defined earlier is not shadowed.
ensemble_model_configs = [ensemble_model_config] + base_model_configs
# Create the ensemble LocalModel instance
ensemble_model = LocalModel(
    model_configs=ensemble_model_configs,
    input_spec=input_spec,
)
# Train the ensemble model
ensemble_model.learn(input_data, label_data)
# Make predictions with the ensemble model
ensemble_predictions = ensemble_model.predict(test_input_data)
# Evaluate the ensemble model
roc_auc = metrics.roc_auc_score(
test_label_data.dataframe["is_fraud"], ensemble_predictions["score"]
)
accuracy = metrics.accuracy_score(
test_label_data.dataframe["is_fraud"], ensemble_predictions["predicted_class"]
)
print(f"Ensemble Model ROC AUC Score: {roc_auc:.4f}")
print(f"Ensemble Model Accuracy Score: {accuracy:.4f}")