General
Local Model

Local Model

Open In Colab (opens in a new tab)

`LocalModel` is the Python interface that gives direct access to TurboML's machine learning models.

We will use the `transactions.csv` and `labels.csv` datasets for our experiments.

    import turboml as tb
    from turboml import LocalModel
    from turboml.common.models import InputSpec
    from turboml.common.dataloader import Inputs, Labels, PandasDataset
    import pandas as pd
    import numpy as np
    from sklearn import metrics
    import time
    import base64

Load Datasets

    # Load datasets. reset_index() materializes the row number as an
    # "index" column, which is used as the key field throughout this guide.
    transactions_df = pd.read_csv("data/transactions.csv").reset_index()
    labels_df = pd.read_csv("data/labels.csv").reset_index()
    
    # Use the first 100,000 records for training.
    # NOTE(review): transactions_100k / labels_100k are not referenced later
    # in this file -- confirm whether these dataset objects are still needed.
    transactions_100k = PandasDataset(
        dataframe=transactions_df[:100000], key_field="index", streaming=False
    )
    labels_100k = PandasDataset(
        dataframe=labels_df[:100000], key_field="index", streaming=False
    )

Define Input Specification

    # Columns fed to the models as continuous numeric features
    numerical_fields = [
        "transactionAmount",
        "localHour",
    ]
    
    # Columns treated as categorical features
    categorical_fields = [
        "digitalItemCount",
        "physicalItemCount",
        "isProxyIP",
    ]
    
    # InputSpec tells LocalModel how dataframe columns map onto model inputs.
    # Unused modalities are left empty; time_field="" means no event-time
    # column is used.
    input_spec = InputSpec(
        key_field="index",
        numerical_fields=numerical_fields,
        categorical_fields=categorical_fields,
        textual_fields=[],
        imaginal_fields=[],
        time_field="",
        label_field="is_fraud",
    )

Prepare Input and Label Data

    # Training inputs: the first 100K transactions, with the same
    # column-to-feature mapping as input_spec above.
    input_data = Inputs(
        dataset_id="transactions_topic",
        dataframe=transactions_df[:100000],
        key_field="index",
        numerical_fields=numerical_fields,
        categorical_fields=categorical_fields,
    )
    
    # Training labels: is_fraud for the same first 100K rows
    label_data = Labels(
        dataset_id="labels_topic",
        dataframe=labels_df[:100000],
        key_field="index",
        label_field="is_fraud",
    )
    
    # Held-out test inputs: the next 20K records (rows 100,000-119,999)
    test_input_data = Inputs(
        dataset_id="test_transactions_topic",
        dataframe=transactions_df[100000:120000],
        key_field="index",
        numerical_fields=numerical_fields,
        categorical_fields=categorical_fields,
    )
    
    # Held-out test labels for the same 20K rows
    test_label_data = Labels(
        dataset_id="test_labels_topic",
        dataframe=labels_df[100000:120000],
        key_field="index",
        label_field="is_fraud",
    )

Define Model Configurations

    # Candidate model configurations, keyed by a display name. Each value is
    # a config *list* because LocalModel expects a list of model configs
    # (a single entry for a standalone, non-ensemble model).
    model_configs = {
        "HoeffdingTree": [
            {
                # Incremental (streaming) decision tree for 2 classes.
                # "mc" presumably selects majority-class leaf prediction,
                # with Gini-based splits -- confirm against TurboML docs.
                "algorithm": "HoeffdingTreeClassifier",
                "hoeffding_classifier_config": {
                    "delta": 1e-7,
                    "tau": 0.05,
                    "grace_period": 200,
                    "n_classes": 2,
                    "leaf_pred_method": "mc",
                    "split_method": "gini",
                },
            }
        ],
        "AMF": [
            {
                # Aggregated Mondrian Forest classifier with 10 estimators
                "algorithm": "AMFClassifier",
                "amf_classifier_config": {
                    "n_classes": 2,
                    "n_estimators": 10,
                    "step": 1,
                    "use_aggregation": True,
                    "dirichlet": 0.5,
                    "split_pure": False,
                },
            }
        ],
        "MultinomialNB": [
            {
                # Multinomial Naive Bayes; alpha=1.0 is Laplace smoothing
                "algorithm": "MultinomialNB",
                "multinomial_config": {"n_classes": 2, "alpha": 1.0},
            }
        ],
    }

