Feature Engineering - Python UDAF

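import pandas as pd
import turboml as tb

# Load the first 100 transactions for a small demo dataset.
transactions = pd.read_csv("data/transactions.csv")[0:100]
try:
    # Upload the DataFrame as a new dataset, keyed on the row index.
    transactions = tb.PandasDataset(
        dataset_name="transactions_udaf",
        key_field="index",
        dataframe=transactions.reset_index(),
        upload=True,
    )
except Exception:
    # Upload failed (for example, the dataset already exists): load it by name.
    transactions = tb.PandasDataset(dataset_name="transactions_udaf")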

User-Defined Aggregation Function (UDAF)

To create a UDAF, implement the following five functions in a separate Python file. Together they manage the lifecycle of the aggregation, from initializing the state to computing the final result:

State Initialization (create_state):

Purpose: This function sets up the initial state for the UDAF.

Requirement: The state should be a data structure that can hold the intermediate results of the aggregation (e.g., a running sum, a count, or any other aggregated values).
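As a minimal sketch, assume the UDAF computes a weighted average, as the `weighted_avg` feature registered later on this page suggests. The state could then be a pair holding the running weighted sum and the running total weight. The contents of `udaf.py` are not shown on this page, so the tuple layout here is an assumption.

```python
def create_state():
    # Assumed layout: (weighted_sum, total_weight), both starting at zero.
    return 0.0, 0.0
```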

Accumulation (accumulate):

Purpose: This function updates the state with each new value as it is processed.

Requirement: It should handle null or missing values gracefully, and update the intermediate state based on the value and any additional parameters.
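A possible `accumulate` for a weighted average is sketched below, assuming a state of the form `(weighted_sum, total_weight)`. The parameter names `value` and `weight` are illustrative, and the assumption is that the function receives one argument per input column.

```python
def accumulate(state, value, weight):
    # Skip rows with a missing value or weight; the state is left unchanged.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum + value * weight, total_weight + weight


# Accumulate one valid row, then one row with a missing value.
state = accumulate((0.0, 0.0), 10.0, 2.0)
state = accumulate(state, None, 3.0)  # ignored because value is None
```

Returning a new tuple rather than mutating the state in place keeps each call free of side effects, which makes the function easy to test in isolation.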

Retraction (retract):

Purpose: This function removes a previously accumulated value from the state.

Requirement: It should exactly reverse the effect of accumulate for that value, so that data can be removed from the aggregate (for example, when a row is no longer part of the current time window).
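For a weighted average, retraction can be sketched as the arithmetic inverse of accumulation. This assumes a state of the form `(weighted_sum, total_weight)`; the parameter names are illustrative.

```python
def retract(state, value, weight):
    # Rows with a missing value or weight are assumed never to have been
    # accumulated, so there is nothing to undo.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum - value * weight, total_weight - weight


# Remove a row with value 10.0 and weight 2.0 from an existing state.
state = retract((50.0, 5.0), 10.0, 2.0)
```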

Merging States (merge_states):

Purpose: This function merges two states together.

Requirement: It should combine the intermediate results from two states into one. This is essential for distributed aggregations.
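When the state is a pair of running totals, merging reduces to element-wise addition. The sketch below assumes a `(weighted_sum, total_weight)` state; the argument names `state_a` and `state_b` are illustrative.

```python
def merge_states(state_a, state_b):
    # Each partial state holds (weighted_sum, total_weight); add them pairwise.
    sum_a, weight_a = state_a
    sum_b, weight_b = state_b
    return sum_a + sum_b, weight_a + weight_b


# Combine two partial aggregates, e.g. computed on separate partitions.
merged = merge_states((20.0, 2.0), (30.0, 3.0))
```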

Final Result Computation (finish):

Purpose: This function computes the final result once all values have been accumulated.

Requirement: It should return the final output of the aggregation based on the state, and handle edge cases such as empty datasets (e.g., return None if no valid values were processed).
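For a weighted average, the final step could divide the weighted sum by the total weight, guarding against an empty state. This is a sketch under the assumption that the state is `(weighted_sum, total_weight)`.

```python
def finish(state):
    weighted_sum, total_weight = state
    # No valid rows were accumulated (or their weights cancel out):
    # return None instead of dividing by zero.
    if total_weight == 0:
        return None
    return weighted_sum / total_weight


result = finish((50.0, 5.0))  # weighted average of the accumulated rows
empty = finish((0.0, 0.0))    # empty state
```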

# Read the UDAF source so it can be passed to create_udaf_features below.
with open("udaf.py") as f:
    function_file_contents = f.read()
print(function_file_contents)
transactions.feature_engineering.register_timestamp(
    column_name="timestamp", format_type="epoch_seconds"
)
transactions.feature_engineering.create_udaf_features(
    new_feature_name="weighted_avg",
    column_to_operate=["transactionAmount", "transactionTime"],
    function_name="weighted_avg",
    return_type="DOUBLE",
    function_file_contents=function_file_contents,
    column_to_group=["accountID"],
    timestamp_column="timestamp",
    window_duration=1,
    window_unit="hours",
)
transactions.feature_engineering.materialize_features(["weighted_avg"])