Feature Engineering
UDAF

Feature Engineering - Python UDAF

Open In Colab (opens in a new tab)

import turboml as tb
transactions = tb.datasets.FraudDetectionDatasetFeatures()[:100].to_online(
    id="udaf_transactions", load_if_exists=True
)

User Defined Aggregation function

To create a UDAF, you need to implement the following essential functions in a separate python file containing the function. These functions manage the lifecycle of the aggregation process, from initialization to final result computation:

State Initialization (create_state):

Purpose: This function sets up the initial state for the UDAF. Requirement: The state should represent the data structure that will store intermediate results (e.g., sum, count, or any other aggregated values).

Accumulation (accumulate):

Purpose: This function updates the state with new values as they are processed. Requirement: It should handle null or missing values gracefully and update the intermediate state based on the value and any additional parameters.

Retraction (retract):

Purpose: This function "retracts" or removes a previously accumulated value from the state. Requirement: It should reverse the effect of the accumulate function for cases where data needs to be removed (e.g., when undoing a previous operation).

Merging States (merge_states):

Purpose: This function merges two states together. Requirement: Combine the intermediate results from two states into one. This is essential for distributed aggregations.

Final Result Computation (finish):

Purpose: This function computes the final result once all values have been accumulated. Requirement: It should return the final output of the aggregation based on the state. Handle edge cases such as empty datasets (e.g., return None if no valid values were processed).

function_file_contents = """
def create_state():
    return 0, 0
 
 
def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s += value * weight
    w += weight
    return s, w
 
 
def retract(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s -= value * weight
    w -= weight
    return s, w
 
 
def merge_states(state_a, state_b):
    (s_a, w_a) = state_a
    (s_b, w_b) = state_b
    return s_a + s_b, w_a + w_b
 
 
def finish(state):
    (sum, weight) = state
    if weight == 0:
        return None
    else:
        return sum / weight
"""
transactions.feature_engineering.register_timestamp(
    column_name="timestamp", format_type="epoch_seconds"
)
transactions.feature_engineering.create_udaf_features(
    new_feature_name="weighted_avg",
    column_to_operate=["transactionAmount", "transactionTime"],
    function_name="weighted_avg",
    return_type="DOUBLE",
    function_file_contents=function_file_contents,
    column_to_group=["accountID"],
    timestamp_column="timestamp",
    window_duration=1,
    window_unit="hours",
)
transactions.feature_engineering.materialize_features(["weighted_avg"])
transactions.feature_engineering.get_materialized_features()