Training and Evaluation Function

    def train_and_evaluate(name, config):
        """Train a LocalModel on the 100K training split and report test metrics.

        Args:
            name: Human-readable model name, used only in log output.
            config: Model configuration list passed straight to LocalModel.

        Returns:
            A ``(model, predictions)`` pair on success, ``(None, None)`` if
            anything raised during training or evaluation (best-effort: the
            error is printed, not re-raised).
        """
        try:
            local_model = LocalModel(
                model_configs=config,
                input_spec=input_spec,
            )

            print(f"Training {name} model on first 100K records...")
            local_model.learn(input_data, label_data)

            # Score the held-out 20K split defined earlier
            preds = local_model.predict(test_input_data)

            y_true = test_label_data.dataframe["is_fraud"]
            roc_auc = metrics.roc_auc_score(y_true, preds["score"])
            accuracy = metrics.accuracy_score(y_true, preds["predicted_class"])

            print(f"{name} Model Results:")
            print(f"ROC AUC Score: {roc_auc:.4f}")
            print(f"Accuracy Score: {accuracy:.4f}")
            return local_model, preds

        except Exception as e:
            print(f"Error with {name} model: {str(e)}")
            return None, None
    # Train every configured model; keep only the ones that succeeded.
    model_trained_100K = {}
    initial_results = {}
    
    for model_name, model_config in model_configs.items():
        trained, preds = train_and_evaluate(model_name, model_config)
        if trained is None:
            continue
        model_trained_100K[model_name] = trained
        initial_results[model_name] = preds

Further Training in Batches

We will continue training the Hoeffding Tree model with additional data in batches.

    # Get the trained Hoeffding Tree model (may be absent if its initial
    # training failed, hence .get() and the None check below)
    model_hoeffding_tree = model_trained_100K.get("HoeffdingTree")
    
    if model_hoeffding_tree is not None:
        # Split the records beyond the first 100K into 10 roughly equal
        # parts for incremental batch training
        data_parts = np.array_split(transactions_df[100000:], 10)
        label_parts = np.array_split(labels_df[100000:], 10)
    
        for i, (data_part, label_part) in enumerate(
            zip(data_parts, label_parts, strict=False)
        ):
            print(f"\nPreparing batch {i + 1}...")
            # Wrap each batch in non-streaming PandasDatasets so the input
            # and label fields can be derived per batch
            df_train_tb = tb.PandasDataset(
                dataframe=data_part, key_field="index", streaming=False
            )
            df_y_train_tb = tb.PandasDataset(
                dataframe=label_part, key_field="index", streaming=False
            )
    
            features = df_train_tb.get_input_fields(
                numerical_fields=numerical_fields,
                categorical_fields=categorical_fields,
            )
            labels = df_y_train_tb.get_label_field(label_field="is_fraud")
    
            print(f"Training batch {i + 1}...")
            start_time = time.time()
            # learn() continues training the already-trained model, so each
            # batch builds on the previous ones
            model_hoeffding_tree.learn(features, labels)
            end_time = time.time()
            print(
                f"Batch {i + 1} training completed in {end_time - start_time:.2f} seconds."
            )
    else:
        print("Hoeffding Tree model not found in trained models.")

