Pipeline Finalization

Introduction

The ingestion finalization step is a critical component in our data pipeline that performs generic transformations, data cleansing tasks, and enrichment required for every data sink. This step ensures that all data is consistent, clean, and enriched before it is forwarded to its final destination. By standardizing data at this stage, we facilitate efficient downstream processing, analytics, and storage.

Key Concepts

Ingestion Finalization

The ingestion finalization step is responsible for:

  • Generic Transformations: Applying standard transformations across all data types to ensure consistency.

  • Data Cleansing: Removing unnecessary or junk data to improve data quality.

  • Enrichment: Adding additional information or context to data records to enhance their value.

Components

The finalization step comprises several components, each handling a specific part of the data processing pipeline:

  1. Source: Reads data from Kafka topics.

  2. Transforms: A series of transformations applied to the data.

    • Flatten Message: Flattens nested JSON structures.

    • Junk Filter: Removes fields with empty or null values.

    • Fields to Camel Case: Converts field names to camelCase.

    • Event Hash Field: Adds a unique hash to each event.

    • Timestamp Progress Checkpoint: Adds custom timestamps for tracking.

    • Epoch Timestamp Conversion: Converts timestamp fields to epoch milliseconds.

  3. Sink: Writes the processed data back to Kafka or other destinations.

Detailed Explanation

Architecture Overview

The ingestion finalization step uses Vector—a high-performance observability data pipeline—to process data through a series of configurations defined in YAML files:

  • /etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml

  • /etc/vector/vector_templates/001-source-kafka-aws-saas.yml

  • /etc/vector/vector_templates/101-transform-flatten-message.yml

  • /etc/vector/vector_templates/105-transform-junk-filter.yml

  • /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml

  • /etc/vector/vector_templates/129-transform-event-hash-field.yml

  • /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml

  • /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml

  • /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml

These configuration files are loaded together by passing repeated --config flags (an illustrative invocation is shown below); the components they define are wired to each other through their inputs, so data flows sequentially through each processing step.
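
For illustration, a launch command along these lines would load every file in the list above; the exact entrypoint or wrapper script used in the deployment may differ.

```sh
vector \
  --config /etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml \
  --config /etc/vector/vector_templates/001-source-kafka-aws-saas.yml \
  --config /etc/vector/vector_templates/101-transform-flatten-message.yml \
  --config /etc/vector/vector_templates/105-transform-junk-filter.yml \
  --config /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml \
  --config /etc/vector/vector_templates/129-transform-event-hash-field.yml \
  --config /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml \
  --config /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml \
  --config /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml
```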

1. Source Configuration

File: /etc/vector/vector_templates/001-source-kafka-aws-saas.yml

The source configuration sets up Vector to consume data from Kafka topics.

Key Points:

  • Type: Kafka source.

  • Topics: Configured via environment variables (e.g., KAFKA_SOURCE_TOPIC).

  • TLS Configuration: Ensures secure communication with Kafka brokers.

  • Consumer Group: Manages offsets and load balancing.

Example Configuration:
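
A minimal sketch of the source definition, using the environment variables described later in this document; the decoding codec and the certificate file names under KAFKA_MTLS_PATH are assumptions.

```yaml
sources:
  001_source_kafka_aws_saas:
    type: kafka
    bootstrap_servers: "${KAFKA_BROKERS}"
    topics:
      - "${KAFKA_SOURCE_TOPIC}"
    group_id: "${KAFKA_CONSUMER_GROUP}"
    decoding:
      codec: json                             # assumes messages arrive as JSON
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"    # hypothetical file names
      crt_file: "${KAFKA_MTLS_PATH}/tls.crt"
      key_file: "${KAFKA_MTLS_PATH}/tls.key"
```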

2. Transformations

The finalization step includes several transformations applied in sequence to process and enrich the data.

a. Flatten Message Transform

File: /etc/vector/vector_templates/101-transform-flatten-message.yml

Purpose: Flattens nested JSON structures within the message to simplify data processing and analysis.

Configuration:
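
A minimal sketch of this transform as a VRL remap; the input environment variable is the one named later in this document.

```yaml
transforms:
  101_transform_flatten_message:
    type: remap
    inputs:
      - "${_101_TRANSFORM_FLATTEN_MESSAGE_INPUT}"
    source: |
      # Collapse nested objects into dot-delimited top-level keys,
      # e.g. {"a": {"b": 1}} becomes {"a.b": 1}
      . = flatten(.)
```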

b. Junk Filter Transform

File: /etc/vector/vector_templates/105-transform-junk-filter.yml

Purpose: Removes all fields with empty or null values to clean the data.

Special Logic:
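
A minimal sketch of the remap logic; the input variable name is assumed to follow the same pattern as _101_TRANSFORM_FLATTEN_MESSAGE_INPUT.

```yaml
transforms:
  105_transform_junk_filter:
    type: remap
    inputs:
      - "${_105_TRANSFORM_JUNK_FILTER_INPUT}"   # assumed, following the _101_ pattern
    source: |
      # Recursively drop keys whose values are null, empty strings,
      # or empty arrays/objects
      . = compact(., recursive: true, null: true, string: true, array: true, object: true)
```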

  • Explanation: The compact function recursively removes keys whose values are null, empty strings, or empty arrays/objects from the event data.

c. Fields to Camel Case Transform

File: /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml

Purpose: Converts all field names to camelCase to standardize field naming conventions across datasets.

Configuration:
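
A minimal sketch, assuming a Vector release whose VRL provides the map_keys and camelcase functions; the input variable name is assumed.

```yaml
transforms:
  116_transform_fields_to_camel_case:
    type: remap
    inputs:
      - "${_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT}"   # assumed naming
    source: |
      # Rename every key, at all nesting levels, to camelCase
      . = map_keys(., recursive: true) -> |key| { camelcase(key) }
```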

d. Event Hash Field Transform

File: /etc/vector/vector_templates/129-transform-event-hash-field.yml

Purpose: Adds a unique hash (event_hash) to each event for deduplication, tracking, and data integrity verification.

Special Logic:
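
A minimal sketch of the hashing logic (input variable name assumed); the exact serialisation details may differ in the real template.

```yaml
transforms:
  129_transform_event_hash_field:
    type: remap
    inputs:
      - "${_129_TRANSFORM_EVENT_HASH_FIELD_INPUT}"   # assumed naming
    source: |
      # Serialise the whole event to JSON and hash it with SHA-512/224
      .event_hash = sha2(encode_json(.), variant: "SHA-512/224")
```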

  • Explanation: The entire event is serialized to a JSON string, and a SHA-512/224 hash is computed to create a unique identifier for the event.

e. Timestamp Progress Checkpoint Transform

File: /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml

Purpose: Adds a custom timestamp field to events to assist with debugging and tracking the data flow through the pipeline.

Special Logic:
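
A minimal sketch; the field name timestamp_finalise is taken from the example in the explanation below, and the input variable name is assumed.

```yaml
transforms:
  152_transform_timestamp_progress_checkpoint:
    type: remap
    inputs:
      - "${_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT}"   # assumed naming
    source: |
      # Record when the event passed through the finalisation stage
      .timestamp_finalise = now()
```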

  • Explanation: A timestamp field (e.g., timestamp_finalise) is added to record the time the event passed through this transform.

f. Epoch Timestamp Conversion Transform

File: /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml

Purpose: Converts specified timestamp fields into epoch milliseconds for consistent time representation across events.

Special Logic:
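
The real template is driven by the field list in ${TS_TO_EPOCH_MS_CSV}; the sketch below hard-codes a single field (event_time, as used in Example 3) purely to show the shape of the conversion, and the input variable name is assumed.

```yaml
transforms:
  157_transform_epoch_timestamp_conversion:
    type: remap
    inputs:
      - "${_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT}"   # assumed naming
    source: |
      # In the real transform the fields to convert come from ${TS_TO_EPOCH_MS_CSV};
      # one field is converted here for illustration.
      parsed, err = parse_timestamp(.event_time, format: "%+")
      if err == null {
        .event_time_epochms = to_unix_timestamp!(parsed, unit: "milliseconds")
      }
```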

  • Explanation: This transform reads a CSV file specifying which timestamp fields to convert. It parses each timestamp and converts it to epoch milliseconds, storing the result in a specified field.

3. Sink Configuration

File: /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml

Purpose: Sends the processed data to the specified Kafka topic or other sinks for downstream processing or storage.

Key Points:

  • Dynamic Topic Assignment: Uses environment variables to specify sink topics.

  • TLS Configuration: Ensures secure communication with Kafka brokers.

  • Encoding: Configures the data to be encoded as JSON before sending.

  • Buffering and Reliability: Configures how data is buffered and ensures reliable delivery.

Example Configuration:
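
A minimal sketch, assuming the same environment variables as the source; the buffer sizing, acknowledgement settings, and certificate file names are illustrative.

```yaml
sinks:
  202_sink_kafka_aws_saas:
    type: kafka
    inputs:
      - 157_transform_epoch_timestamp_conversion
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "${KAFKA_SINK_TOPIC}"
    encoding:
      codec: json
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"    # hypothetical file names
      crt_file: "${KAFKA_MTLS_PATH}/tls.crt"
      key_file: "${KAFKA_MTLS_PATH}/tls.key"
    acknowledgements:
      enabled: true
    buffer:
      type: disk
      max_size: 1073741824    # 1 GiB, illustrative
      when_full: block
```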

Environment Variables and Configurations

The finalization step relies on several environment variables defined in the configuration map to allow for dynamic configuration without changing code.

Configuration Map:
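
A hypothetical excerpt showing the shape of the configuration map; every value below is illustrative, and only the variables documented here are listed. The _INPUT chaining mirrors the transform order described in the Data Flow section.

```yaml
# Hypothetical values for illustration only
VECTOR_DATA_DIR: /var/lib/vector
VECTOR_ENRICHMENT_PATH: /etc/vector/enrichment
VECTOR_MTLS_PATH: /etc/vector/mtls
KAFKA_MTLS_PATH: /etc/vector/kafka-mtls
KAFKA_BROKERS: broker-1:9093,broker-2:9093
KAFKA_SOURCE_TOPIC: ingest-raw
KAFKA_SINK_TOPIC: ingest-finalised
KAFKA_CONSUMER_GROUP: finalisation-consumer-group
_101_TRANSFORM_FLATTEN_MESSAGE_INPUT: 001_source_kafka_aws_saas
_105_TRANSFORM_JUNK_FILTER_INPUT: 101_transform_flatten_message
_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT: 105_transform_junk_filter
_129_TRANSFORM_EVENT_HASH_FIELD_INPUT: 116_transform_fields_to_camel_case
_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT: 129_transform_event_hash_field
_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT: 152_transform_timestamp_progress_checkpoint
TS_TO_EPOCH_MS_CSV: ts_to_epoch_ms.csv
SUPPORTED_TIMESTAMP_FORMAT: "%+"
FIELD_SETTER_ITERATION_GROUP: default
```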

Explanation:

  • Data Directories and Paths:

    • VECTOR_DATA_DIR: Directory used for Vector's state and buffers.

    • VECTOR_ENRICHMENT_PATH: Path to enrichment tables used in transformations.

    • VECTOR_MTLS_PATH and KAFKA_MTLS_PATH: Paths to TLS certificates for secure communication.

  • Kafka Configuration:

    • KAFKA_BROKERS: Comma-separated list of Kafka broker addresses.

    • KAFKA_SOURCE_TOPIC and KAFKA_SINK_TOPIC: Source and sink Kafka topics.

    • KAFKA_CONSUMER_GROUP: Consumer group ID for Kafka source.

  • Transformation Inputs:

    • _101_TRANSFORM_FLATTEN_MESSAGE_INPUT, etc.: Specifies the input for each transform, allowing the pipeline to chain transforms dynamically.

  • Timestamp and Enrichment Configuration:

    • TS_TO_EPOCH_MS_CSV: CSV file specifying timestamp fields for conversion.

    • SUPPORTED_TIMESTAMP_FORMAT: List of supported timestamp formats for parsing.

    • FIELD_SETTER_ITERATION_GROUP: Used to select records in enrichment tables.

Data Flow and Transform Order

The data flows through the transforms in the following order:

  1. Source: 001_source_kafka_aws_saas

  2. Flatten Message Transform: 101_transform_flatten_message

  3. Junk Filter Transform: 105_transform_junk_filter

  4. Fields to Camel Case Transform: 116_transform_fields_to_camel_case

  5. Event Hash Field Transform: 129_transform_event_hash_field

  6. Timestamp Progress Checkpoint Transform: 152_transform_timestamp_progress_checkpoint

  7. Epoch Timestamp Conversion Transform: 157_transform_epoch_timestamp_conversion

  8. Sink: 202_sink_kafka_aws_saas

The input and output of each transform are connected via environment variables, allowing for flexibility and ease of configuration.

Examples

Example 1: Removing Empty Fields

Scenario:

An event contains several fields with null or empty values that need to be removed to clean up the data.

Input Event:
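
A hypothetical input event (field names and values are illustrative):

```json
{
  "user": "alice",
  "action": "login",
  "sessionId": "",
  "device": null,
  "tags": []
}
```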

Process:

  • The Junk Filter Transform (105_transform_junk_filter) removes fields with null or empty values.

Output Event:
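
The same illustrative event after the junk filter, with the empty and null fields dropped:

```json
{
  "user": "alice",
  "action": "login"
}
```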

Example 2: Adding Event Hash

Scenario:

An event requires a unique hash for deduplication and tracking purposes.

Input Event:
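
A hypothetical input event:

```json
{
  "user": "alice",
  "action": "login",
  "eventTime": "2024-05-01T12:00:00Z"
}
```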

Process:

  • The Event Hash Field Transform (129_transform_event_hash_field) computes a hash of the event.

Output Event:
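
The same event with the added hash; the digest value shown is a placeholder for the 56-character SHA-512/224 hex digest of the serialised event:

```json
{
  "user": "alice",
  "action": "login",
  "eventTime": "2024-05-01T12:00:00Z",
  "event_hash": "<56-character SHA-512/224 hex digest>"
}
```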

Example 3: Timestamp Conversion

Scenario:

An event has a timestamp field in string format that needs to be converted to epoch milliseconds for consistent time representation.

Input Event:
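
A hypothetical input event with a string timestamp:

```json
{
  "user": "alice",
  "event_time": "2024-05-01T12:00:00Z"
}
```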

Process:

  • The Epoch Timestamp Conversion Transform (157_transform_epoch_timestamp_conversion) parses event_time and converts it to event_time_epochms.

Output Event:
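
After conversion; 2024-05-01T12:00:00Z corresponds to 1714564800000 epoch milliseconds. Whether the original string field is retained depends on the transform's configuration; it is kept here for illustration.

```json
{
  "user": "alice",
  "event_time": "2024-05-01T12:00:00Z",
  "event_time_epochms": 1714564800000
}
```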

Example 4: Fields to Camel Case

Scenario:

An event has field names in various cases that need to be standardized to camelCase.

Input Event:
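
A hypothetical input event with mixed naming conventions:

```json
{
  "User_Name": "alice",
  "source-ip": "10.0.0.1",
  "EventType": "login"
}
```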

Process:

  • The Fields to Camel Case Transform (116_transform_fields_to_camel_case) converts all field names to camelCase.

Output Event:
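
The same event with all field names converted to camelCase:

```json
{
  "userName": "alice",
  "sourceIp": "10.0.0.1",
  "eventType": "login"
}
```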

Conclusion

The ingestion finalization step is essential for preparing data for downstream processing, analytics, and storage. By applying standard transformations, data cleansing, and enrichment, we ensure that data is consistent, clean, and enriched before it reaches its final destination. This step leverages Vector's powerful transformation capabilities and is highly configurable through environment variables and enrichment tables.

Key Takeaways:

  • Modular Design: The pipeline is composed of multiple transforms, each handling specific tasks, making it flexible and maintainable.

  • Data Quality Improvement: Removing junk data and standardizing fields significantly enhances data quality.

  • Data Enrichment: Adding fields like event_hash and converting timestamps adds value and utility to the data.

  • Flexibility and Configurability: Environment variables and enrichment tables allow for dynamic adjustments without code changes, accommodating various data sources and requirements.

  • Consistency: Standardizing field names and timestamp formats ensures consistency across all datasets, facilitating easier data analysis and processing.

For further details or assistance, please refer to the configuration files or contact the technical team.
