Feature Engineering - Python UDAF
import turboml as tb
transactions = tb.datasets.FraudDetectionDatasetFeatures()[:100].to_online(
    id="udaf_transactions", load_if_exists=True
)
User-Defined Aggregation Function (UDAF)
To create a UDAF, implement the following five functions in a separate Python file (in the example below, its contents are passed as a string via function_file_contents). These functions manage the lifecycle of the aggregation, from initialization to the final result:
State Initialization (create_state):
Purpose: Sets up the initial state for the UDAF.
Requirement: The state is the data structure that stores intermediate results (e.g., a sum, a count, or any other partial aggregate).
Accumulation (accumulate):
Purpose: Updates the state with each new value as it is processed.
Requirement: It should handle null or missing values gracefully (for example, by returning the state unchanged) and otherwise update the intermediate state from the value and any additional arguments.
Retraction (retract):
Purpose: Removes a previously accumulated value from the state.
Requirement: It must exactly reverse the effect of accumulate for the same inputs, so that accumulating and then retracting a value leaves the state unchanged. This is needed whenever data must be removed from the aggregate (e.g., when a value falls out of the aggregation window).
Merging States (merge_states):
Purpose: Merges two states into one.
Requirement: Combine the intermediate results of both states. This is essential for distributed aggregations, where partial states computed separately must be combined.
Final Result Computation (finish):
Purpose: Computes the final result once all values have been accumulated.
Requirement: Return the final output of the aggregation based on the state, handling edge cases such as empty input (e.g., return None if no valid values were processed).
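The lifecycle above can be exercised locally in plain Python before anything is registered. The sketch below is a hypothetical UDAF for a simple mean (state = (sum, count)); it is not part of the TurboML API, and the helper fold is an illustrative name. It builds two partial states independently, combines them with merge_states, removes a value with retract, and shows finish returning None when no valid values were seen.

```python
# Hypothetical UDAF computing a plain mean; state = (sum, count).
def create_state():
    return 0.0, 0


def accumulate(state, value):
    if value is None:  # skip nulls, leaving the state unchanged
        return state
    s, n = state
    return s + value, n + 1


def retract(state, value):
    # Exact inverse of accumulate for the same input
    if value is None:
        return state
    s, n = state
    return s - value, n - 1


def merge_states(state_a, state_b):
    # Partial sums and counts combine by addition
    return state_a[0] + state_b[0], state_a[1] + state_b[1]


def finish(state):
    s, n = state
    return None if n == 0 else s / n


def fold(values):
    """Accumulate a sequence of values into a fresh state (illustrative helper)."""
    state = create_state()
    for v in values:
        state = accumulate(state, v)
    return state


# Two "partitions" aggregated independently, then merged.
part_a = fold([10.0, None, 20.0])
part_b = fold([30.0])
merged = merge_states(part_a, part_b)
print(finish(merged))  # 20.0

# Retracting 30.0 leaves the mean of the remaining values.
print(finish(retract(merged, 30.0)))  # 15.0

# No valid values: finish reports None.
print(finish(fold([None])))  # None
```

Merging the two partial states gives the same state as folding all three values at once, which is the property merge_states has to guarantee for distributed execution.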
function_file_contents = """
def create_state():
return 0, 0
def accumulate(state, value, weight):
if value is None or weight is None:
return state
(s, w) = state
s += value * weight
w += weight
return s, w
def retract(state, value, weight):
if value is None or weight is None:
return state
(s, w) = state
s -= value * weight
w -= weight
return s, w
def merge_states(state_a, state_b):
(s_a, w_a) = state_a
(s_b, w_b) = state_b
return s_a + s_b, w_a + w_b
def finish(state):
(sum, weight) = state
if weight == 0:
return None
else:
return sum / weight
"""
transactions.feature_engineering.register_timestamp(
    column_name="timestamp", format_type="epoch_seconds"
)
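transactions.feature_engineering.create_udaf_features(
    new_feature_name="weighted_avg",
    # Input columns for the UDAF (here: the value and the weight)
    column_to_operate=["transactionAmount", "transactionTime"],
    function_name="weighted_avg",
    # Type of the value returned by finish
    return_type="DOUBLE",
    function_file_contents=function_file_contents,
    # Compute the aggregate separately for each account
    column_to_group=["accountID"],
    # Aggregate over a 1-hour window on the registered timestamp column
    timestamp_column="timestamp",
    window_duration=1,
    window_unit="hours",
)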
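To see how accumulate and retract cooperate once a window is involved, here is a purely local simulation of a per-account, one-hour weighted average. TurboML's exact window semantics (sliding versus tumbling, boundary inclusion) are not stated on this page, so the sketch assumes a sliding window covering (ts - 3600, ts] that ends at each event. The function rolling_weighted_avg, the constant WINDOW_SECONDS and the (account_id, timestamp_seconds, value, weight) event format are all hypothetical, and events are assumed to be sorted by timestamp and free of nulls.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 3600  # mirrors window_duration=1, window_unit="hours"


def rolling_weighted_avg(events, window_seconds=WINDOW_SECONDS):
    """Hypothetical local simulation of a per-account sliding-window weighted average.

    Assumes `events` is a list of (account_id, timestamp_seconds, value, weight)
    tuples sorted by timestamp, with no null values or weights.
    """
    states = defaultdict(lambda: (0.0, 0.0))  # account -> (sum of v*w, sum of w)
    in_window = defaultdict(deque)            # account -> events still in the window
    results = []
    for account, ts, value, weight in events:
        window = in_window[account]
        s, w = states[account]
        # Retract events that have fallen out of the window (ts - window, ts]
        while window and window[0][0] <= ts - window_seconds:
            _, old_value, old_weight = window.popleft()
            s -= old_value * old_weight
            w -= old_weight
        # Accumulate the new event
        s += value * weight
        w += weight
        window.append((ts, value, weight))
        states[account] = (s, w)
        # Finish: weighted average, or None if the weights sum to zero
        results.append((account, ts, s / w if w != 0 else None))
    return results


events = [
    ("A", 0, 10.0, 1.0),
    ("A", 1800, 20.0, 3.0),
    ("B", 1900, 5.0, 2.0),
    ("A", 4000, 30.0, 1.0),  # the event at ts=0 has left A's window by now
]
for row in rolling_weighted_avg(events):
    print(row)
```

Each account keeps its own state, matching column_to_group=["accountID"], and the deque lets every expired event be retracted with exactly the inputs it was accumulated with.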
transactions.feature_engineering.materialize_features(["weighted_avg"])
transactions.feature_engineering.get_materialized_features()