ONNX Model

    !pip install onnx==1.14.1 scikit-learn skl2onnx
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType
    
    # Feature matrix and fraud labels for the scikit-learn model
    X = transactions_df[numerical_fields + categorical_fields]
    y = labels_df["is_fraud"]
    
    # Hold out 20% of the data for evaluation
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Fit a random forest on the training split
    clf = RandomForestClassifier()
    clf.fit(X_train, y_train)

    # Export the fitted model to ONNX. zipmap=False keeps the class
    # probabilities as a plain tensor rather than a list of dicts.
    onnx_input_type = [("float_input", FloatTensorType([None, X_train.shape[1]]))]
    converted_model = convert_sklearn(
        clf, initial_types=onnx_input_type, options={type(clf): {"zipmap": False}}
    )
    
    # Serialize and base64-encode the model so it can be embedded in the
    # JSON-style model config below
    encoded_model = base64.b64encode(converted_model.SerializeToString()).decode("utf-8")

    # ONNX model config carrying the encoded model bytes
    onnx_model_config = [
        {
            "algorithm": "ONNX",
            "onnx_config": {
                "model_save_name": "randomforest",
                "model_data": encoded_model,
            },
        }
    ]
    
    
    # The ONNX model consumes a single float tensor, so every column
    # (numerical and categorical alike) is declared as numerical here.
    onnx_input_spec = InputSpec(
        key_field="index",
        numerical_fields=numerical_fields + categorical_fields,
        categorical_fields=[],
        textual_fields=[],
        imaginal_fields=[],
        time_field="",
        label_field="is_fraud",
    )
    
    local_onnx_model = LocalModel(
        model_configs=onnx_model_config,
        input_spec=onnx_input_spec,
    )

    # Create ONNX-specific test data. Use new names instead of reassigning
    # test_input_data / test_label_data: the earlier definitions are reused
    # by the Python-model and ensemble sections below, and clobbering them
    # with this differently-shaped random split would feed those models the
    # wrong schema and the wrong rows.
    onnx_test_input_data = Inputs(
        dataset_id="test_transactions_topic",
        dataframe=X_test.reset_index(),
        key_field="index",
        numerical_fields=numerical_fields + categorical_fields,
    )
    
    onnx_test_label_data = Labels(
        dataset_id="test_labels_topic",
        dataframe=pd.DataFrame({"index": X_test.index, "is_fraud": y_test}).reset_index(
            drop=True
        ),
        key_field="index",
        label_field="is_fraud",
    )
    
    
    def onnx_model():
        """Score the held-out split with the ONNX model and print metrics.

        Returns:
            The predictions on success, or None if scoring failed (the error
            is printed, not re-raised).
        """
        try:
            # Get predictions from the wrapped ONNX random forest
            predictions = local_onnx_model.predict(onnx_test_input_data)
    
            # Calculate metrics against the matching label split
            roc_auc = metrics.roc_auc_score(
                onnx_test_label_data.dataframe["is_fraud"],
                predictions["score"],
            )
            accuracy = metrics.accuracy_score(
                onnx_test_label_data.dataframe["is_fraud"],
                predictions["predicted_class"],
            )
    
            print("ONNX Model Results:")
            print(f"ROC AUC Score: {roc_auc:.4f}")
            print(f"Accuracy Score: {accuracy:.4f}")
    
            return predictions
    
        except Exception as e:
            print(f"Error testing ONNX model: {str(e)}")
            return None
    
    
    # Run the test and sanity-check the ONNX predictions against the
    # original scikit-learn model on the same rows
    predictions = onnx_model()
    
    if predictions is not None:
        sklearn_preds = clf.predict(X_test)
        onnx_preds = predictions["predicted_class"]
    
        match_rate = (sklearn_preds == onnx_preds).mean()
        print("\nPrediction Comparison:")
        print(f"Sklearn vs ONNX prediction match rate: {match_rate:.4f}")

