Feature Engineering - Complex Stream Processing
With real-time features, there can be situations where the feature logic cannot be expressed by simple SQL, aggregates, or scalar Python UDFs. In such scenarios, you may need to write custom streaming pipelines. This is where TurboML builds on Ibis (https://github.com/ibis-project/ibis/) to expose a DataFrame-like API for expressing complex streaming feature logic. We currently support Apache Flink and RisingWave backends for streaming execution.
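As a quick local illustration of that DataFrame-like API, here is a minimal Ibis expression evaluated against an in-memory table with Ibis' default DuckDB backend. This sketch is independent of TurboML and its sources; the column names are made up for the example.
import ibis
import pandas as pd
# Hypothetical in-memory data, only to illustrate the expression API.
local_df = pd.DataFrame(
    {"digitalItemCount": [0, 3, 12], "physicalItemCount": [2, 0, 1]}
)
t = ibis.memtable(local_df)
# Expressions are built lazily; nothing runs until .execute() is called.
expr = t.filter(t.physicalItemCount >= 0).mutate(
    total_items=t.digitalItemCount + t.physicalItemCount
)
expr.execute()  # runs locally on the default (DuckDB) backend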
Import necessary libraries
import turboml as tb
import pandas as pd
from turboml.common.sources import (
FileSource,
DataSource,
TimestampFormatConfig,
Watermark,
DataDeliveryMode,
S3Config,
)
from turboml.common.models import BackEnd
import ibis
transactions_df = pd.read_csv("data/transactions.csv").reset_index()
transactions_df["timestamp"] = pd.to_datetime(transactions_df["timestamp"], unit="s")
labels_df = pd.read_csv("data/labels.csv").reset_index()
labels = tb.PandasDataset(
dataset_name="labels_ibis_fe", key_field="index", dataframe=labels_df, upload=True
)
Add feature definitions
To add feature definitions, the turboml package provides the IbisFeatureEngineering class, which exposes registered data sources as Ibis tables that we can define features on.
fe = tb.IbisFeatureEngineering()
Let's upload the data for this demo
%pip install minio
from minio import Minio
client = Minio(
"play.min.io",
access_key="Q3AM3UQ867SPQQA43P2F",
secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
secure=True,
)
bucket_name = "ibis-demo"
found = client.bucket_exists(bucket_name)
if not found:
client.make_bucket(bucket_name)
print("Created bucket", bucket_name)
else:
print("Bucket", bucket_name, "already exists")
import duckdb
con = duckdb.connect()
con.sql("SET s3_region='us-east-1';")
con.sql("SET s3_url_style='path';")
con.sql("SET s3_use_ssl=true;")
con.sql("SET s3_endpoint='play.min.io';")
con.sql("SET s3_access_key_id='Q3AM3UQ867SPQQA43P2F';")
con.sql("SET s3_secret_access_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG';")
con.sql(
"COPY (SELECT * FROM transactions_df) TO 's3://ibis-demo/transactions/transactions.parquet' (FORMAT 'parquet');"
)
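As an optional sanity check (not required for the pipeline), we can read the uploaded Parquet file back through the same DuckDB connection:
# Count the rows we just copied to the bucket.
con.sql(
    "SELECT COUNT(*) FROM read_parquet('s3://ibis-demo/transactions/transactions.parquet');"
)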
DataSource
The DataSource serves as the foundational entity in the feature engineering workflow. It defines where and how the raw data is accessed for processing. After creating a DataSource, users can register their source configurations to start leveraging them in the pipeline.
Types of Delivery Modes
- Dynamic:
  - Suitable for real-time or streaming data scenarios.
  - Automatically creates connectors based on the source configuration.
  - The Kafka topic becomes the primary input for feature engineering, ensuring seamless integration with downstream processing pipelines.
- Static:
  - Designed for batch data sources.
  - RisingWave/Flink reads directly from the source for feature engineering, eliminating the need for an intermediary Kafka topic (see the sketch after this list).
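Below is a minimal sketch of what a static (batch) source could look like, mirroring the dynamic source we define next. The STATIC member of DataDeliveryMode is an assumption based on the delivery modes described above, not verified API.
# Hypothetical static-delivery source; mirrors the dynamic source defined below.
# DataDeliveryMode.STATIC is assumed to exist alongside DataDeliveryMode.DYNAMIC.
ds_static = DataSource(
    name="transactions_batch",
    key_fields=["index"],
    delivery_mode=DataDeliveryMode.STATIC,
    file_source=FileSource(
        path="transactions",
        format=FileSource.Format.PARQUET,
        s3_config=S3Config(
            bucket="ibis-demo",
            region="us-east-1",
            access_key_id="Q3AM3UQ867SPQQA43P2F",
            secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
            endpoint="https://play.min.io",
        ),
    ),
)
# tb.register_source(ds_static)  # registered the same way as the dynamic source
For this demo we use the dynamic (streaming) mode, configured below with a watermark on the timestamp column.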
time_col_config = TimestampFormatConfig(
format_type=TimestampFormatConfig.FormatType.EpochMillis
)
watermark = Watermark(
time_col="timestamp", allowed_delay_seconds=60, time_col_config=time_col_config
)
ds1 = DataSource(
name="transactions_stream",
key_fields=["index"],
delivery_mode=DataDeliveryMode.DYNAMIC,
file_source=FileSource(
path="transactions",
format=FileSource.Format.PARQUET,
s3_config=S3Config(
bucket="ibis-demo",
region="us-east-1",
access_key_id="Q3AM3UQ867SPQQA43P2F",
secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
endpoint="https://play.min.io",
),
),
watermark=watermark,
)
tb.register_source(ds1)
To define features, we can fetch the registered sources and perform operations on them.
transactions = fe.get_ibis_table("transactions_stream")
transactions
In this example we use one Kafka topic (transactions_stream) to build features using Flink.
We will also use a UDF to define custom logic.
@ibis.udf.scalar.python()
def calculate_frequency_score(digital_count: float, physical_count: float) -> float:
if digital_count > 10 or physical_count > 10:
return 0.7 # High item count
elif digital_count == 0 and physical_count > 0:
return 0.3 # Physical item-only transaction
elif digital_count > 0 and physical_count == 0:
return 0.3 # Digital item-only transaction
else:
return 0.1 # Regular transaction
We can define features using the Ibis DSL or SQL.
transactions_with_frequency_score = transactions.select(
frequency_score=calculate_frequency_score(
transactions.digitalItemCount, transactions.physicalItemCount
),
index=transactions.index,
digitalItemCount=transactions.digitalItemCount,
physicalItemCount=transactions.physicalItemCount,
transactionAmount=transactions.transactionAmount,
transactionTime=transactions.transactionTime,
isProxyIP=transactions.isProxyIP,
)
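The same feature can also be expressed purely in the Ibis DSL, without a Python UDF. This is a sketch of one equivalent formulation using ibis.ifelse, nested to reproduce the UDF's branches, which lets the backend evaluate the logic natively.
digital = transactions.digitalItemCount
physical = transactions.physicalItemCount
# Nested ifelse reproducing calculate_frequency_score without a Python UDF.
frequency_score_dsl = ibis.ifelse(
    (digital > 10) | (physical > 10),
    0.7,  # high item count
    ibis.ifelse(
        ((digital == 0) & (physical > 0)) | ((digital > 0) & (physical == 0)),
        0.3,  # single-channel (digital-only or physical-only) transaction
        0.1,  # regular transaction
    ),
)
transactions_with_frequency_score_dsl = transactions.select(
    frequency_score=frequency_score_dsl,
    index=transactions.index,
)
In the rest of this notebook we continue with the UDF-based transactions_with_frequency_score.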
We can preview the features locally before materializing them.
transactions_with_frequency_score.execute().head()
Once we are satisfied, we can materialize the features. This writes the features using Flink.
Flink uses a hybrid source that first reads from the Iceberg table and then switches to Kafka.
fe.materialize_features(
transactions_with_frequency_score,
"transactions_with_frequency_score",
"index",
BackEnd.Flink,
"transactions_stream",
)
We can now train a model using the features built with Flink.
model = tb.RCF(number_of_trees=50)
numerical_fields = ["frequency_score"]
features = fe.get_input_fields(
"transactions_with_frequency_score", numerical_fields=numerical_fields
)
label = labels.get_label_field(label_field="is_fraud")
deployed_model_rcf = model.deploy(
name="demo_model_ibis_flink", input=features, labels=label
)
outputs = deployed_model_rcf.get_outputs()
sample_output = outputs[-1]
sample_output
import matplotlib.pyplot as plt
plt.plot([output["record"].score for output in outputs])
model_endpoints = deployed_model_rcf.get_endpoints()
model_endpoints
model_query_datapoint = (
transactions_df[["index", "digitalItemCount", "physicalItemCount"]]
.iloc[-1]
.to_dict()
)
model_query_datapoint
import requests
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
outputs = deployed_model_rcf.get_inference(transactions_df)
outputs
RisingWave FE
We can now enrich the features built earlier with Flink by adding features built using RisingWave.
Let's fetch the features from the server for the feature group.
transactions_with_frequency_score = fe.get_ibis_table(
"transactions_with_frequency_score"
)
@ibis.udf.scalar.python()
def detect_fraud(
transactionAmount: float, transactionTime: int, isProxyIP: float
) -> int:
# Example logic for flagging fraud:
# - High transaction amount
# - Unusual transaction times (e.g., outside of working hours)
# - Use of proxy IP
is_high_amount = transactionAmount > 1000 # arbitrary high amount threshold
is_suspicious_time = (transactionTime < 6) | (
transactionTime > 22
) # non-standard hours
is_proxy = isProxyIP == 1 # proxy IP flag
return int(is_high_amount & is_suspicious_time & is_proxy)
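Since the UDF body is plain Python, we can sanity-check its thresholds locally before wiring it into the pipeline. The helper below is a throwaway check that mirrors the UDF's logic, not part of the feature definition.
# Quick local checks of the fraud rule on hand-picked values.
def _fraud_rule(amount: float, hour: int, is_proxy: float) -> int:
    return int((amount > 1000) and (hour < 6 or hour > 22) and (is_proxy == 1))

assert _fraud_rule(1500.0, 3, 1) == 1   # high amount, odd hour, proxy -> flagged
assert _fraud_rule(1500.0, 12, 1) == 0  # normal hours -> not flagged
assert _fraud_rule(200.0, 3, 1) == 0    # low amount -> not flagged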
fraud_detection_expr = detect_fraud(
transactions_with_frequency_score.transactionAmount,
transactions_with_frequency_score.transactionTime,
transactions_with_frequency_score.isProxyIP,
)
transactions_with_fraud_flag = transactions_with_frequency_score.select(
transactionAmount=transactions_with_frequency_score.transactionAmount,
transactionTime=transactions_with_frequency_score.transactionTime,
isProxyIP=transactions_with_frequency_score.isProxyIP,
index=transactions_with_frequency_score.index,
digitalItemCount=transactions_with_frequency_score.digitalItemCount,
physicalItemCount=transactions_with_frequency_score.physicalItemCount,
frequency_score=transactions_with_frequency_score.frequency_score,
fraud_flag=fraud_detection_expr,
)
transactions_with_fraud_flag.execute().head()
fe.materialize_features(
transactions_with_fraud_flag,
"transactions_with_fraud_flag",
"index",
BackEnd.Risingwave,
"transactions_stream",
)
model = tb.RCF(number_of_trees=50)
numerical_fields = ["frequency_score", "fraud_flag"]
features = fe.get_input_fields(
"transactions_with_fraud_flag", numerical_fields=numerical_fields
)
label = labels.get_label_field(label_field="is_fraud")
deployed_model_rcf = model.deploy(
name="demo_model_ibis_risingwave", input=features, labels=label
)
outputs = deployed_model_rcf.get_outputs()
sample_output = outputs[-1]
sample_output
import matplotlib.pyplot as plt
plt.plot([output["record"].score for output in outputs])
model_endpoints = deployed_model_rcf.get_endpoints()
model_endpoints
model_query_datapoint = (
transactions_df[
[
"index",
"digitalItemCount",
"physicalItemCount",
"transactionAmount",
"transactionTime",
"isProxyIP",
]
]
.iloc[-1]
.to_dict()
)
model_query_datapoint
import requests
resp = requests.post(
model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
outputs = deployed_model_rcf.get_inference(transactions_df)
outputs