Feature Engineering - Python UDAF

    import turboml as tb
    import pandas as pd

    # Load the first 100 transactions.
    transactions = pd.read_csv("data/transactions.csv")[0:100]

    # Upload them as a dataset, using the row index as the key field.
    transactions = tb.PandasDataset(
        dataset_name="transactions_udaf",
        key_field="index",
        dataframe=transactions.reset_index(),
        upload=True,
    )

User Defined Aggregation Function

To create a UDAF, implement the following functions in a separate Python file. Together they manage the lifecycle of the aggregation, from initialization to computing the final result:

State Initialization (create_state):

Purpose: This function sets up the initial state for the UDAF. Requirement: The state should represent the data structure that will store intermediate results (e.g., sum, count, or any other aggregated values).

Accumulation (accumulate):

Purpose: This function updates the state with new values as they are processed. Requirement: It should handle null or missing values gracefully and update the intermediate state based on the value and any additional parameters.

Retraction (retract):

Purpose: This function "retracts" or removes a previously accumulated value from the state. Requirement: It should exactly reverse the effect of the accumulate function for cases where data needs to be removed (e.g., when a value drops out of the aggregation window).

Merging States (merge_states):

Purpose: This function merges two states together. Requirement: It should combine the intermediate results from two states into one, giving the same state as if all values had been accumulated together. This is essential for distributed aggregations.

Final Result Computation (finish):

Purpose: This function computes the final result once all values have been accumulated. Requirement: It should return the final output of the aggregation based on the state. Handle edge cases such as empty datasets (e.g., return None if no valid values were processed).
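
The contents of udaf.py are not reproduced on this page. As a minimal sketch of what a weighted-average UDAF could look like, the file below implements the five functions described above. It assumes that accumulate and retract receive the state followed by one argument per operated column, with the first column treated as the value and the second as the weight. The argument names and the tuple-shaped state are illustrative choices, not confirmed API details.

```python
# udaf.py (illustrative sketch; signatures and argument order are assumptions)


def create_state():
    # State is a pair: (running sum of value * weight, running sum of weights).
    return 0.0, 0.0


def accumulate(state, value, weight):
    # Skip rows where either input is missing so they do not affect the result.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum + value * weight, total_weight + weight


def retract(state, value, weight):
    # Exact inverse of accumulate: remove one previously added row.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum - value * weight, total_weight - weight


def merge_states(state_a, state_b):
    # Both components are plain sums, so merging is element-wise addition.
    return state_a[0] + state_b[0], state_a[1] + state_b[1]


def finish(state):
    # Return None when no weight was accumulated (e.g., an empty window).
    weighted_sum, total_weight = state
    if total_weight == 0:
        return None
    return weighted_sum / total_weight
```

Keeping the state as two plain sums is what makes retract and merge_states straightforward here: both reduce to subtraction and addition on the same pair.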

    # Load the UDAF source so it can be passed to the platform.
    with open("udaf.py") as f:
        function_file_contents = f.read()
    print(function_file_contents)

    # Register the timestamp column; its values are epoch seconds.
    transactions.feature_engineering.register_timestamp(
        column_name="timestamp", format_type="epoch_seconds"
    )

    # Define the weighted_avg feature per accountID over a 1-hour window.
    transactions.feature_engineering.create_udaf_features(
        new_feature_name="weighted_avg",
        column_to_operate=["transactionAmount", "transactionTime"],
        function_name="weighted_avg",
        return_type="DOUBLE",
        function_file_contents=function_file_contents,
        column_to_group=["accountID"],
        timestamp_column="timestamp",
        window_duration=1,
        window_unit="hours",
    )

    # Materialize the new feature.
    transactions.feature_engineering.materialize_features(["weighted_avg"])
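
To see why retract matters for a one-hour window like the one configured above, here is a conceptual simulation in plain Python. It is not TurboML's execution engine, and it does not call the platform. The compact UDAF functions are a hypothetical stand-in for udaf.py, sliding_weighted_avg is an illustrative helper, and the window-boundary rule (a row expires once it is at least 3600 seconds old) is an assumption.

```python
from collections import deque

WINDOW_SECONDS = 3600  # mirrors window_duration=1, window_unit="hours"


# Compact weighted-average UDAF (hypothetical stand-in for udaf.py).
def create_state():
    return 0.0, 0.0


def accumulate(state, value, weight):
    return state[0] + value * weight, state[1] + weight


def retract(state, value, weight):
    return state[0] - value * weight, state[1] - weight


def finish(state):
    return None if state[1] == 0 else state[0] / state[1]


def sliding_weighted_avg(rows):
    """Yield the windowed result after each (timestamp, value, weight) row.

    Rows are assumed to arrive in timestamp order for a single group key.
    """
    window = deque()
    state = create_state()
    for ts, value, weight in rows:
        # Retract rows that have aged out of the window.
        while window and window[0][0] <= ts - WINDOW_SECONDS:
            _, old_value, old_weight = window.popleft()
            state = retract(state, old_value, old_weight)
        # Add the new row and report the current aggregate.
        window.append((ts, value, weight))
        state = accumulate(state, value, weight)
        yield finish(state)


# Toy rows for one account: (epoch seconds, amount, weight).
rows = [(0, 10.0, 1.0), (1800, 20.0, 3.0), (4000, 30.0, 1.0)]
results = list(sliding_weighted_avg(rows))
print(results)  # [10.0, 17.5, 22.5]
```

When the third row arrives, the first row is older than the window, so it is retracted before the new value is accumulated. Without retract, the state would have to be rebuilt from every row still in the window on each update.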