Python Model Testing

    # Source code for a custom Python model, shipped to LocalModel as a
    # string. The class implements init_imports / learn_one / predict_one;
    # inputs arrive as pytypes objects with .numeric / .categ / .label.
    python_model_code = """
    from river import linear_model
    import turboml.common.pytypes as types
    
    class MyLogisticRegression:
    
        def init_imports(self):
            from river import linear_model
            import turboml.common.pytypes as types
    
        def __init__(self):
            self.model = linear_model.LogisticRegression()
    
        def learn_one(self, input):
            # Combine numerical and categorical features into a dictionary
            features = {}
            features.update({f'num_{i}': val for i, val in enumerate(input.numeric)})
            features.update({f'cat_{i}': val for i, val in enumerate(input.categ)})
            self.model.learn_one(features, input.label)
    
        def predict_one(self, input, output):
            # Combine numerical and categorical features into a dictionary
            features = {}
            features.update({f'num_{i}': val for i, val in enumerate(input.numeric)})
            features.update({f'cat_{i}': val for i, val in enumerate(input.categ)})
            proba = self.model.predict_proba_one(features)
            score = float(proba.get(True, 0))
            output.set_score(score)
            output.set_predicted_class(int(score >= 0.5))
    """
    # Define the model configuration: the "Python" algorithm takes the class
    # name plus the source code defined above
    python_model_config = {
        "algorithm": "Python",
        "python_config": {
            "class_name": "MyLogisticRegression",
            "code": python_model_code,
        },
    }
    
    # Create the LocalModel instance from the custom Python model config;
    # it reuses the same input_spec as the built-in models above
    local_python_model = LocalModel(
        model_configs=[python_model_config],
        input_spec=input_spec,
    )
    # Train the model on the 100K training split
    local_python_model.learn(input_data, label_data)
    
    # Make predictions on the held-out split
    predictions = local_python_model.predict(test_input_data)
    
    # Evaluate the model
    roc_auc = metrics.roc_auc_score(
        test_label_data.dataframe["is_fraud"], predictions["score"]
    )
    accuracy = metrics.accuracy_score(
        test_label_data.dataframe["is_fraud"], predictions["predicted_class"]
    )
    
    print(f"Python Model ROC AUC Score: {roc_auc:.4f}")
    print(f"Python Model Accuracy Score: {accuracy:.4f}")

Python Ensemble Model

    # Look up the three base models trained earlier on the 100K split
    hoeffding_tree_model = model_trained_100K["HoeffdingTree"]
    amf_classifier_model = model_trained_100K["AMF"]
    multinomial_nb_model = model_trained_100K["MultinomialNB"]
    
    # Each trained LocalModel carries its own config list; the first entry
    # is the configuration of the model itself.
    base_model_configs = [
        base.model_configs[0]
        for base in (hoeffding_tree_model, amf_classifier_model, multinomial_nb_model)
    ]
    # Prepare ensemble model code
    ensemble_model_code = """
    import turboml.common.pymodel as model
    from typing import List
    
    class MyEnsembleModel:
        def __init__(self, base_models: List[model.Model]):
            if not base_models:
                raise ValueError("PythonEnsembleModel requires at least one base model.")
            self.base_models = base_models
    
        def init_imports(self):
            import turboml.common.pytypes as types
            from typing import List
    
        def learn_one(self, input):
            for model in self.base_models:
                model.learn_one(input)
    
        def predict_one(self, input, output):
            total_score = 0.0
            for model in self.base_models:
                model_output = model.predict_one(input)
                total_score += model_output.score()
            average_score = total_score / len(self.base_models)
            output.set_score(average_score)
            output.set_predicted_class(int(average_score >= 0.5))
    """
    # Define the ensemble model configuration
    ensemble_model_config = {
        "algorithm": "PythonEnsembleModel",
        "python_ensemble_config": {
            "class_name": "MyEnsembleModel",
            "code": ensemble_model_code,
        },
    }
    
    # Combine the ensemble model config and base model configs: the ensemble
    # config comes first, followed by the configs of its base models.
    # Use a distinct name here instead of reassigning `model_configs`, which
    # would silently clobber the model_configs dict defined earlier.
    ensemble_model_configs = [ensemble_model_config] + base_model_configs
    
    # Create the ensemble LocalModel instance (bare LocalModel, consistent
    # with the rest of this guide)
    ensemble_model = LocalModel(
        model_configs=ensemble_model_configs,
        input_spec=input_spec,
    )
    # Train the ensemble model on the same 100K training split
    ensemble_model.learn(input_data, label_data)
    
    # Make predictions with the ensemble model on the held-out split
    ensemble_predictions = ensemble_model.predict(test_input_data)
    
    # Evaluate the ensemble model
    roc_auc = metrics.roc_auc_score(
        test_label_data.dataframe["is_fraud"], ensemble_predictions["score"]
    )
    accuracy = metrics.accuracy_score(
        test_label_data.dataframe["is_fraud"], ensemble_predictions["predicted_class"]
    )
    
    print(f"Ensemble Model ROC AUC Score: {roc_auc:.4f}")
    print(f"Ensemble Model Accuracy Score: {accuracy:.4f}")