
Feature Engineering - Complex Stream Processing


With real-time features, there can be situations where the feature logic cannot be expressed with simple SQL, aggregates, or scalar Python UDFs. In such scenarios, you may need to write custom streaming pipelines. This is where TurboML builds on Ibis (https://github.com/ibis-project/ibis/) to expose a DataFrame-like API for expressing complex streaming logic for features. We currently support Apache Flink and RisingWave backends for streaming execution.
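
As a small, standalone illustration of the DataFrame-like API (a local sketch using Ibis's default in-memory backend, separate from the TurboML deployment shown in the rest of this notebook), a grouped aggregation can be expressed as:

import ibis
import pandas as pd

t = ibis.memtable(
    pd.DataFrame(
        {"accountID": ["a", "a", "b"], "transactionAmount": [10.0, 25.0, 7.0]}
    )
)
# Group-by aggregation written with the DataFrame-like API instead of raw SQL.
expr = t.group_by("accountID").aggregate(total_amount=t.transactionAmount.sum())
print(expr.execute())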

Import necessary libraries

import pandas as pd
import turboml as tb
from turboml.common.sources import (
    FileSource,
    DataSource,
    TimestampFormatConfig,
    Watermark,
    DataDeliveryMode,
    S3Config,
)
from turboml.common.models import BackEnd
import ibis
transactions_df = pd.read_csv("data/transactions.csv").reset_index()
labels_df = pd.read_csv("data/labels.csv").reset_index()
# Upload the labels dataset; if it already exists (e.g. on a re-run), fall back to the registered dataset.
try:
    labels = tb.PandasDataset(
        dataset_name="labels_ibis_fe", key_field="index", dataframe=labels_df, upload=True
    )
except Exception:
    labels = tb.PandasDataset(dataset_name="labels_ibis_fe")

Add feature definitions

To add feature definitions, the turboml package provides a class called IbisFeatureEngineering, which allows us to define features on registered sources.

fe = tb.IbisFeatureEngineering()

Let's upload the data for this demo to an S3-compatible object store (the public MinIO playground).

%pip install minio
from minio import Minio
 
client = Minio(
    "play.min.io",
    access_key="Q3AM3UQ867SPQQA43P2F",
    secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
    secure=True,
)
bucket_name = "ibis-demo"
found = client.bucket_exists(bucket_name)
if not found:
    client.make_bucket(bucket_name)
    print("Created bucket", bucket_name)
else:
    print("Bucket", bucket_name, "already exists")
import duckdb

# Use DuckDB's S3 support to write the transactions data to the MinIO bucket.
con = duckdb.connect()
con.sql("SET s3_region='us-east-1';")
con.sql("SET s3_url_style='path';")
con.sql("SET s3_use_ssl=true;")
con.sql("SET s3_endpoint='play.min.io';")
con.sql("SET s3_access_key_id='Q3AM3UQ867SPQQA43P2F';")
con.sql("SET s3_secret_access_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG';")
# DuckDB can query the in-scope pandas DataFrame (transactions_df) directly;
# convert the raw epoch timestamp column to a TIMESTAMP and write the result as Parquet.
con.sql(
    "COPY (SELECT * EXCLUDE(timestamp), TO_TIMESTAMP(CAST(timestamp AS DOUBLE)) AS timestamp FROM transactions_df) TO 's3://ibis-demo/transactions/transactions.parquet' (FORMAT 'parquet');"
)

DataSource

The DataSource serves as the foundational entity in the feature engineering workflow. It defines where and how the raw data is accessed for processing. After creating a DataSource, users can register their source configurations to start leveraging them in the pipeline.

Types of Delivery Modes

  1. Dynamic:
    • Suitable for real-time or streaming data scenarios.
    • Automatically creates connectors based on the source configuration.
    • The Kafka topic becomes the primary input for feature engineering, ensuring seamless integration with downstream processing pipelines.
  2. Static:
    • Designed for batch data sources.
    • RisingWave/Flink reads directly from the source for feature engineering, eliminating the need for an intermediary Kafka topic (a static-mode source is sketched just after this list).
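
As a rough sketch (not run in this demo), a static-mode source for the same Parquet data could be declared as below; this assumes a DataDeliveryMode.STATIC member alongside DYNAMIC and uses a hypothetical source name:

ds_static = DataSource(
    name="transactions_batch",  # hypothetical name, for illustration only
    key_fields=["index"],
    delivery_mode=DataDeliveryMode.STATIC,  # assumed static-mode enum member
    file_source=FileSource(
        path="transactions",
        format=FileSource.Format.PARQUET,
        s3_config=S3Config(
            bucket="ibis-demo",
            region="us-east-1",
            access_key_id="Q3AM3UQ867SPQQA43P2F",
            secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
            endpoint="https://play.min.io",
        ),
    ),
)

For this demo we use the dynamic mode. The source below also declares an event-time watermark on the timestamp column, so the streaming backends can handle late-arriving events (up to 60 seconds of allowed delay):
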
time_col_config = TimestampFormatConfig(
    format_type=TimestampFormatConfig.FormatType.EpochMillis
)
watermark = Watermark(
    time_col="timestamp", allowed_delay_seconds=60, time_col_config=time_col_config
)
ds1 = DataSource(
    name="transactions_stream",
    key_fields=["index"],
    delivery_mode=DataDeliveryMode.DYNAMIC,
    file_source=FileSource(
        path="transactions",
        format=FileSource.Format.PARQUET,
        s3_config=S3Config(
            bucket="ibis-demo",
            region="us-east-1",
            access_key_id="Q3AM3UQ867SPQQA43P2F",
            secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
            endpoint="https://play.min.io",
        ),
    ),
    watermark=watermark,
)
 
tb.register_source(ds1)

To define features, we can fetch the registered sources as Ibis tables and perform operations on them.

transactions = fe.get_ibis_table("transactions_stream")
transactions

In this example we use one Kafka topic (transactions_stream) to build features using Flink.

We will also use a scalar Python UDF to define custom logic.

@ibis.udf.scalar.python()
def calculate_frequency_score(digital_count: float, physical_count: float) -> float:
    if digital_count > 10 or physical_count > 10:
        return 0.7  # High item count
    elif digital_count == 0 and physical_count > 0:
        return 0.3  # Physical item-only transaction
    elif digital_count > 0 and physical_count == 0:
        return 0.3  # Digital item-only transaction
    else:
        return 0.1  # Regular transaction
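
Before wiring the UDF into the streaming pipeline, it can be handy to sanity-check the same decision logic locally against the raw pandas data. A quick sketch using a plain-Python re-implementation (not the Ibis UDF object itself):

def frequency_score_py(digital_count: float, physical_count: float) -> float:
    # Same decision logic as the Ibis UDF above, as an ordinary Python function.
    if digital_count > 10 or physical_count > 10:
        return 0.7
    elif digital_count == 0 and physical_count > 0:
        return 0.3
    elif digital_count > 0 and physical_count == 0:
        return 0.3
    else:
        return 0.1

transactions_df.apply(
    lambda row: frequency_score_py(row["digitalItemCount"], row["physicalItemCount"]),
    axis=1,
).head()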

We can define features using the Ibis DSL or SQL.

transactions_with_frequency_score = transactions.select(
    frequency_score=calculate_frequency_score(
        transactions.digitalItemCount, transactions.physicalItemCount
    ),
    index=transactions.index,
    digitalItemCount=transactions.digitalItemCount,
    physicalItemCount=transactions.physicalItemCount,
    transactionAmount=transactions.transactionAmount,
    transactionTime=transactions.transactionTime,
    isProxyIP=transactions.isProxyIP,
)

We can preview features locally

transactions_with_frequency_score.execute().head()
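
Since these features are ordinary Ibis expressions, we can also inspect the SQL that a SQL-emitting backend would generate for them. A minimal sketch, shown on a UDF-free projection because scalar Python UDFs may not compile for every dialect:

simple_expr = transactions.select(
    transactions.index,
    transactions.transactionAmount,
    transactions.digitalItemCount,
)
# Compile the expression to SQL text (duckdb dialect chosen here just for inspection).
print(ibis.to_sql(simple_expr, dialect="duckdb"))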

Once we are satisfied, we can materialize the features. This writes the features out using Flink.

Flink uses a hybrid source that reads first from the Iceberg table and then switches over to Kafka.

fe.materialize_features(
    transactions_with_frequency_score,
    "transactions_with_frequency_score",
    "index",
    BackEnd.Flink,
    "transactions_stream",
)

We can now train a model using the features built with Flink.

model = tb.RCF(number_of_trees=50)
numerical_fields = ["frequency_score"]
features = fe.get_input_fields(
    "transactions_with_frequency_score", numerical_fields=numerical_fields
)
label = labels.get_label_field(label_field="is_fraud")
deployed_model_rcf = model.deploy(
    name="demo_model_ibis_flink", input=features, labels=label
)
outputs = deployed_model_rcf.get_outputs()
sample_output = outputs[-1]
sample_output
import matplotlib.pyplot as plt
 
plt.plot([output["record"].score for output in outputs])
model_endpoints = deployed_model_rcf.get_endpoints()
model_endpoints
model_query_datapoint = (
    transactions_df[["index", "digitalItemCount", "physicalItemCount"]]
    .iloc[-1]
    .to_dict()
)
model_query_datapoint
import requests
 
resp = requests.post(
    model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
outputs = deployed_model_rcf.get_inference(transactions_df)
outputs

RisingWave FE

We can now enrich the features built earlier with Flink by adding features computed with RisingWave.

Let's fetch the features from the server for this feature group.

transactions_with_frequency_score = fe.get_ibis_table(
    "transactions_with_frequency_score"
)
@ibis.udf.scalar.python()
def detect_fraud(
    transactionAmount: float, transactionTime: int, isProxyIP: float
) -> int:
    # Example logic for flagging fraud:
    # - High transaction amount
    # - Unusual transaction times (e.g., outside of working hours)
    # - Use of proxy IP
    is_high_amount = transactionAmount > 1000  # arbitrary high amount threshold
    is_suspicious_time = (transactionTime < 6) | (
        transactionTime > 22
    )  # non-standard hours
    is_proxy = isProxyIP == 1  # proxy IP flag
 
    return int(is_high_amount & is_suspicious_time & is_proxy)
fraud_detection_expr = detect_fraud(
    transactions_with_frequency_score.transactionAmount,
    transactions_with_frequency_score.transactionTime,
    transactions_with_frequency_score.isProxyIP,
)
transactions_with_fraud_flag = transactions_with_frequency_score.select(
    transactionAmount=transactions_with_frequency_score.transactionAmount,
    transactionTime=transactions_with_frequency_score.transactionTime,
    isProxyIP=transactions_with_frequency_score.isProxyIP,
    index=transactions_with_frequency_score.index,
    digitalItemCount=transactions_with_frequency_score.digitalItemCount,
    physicalItemCount=transactions_with_frequency_score.physicalItemCount,
    frequency_score=transactions_with_frequency_score.frequency_score,
    fraud_flag=fraud_detection_expr,
)
transactions_with_fraud_flag.execute().head()
fe.materialize_features(
    transactions_with_fraud_flag,
    "transactions_with_fraud_flag",
    "index",
    BackEnd.Risingwave,
    "transactions_stream",
)
model = tb.RCF(number_of_trees=50)
numerical_fields = ["frequency_score", "fraud_flag"]
features = fe.get_input_fields(
    "transactions_with_fraud_flag", numerical_fields=numerical_fields
)
label = labels.get_label_field(label_field="is_fraud")
deployed_model_rcf = model.deploy(
    name="demo_model_ibis_risingwave", input=features, labels=label
)
outputs = deployed_model_rcf.get_outputs()
sample_output = outputs[-1]
sample_output
import matplotlib.pyplot as plt
 
plt.plot([output["record"].score for output in outputs])
model_endpoints = deployed_model_rcf.get_endpoints()
model_endpoints
model_query_datapoint = (
    transactions_df[
        [
            "index",
            "digitalItemCount",
            "physicalItemCount",
            "transactionAmount",
            "transactionTime",
            "isProxyIP",
        ]
    ]
    .iloc[-1]
    .to_dict()
)
model_query_datapoint
import requests
 
resp = requests.post(
    model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
)
resp.json()
outputs = deployed_model_rcf.get_inference(transactions_df)
outputs