Feature Engineering - Python UDAF
import turboml as tb
import pandas as pd
transactions = pd.read_csv("data/transactions.csv")[0:100]
transactions = tb.PandasDataset(
    dataset_name="transactions_udaf",
    key_field="index",
    dataframe=transactions.reset_index(),
    upload=True,
)
User Defined Aggregation Function (UDAF)
To create a UDAF, implement the following functions in a separate Python file. Together they manage the lifecycle of the aggregation, from initialization to computing the final result:
State Initialization (create_state):
Purpose: Sets up the initial state for the UDAF.
Requirement: The state should be the data structure that stores intermediate results (e.g., a sum, a count, or any other aggregated values).
Accumulation (accumulate):
Purpose: Updates the state with each new value as it is processed.
Requirement: It should handle null or missing values gracefully and update the intermediate state based on the value and any additional parameters.
Retraction (retract):
Purpose: "Retracts", or removes, a previously accumulated value from the state.
Requirement: It should reverse the effect of accumulate for cases where data needs to be removed (e.g., when undoing a previous operation).
Merging States (merge_states):
Purpose: Merges two states into one.
Requirement: It should combine the intermediate results of both states. This is essential for distributed aggregations.
Final Result Computation (finish):
Purpose: Computes the final result once all values have been accumulated.
Requirement: It should return the final output of the aggregation based on the state and handle edge cases such as empty datasets (e.g., return None if no valid values were processed).
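The five functions can be sketched for a weighted average as follows. This is a minimal illustration of what a file such as udaf.py might contain, assuming the state is a (weighted_sum, total_weight) tuple and that accumulate and retract receive the state followed by one argument per operated column. The exact signatures the platform expects are an assumption here, not something this page confirms.

```python
# Hypothetical udaf.py: weighted average kept as (weighted_sum, total_weight).
# The argument order (state, value, weight) is an assumption for illustration.


def create_state():
    # Empty aggregation: nothing summed, no weight seen yet.
    return 0.0, 0.0


def accumulate(state, value, weight):
    # Skip rows with a missing value or weight instead of failing.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum + value * weight, total_weight + weight


def retract(state, value, weight):
    # Exact inverse of accumulate for the same (value, weight) pair.
    if value is None or weight is None:
        return state
    weighted_sum, total_weight = state
    return weighted_sum - value * weight, total_weight - weight


def merge_states(state_a, state_b):
    # Partial sums from two states add component-wise.
    return state_a[0] + state_b[0], state_a[1] + state_b[1]


def finish(state):
    weighted_sum, total_weight = state
    # No valid rows (or a total weight of zero): the average is undefined.
    if total_weight == 0:
        return None
    return weighted_sum / total_weight
```

With these definitions, accumulating the value 10 with weight 1 and the value 20 with weight 3 and then calling finish gives (10·1 + 20·3) / 4 = 17.5. Retracting the second row brings the result back to 10.0.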
# Read the UDAF source; it is passed to create_udaf_features as a string.
with open("udaf.py") as f:
    function_file_contents = f.read()
print(function_file_contents)
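Because the UDAF is handed over as source text, it can be worth checking locally that all five lifecycle functions are present before registering it. The helper below is a hypothetical pre-check built on the standard library's ast module; missing_udaf_functions and example_source are illustrative names and not part of TurboML.

```python
import ast

REQUIRED_FUNCTIONS = ("create_state", "accumulate", "retract", "merge_states", "finish")


def missing_udaf_functions(source: str) -> list[str]:
    """Return the required function names not defined at the top level of source."""
    tree = ast.parse(source)  # raises SyntaxError if the text is not valid Python
    defined = {node.name for node in tree.body if isinstance(node, ast.FunctionDef)}
    return [name for name in REQUIRED_FUNCTIONS if name not in defined]


# Stand-in for function_file_contents; retract and merge_states are left out on purpose.
example_source = """
def create_state():
    return 0

def accumulate(state, value):
    return state + value

def finish(state):
    return state
"""

print(missing_udaf_functions(example_source))  # ['retract', 'merge_states']
```

An empty list means every required name is defined. The check only looks at names, so it does not confirm that the signatures or return types match what the platform expects.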
transactions.feature_engineering.register_timestamp(
    column_name="timestamp", format_type="epoch_seconds"
)
transactions.feature_engineering.create_udaf_features(
    new_feature_name="weighted_avg",
    column_to_operate=["transactionAmount", "transactionTime"],
    function_name="weighted_avg",
    return_type="DOUBLE",
    function_file_contents=function_file_contents,
    column_to_group=["accountID"],
    timestamp_column="timestamp",
    window_duration=1,
    window_unit="hours",
)
transactions.feature_engineering.materialize_features(["weighted_avg"])
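To build intuition for how grouping, a one-hour window, and retraction fit together, the sketch below simulates the aggregation locally in plain Python. It is a rough approximation under stated assumptions and not the platform's actual execution model: rows arrive sorted by timestamp, a row is retracted once it is a full window or more older than the current row, and windowed_weighted_avg and the sample rows are hypothetical.

```python
from collections import deque

WINDOW_SECONDS = 3600  # mirrors window_duration=1, window_unit="hours"


def create_state():
    return 0.0, 0.0


def accumulate(state, value, weight):
    return state[0] + value * weight, state[1] + weight


def retract(state, value, weight):
    return state[0] - value * weight, state[1] - weight


def finish(state):
    return None if state[1] == 0 else state[0] / state[1]


def windowed_weighted_avg(rows):
    """rows: (timestamp, account_id, value, weight) tuples sorted by timestamp."""
    states, windows, results = {}, {}, []
    for ts, account, value, weight in rows:
        state = states.get(account, create_state())
        window = windows.setdefault(account, deque())
        # Retract rows that are a full window or more older than the current row.
        while window and window[0][0] <= ts - WINDOW_SECONDS:
            _, old_value, old_weight = window.popleft()
            state = retract(state, old_value, old_weight)
        state = accumulate(state, value, weight)
        window.append((ts, value, weight))
        states[account] = state
        results.append((ts, account, finish(state)))
    return results


rows = [
    (0, "A", 10.0, 1.0),
    (1800, "A", 20.0, 3.0),
    (1800, "B", 5.0, 2.0),
    (4000, "A", 30.0, 1.0),
]
for row in windowed_weighted_avg(rows):
    print(row)
```

For account "A", the row at timestamp 0 has left the window by timestamp 4000, so it is retracted and the last result is (20·3 + 30·1) / 4 = 22.5. Account "B" keeps its own independent state, which is what grouping by accountID implies.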