Ibis Feature Engineering

Feature Engineering - Complex Stream Processing


With real-time features, there can be situations where the feature logic cannot be expressed with simple SQL, aggregates, or scalar Python UDFs. In such scenarios, you may need to write custom streaming pipelines. This is where TurboML builds on Ibis (https://github.com/ibis-project/ibis/) to expose a DataFrame-like API for expressing complex streaming feature logic. We currently support Apache Flink and RisingWave backends for streaming execution.
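
To give a flavor of this DataFrame-like API before the full walkthrough, here is a small, self-contained Ibis sketch. The table and column names are hypothetical and not part of this demo; in TurboML, analogous expressions are compiled to Flink or RisingWave jobs by the materialization step shown later in this notebook.

    import ibis

    # Hypothetical unbound table, purely for illustration.
    events = ibis.table({"user_id": "string", "amount": "float64"}, name="events")

    # Ordinary Ibis expressions: group and aggregate.
    user_totals = events.group_by("user_id").aggregate(
        total_amount=events.amount.sum()
    )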

Import necessary libraries

    import turboml as tb
    import pandas as pd
    from turboml.common.sources import (
        FileSource,
        DataSource,
        TimestampFormatConfig,
        Watermark,
        DataDeliveryMode,
        S3Config,
    )
    from turboml.common.models import BackEnd
    import ibis
    transactions_df = pd.read_csv("data/transactions.csv").reset_index()
    transactions_df["timestamp"] = pd.to_datetime(transactions_df["timestamp"], unit="s")
    labels_df = pd.read_csv("data/labels.csv").reset_index()
    labels = tb.PandasDataset(
        dataset_name="labels_ibis_fe", key_field="index", dataframe=labels_df, upload=True
    )

Add feature definitions

To add feature definitions, the turboml package provides a class called IbisFeatureEngineering, which allows us to define features on registered data sources.

    fe = tb.IbisFeatureEngineering()

Let's upload the data for this demo

    %pip install minio
    from minio import Minio
    
    client = Minio(
        "play.min.io",
        access_key="Q3AM3UQ867SPQQA43P2F",
        secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
        secure=True,
    )
    bucket_name = "ibis-demo"
    found = client.bucket_exists(bucket_name)
    if not found:
        client.make_bucket(bucket_name)
        print("Created bucket", bucket_name)
    else:
        print("Bucket", bucket_name, "already exists")
    import duckdb
    
    con = duckdb.connect()
    con.sql("SET s3_region='us-east-1';")
    con.sql("SET s3_url_style='path';")
    con.sql("SET s3_use_ssl=true;")
    con.sql("SET s3_endpoint='play.min.io';")
    con.sql("SET s3_access_key_id='Q3AM3UQ867SPQQA43P2F';")
    con.sql("SET s3_secret_access_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG';")
    con.sql(
        "COPY (SELECT * FROM transactions_df) TO 's3://ibis-demo/transactions/transactions.parquet' (FORMAT 'parquet');"
    )

DataSource

The DataSource serves as the foundational entity in the feature engineering workflow. It defines where and how the raw data is accessed for processing. After creating a DataSource, users can register their source configurations to start leveraging them in the pipeline.

Types of Delivery Modes

  1. Dynamic:
    • Suitable for real-time or streaming data scenarios.
    • Automatically creates connectors based on the source configuration.
    • The Kafka topic becomes the primary input for feature engineering, ensuring seamless integration with downstream processing pipelines.
  2. Static:
    • Designed for batch data sources.
    • RisingWave/Flink reads directly from the source for feature engineering, eliminating the need for an intermediary Kafka topic (an illustrative static-source sketch follows the registration code below).

The example below configures a dynamic source over the Parquet data uploaded above, with a watermark on the timestamp column.

    time_col_config = TimestampFormatConfig(
        format_type=TimestampFormatConfig.FormatType.EpochMillis
    )
    watermark = Watermark(
        time_col="timestamp", allowed_delay_seconds=60, time_col_config=time_col_config
    )
    ds1 = DataSource(
        name="transactions_stream",
        key_fields=["index"],
        delivery_mode=DataDeliveryMode.DYNAMIC,
        file_source=FileSource(
            path="transactions",
            format=FileSource.Format.PARQUET,
            s3_config=S3Config(
                bucket="ibis-demo",
                region="us-east-1",
                access_key_id="Q3AM3UQ867SPQQA43P2F",
                secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
                endpoint="https://play.min.io",
            ),
        ),
        watermark=watermark,
    )
    
    tb.register_source(ds1)
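
For a batch source, the same configuration can instead use the static delivery mode. The following is a minimal sketch under the assumption that DataDeliveryMode exposes a STATIC member alongside DYNAMIC; it mirrors the dynamic example above and is not used in the rest of this demo.

    # Sketch of a static (batch) source; assumes DataDeliveryMode.STATIC exists.
    # Not registered or materialized anywhere else in this notebook.
    ds_static = DataSource(
        name="transactions_batch",
        key_fields=["index"],
        delivery_mode=DataDeliveryMode.STATIC,
        file_source=FileSource(
            path="transactions",
            format=FileSource.Format.PARQUET,
            s3_config=S3Config(
                bucket="ibis-demo",
                region="us-east-1",
                access_key_id="Q3AM3UQ867SPQQA43P2F",
                secret_access_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
                endpoint="https://play.min.io",
            ),
        ),
    )
    # tb.register_source(ds_static)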

To define features, we can fetch the registered sources and perform operations on them.

    transactions = fe.get_ibis_table("transactions_stream")
    transactions

In this example, we use one Kafka topic (transactions_stream) to build features using Flink.

We will also use a Python UDF to define custom logic.

    @ibis.udf.scalar.python()
    def calculate_frequency_score(digital_count: float, physical_count: float) -> float:
        if digital_count > 10 or physical_count > 10:
            return 0.7  # High item count
        elif digital_count == 0 and physical_count > 0:
            return 0.3  # Physical item-only transaction
        elif digital_count > 0 and physical_count == 0:
            return 0.3  # Digital item-only transaction
        else:
            return 0.1  # Regular transaction

We can define features using the Ibis DSL or SQL.

    transactions_with_frequency_score = transactions.select(
        frequency_score=calculate_frequency_score(
            transactions.digitalItemCount, transactions.physicalItemCount
        ),
        index=transactions.index,
        digitalItemCount=transactions.digitalItemCount,
        physicalItemCount=transactions.physicalItemCount,
        transactionAmount=transactions.transactionAmount,
        transactionTime=transactions.transactionTime,
        isProxyIP=transactions.isProxyIP,
    )
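
As an aside, logic like this can often be written with native Ibis expressions instead of a Python UDF, which lets the backend evaluate it without calling back into Python. Below is a minimal sketch of the same scoring rules, assuming a recent Ibis version that provides ibis.ifelse; it is purely illustrative and is not materialized in this notebook.

    # Sketch: the UDF's rules expressed as native Ibis expressions
    # (assumes ibis.ifelse is available; not used elsewhere in this demo).
    high_count = (transactions.digitalItemCount > 10) | (
        transactions.physicalItemCount > 10
    )
    single_channel = (
        (transactions.digitalItemCount == 0) & (transactions.physicalItemCount > 0)
    ) | ((transactions.digitalItemCount > 0) & (transactions.physicalItemCount == 0))
    frequency_score_expr = ibis.ifelse(
        high_count, 0.7, ibis.ifelse(single_channel, 0.3, 0.1)
    )
    transactions_with_dsl_score = transactions.mutate(
        frequency_score=frequency_score_expr
    )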

We can preview features locally

    transactions_with_frequency_score.execute().head()

Once we're satisfied, we can materialize the features. This writes the feature stream using Flink.

Flink uses a hybrid source that first reads from the Iceberg table and then switches to Kafka.

    fe.materialize_features(
        transactions_with_frequency_score,
        "transactions_with_frequency_score",
        "index",
        BackEnd.Flink,
        "transactions_stream",
    )

We can now train a model using the features built with Flink.

    # Deploy an RCF model on the Flink-materialized feature
    model = tb.RCF(number_of_trees=50)
    numerical_fields = ["frequency_score"]
    features = fe.get_input_fields(
        "transactions_with_frequency_score", numerical_fields=numerical_fields
    )
    label = labels.get_label_field(label_field="is_fraud")
    deployed_model_rcf = model.deploy(
        name="demo_model_ibis_flink", input=features, labels=label
    )

    # Inspect the streaming outputs produced so far
    outputs = deployed_model_rcf.get_outputs()
    sample_output = outputs[-1]
    sample_output

    # Plot the anomaly scores
    import matplotlib.pyplot as plt

    plt.plot([output["record"].score for output in outputs])

    # Fetch the REST endpoints of the deployed model
    model_endpoints = deployed_model_rcf.get_endpoints()
    model_endpoints

    # Build a single datapoint to query the model endpoint
    model_query_datapoint = (
        transactions_df[["index", "digitalItemCount", "physicalItemCount"]]
        .iloc[-1]
        .to_dict()
    )
    model_query_datapoint

    # Query the deployed model over REST
    import requests

    resp = requests.post(
        model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
    )
    resp.json()

    # Batch inference over the full dataframe
    outputs = deployed_model_rcf.get_inference(transactions_df)
    outputs

RisingWave FE

We can now enrich the features built earlier with Flink using additional features built with RisingWave.

Let's fetch this feature group from the server.

    transactions_with_frequency_score = fe.get_ibis_table(
        "transactions_with_frequency_score"
    )

We again use a Python UDF to define the custom fraud-flag logic.

    @ibis.udf.scalar.python()
    def detect_fraud(
        transactionAmount: float, transactionTime: int, isProxyIP: float
    ) -> int:
        # Example logic for flagging fraud:
        # - High transaction amount
        # - Unusual transaction times (e.g., outside of working hours)
        # - Use of proxy IP
        is_high_amount = transactionAmount > 1000  # arbitrary high amount threshold
        is_suspicious_time = (transactionTime < 6) | (
            transactionTime > 22
        )  # non-standard hours
        is_proxy = isProxyIP == 1  # proxy IP flag
    
        return int(is_high_amount & is_suspicious_time & is_proxy)

    fraud_detection_expr = detect_fraud(
        transactions_with_frequency_score.transactionAmount,
        transactions_with_frequency_score.transactionTime,
        transactions_with_frequency_score.isProxyIP,
    )
    transactions_with_fraud_flag = transactions_with_frequency_score.select(
        transactionAmount=transactions_with_frequency_score.transactionAmount,
        transactionTime=transactions_with_frequency_score.transactionTime,
        isProxyIP=transactions_with_frequency_score.isProxyIP,
        index=transactions_with_frequency_score.index,
        digitalItemCount=transactions_with_frequency_score.digitalItemCount,
        physicalItemCount=transactions_with_frequency_score.physicalItemCount,
        frequency_score=transactions_with_frequency_score.frequency_score,
        fraud_flag=fraud_detection_expr,
    )
    transactions_with_fraud_flag.execute().head()

Once we're happy with the enriched features, we materialize them, this time using RisingWave.

    fe.materialize_features(
        transactions_with_fraud_flag,
        "transactions_with_fraud_flag",
        "index",
        BackEnd.Risingwave,
        "transactions_stream",
    )

We can now train a model using the features built with RisingWave.

    model = tb.RCF(number_of_trees=50)
    numerical_fields = ["frequency_score", "fraud_flag"]
    features = fe.get_input_fields(
        "transactions_with_fraud_flag", numerical_fields=numerical_fields
    )
    label = labels.get_label_field(label_field="is_fraud")
    deployed_model_rcf = model.deploy(
        name="demo_model_ibis_risingwave", input=features, labels=label
    )
    outputs = deployed_model_rcf.get_outputs()
    sample_output = outputs[-1]
    sample_output
    import matplotlib.pyplot as plt
    
    plt.plot([output["record"].score for output in outputs])
    model_endpoints = deployed_model_rcf.get_endpoints()
    model_endpoints
    model_query_datapoint = (
        transactions_df[
            [
                "index",
                "digitalItemCount",
                "physicalItemCount",
                "transactionAmount",
                "transactionTime",
                "isProxyIP",
            ]
        ]
        .iloc[-1]
        .to_dict()
    )
    model_query_datapoint
    import requests
    
    resp = requests.post(
        model_endpoints[0], json=model_query_datapoint, headers=tb.common.api.headers
    )
    resp.json()
    outputs = deployed_model_rcf.get_inference(transactions_df)
    outputs