Pipeline Finalization

Introduction

The ingestion finalization step is a critical component of our data pipeline: it performs the generic transformations, data cleansing, and enrichment required by every data sink. This step ensures that all data is consistent, clean, and enriched before it is forwarded to its final destination. By standardizing data at this stage, we enable efficient downstream processing, analytics, and storage.

Key Concepts

Ingestion Finalization

The ingestion finalization step is responsible for:

  • Generic Transformations: Applying standard transformations to data across all types to ensure consistency.

  • Data Cleansing: Removing unnecessary or junk data to improve data quality.

  • Enrichment: Adding additional information or context to data records to enhance their value.

Components

The finalization step comprises several components, each handling a specific part of the data processing pipeline:

  1. Source: Reads data from Kafka topics.

  2. Transforms: A series of transformations applied to the data.

    • Flatten Message: Flattens nested JSON structures.

    • Junk Filter: Removes fields with empty or null values.

    • Fields to Camel Case: Converts field names to camelCase.

    • Event Hash Field: Adds a unique hash to each event.

    • Timestamp Progress Checkpoint: Adds custom timestamps for tracking.

    • Epoch Timestamp Conversion: Converts timestamp fields to epoch milliseconds.

  3. Sink: Writes the processed data back to Kafka or other destinations.

Detailed Explanation

Architecture Overview

The ingestion finalization step uses Vector—a high-performance observability data pipeline—to process data through a series of configurations defined in YAML files:

  • /etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml

  • /etc/vector/vector_templates/001-source-kafka-aws-saas.yml

  • /etc/vector/vector_templates/101-transform-flatten-message.yml

  • /etc/vector/vector_templates/105-transform-junk-filter.yml

  • /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml

  • /etc/vector/vector_templates/129-transform-event-hash-field.yml

  • /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml

  • /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml

  • /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml

These configuration files are passed to Vector as repeated --config flags and merged into a single topology; the inputs settings wired through environment variables (see the configuration map below) determine the order in which data flows through each step.
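
For reference, a minimal sketch of the resulting invocation is shown below. The exact wrapper (service unit, container entrypoint, or chart template) is deployment-specific, but Vector accepts repeated --config flags and merges the listed files into one topology:

vector \
  --config /etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml \
  --config /etc/vector/vector_templates/001-source-kafka-aws-saas.yml \
  --config /etc/vector/vector_templates/101-transform-flatten-message.yml \
  --config /etc/vector/vector_templates/105-transform-junk-filter.yml \
  --config /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml \
  --config /etc/vector/vector_templates/129-transform-event-hash-field.yml \
  --config /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml \
  --config /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml \
  --config /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml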

1. Source Configuration

File: /etc/vector/vector_templates/001-source-kafka-aws-saas.yml

The source configuration sets up Vector to consume data from Kafka topics.

Key Points:

  • Type: Kafka source.

  • Topics: Configured via environment variables (e.g., KAFKA_SOURCE_TOPIC).

  • TLS Configuration: Ensures secure communication with Kafka brokers.

  • Consumer Group: Manages offsets and load balancing.

Example Configuration:

sources:
  kafka_source:
    type: kafka
    bootstrap_servers: "${KAFKA_BROKERS}"
    topics:
      - "${KAFKA_SOURCE_TOPIC}"
    group_id: "${KAFKA_CONSUMER_GROUP}"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    decoding:
      codec: json

2. Transformations

The finalization step includes several transformations applied in sequence to process and enrich the data.

a. Flatten Message Transform

File: /etc/vector/vector_templates/101-transform-flatten-message.yml

Purpose: Flattens nested JSON structures within the message to simplify data processing and analysis.

Configuration:

transforms:
  101_transform_flatten_message:
    type: remap
    inputs:
      - "${_101_TRANSFORM_FLATTEN_MESSAGE_INPUT}"
    source: |
      # Flatten nested structures using an underscore separator
      . = flatten(., separator: "_")
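
For illustration, a hypothetical nested event such as the following would be flattened into underscore-joined keys:

Input Event:

{
  "user": {
    "id": "12345",
    "geo": {
      "country": "AU"
    }
  }
}

Output Event:

{
  "user_id": "12345",
  "user_geo_country": "AU"
}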

b. Junk Filter Transform

File: /etc/vector/vector_templates/105-transform-junk-filter.yml

Purpose: Removes all fields with empty or null values to clean the data.

Special Logic:

transforms:
  105_transform_junk_filter:
    type: remap
    inputs:
      - "${_105_TRANSFORM_JUNK_FILTER_INPUT:?err}"
    source: |
      # Remove fields with nullish values recursively
      . = compact(., recursive: true, nullish: true)

  • Explanation: The compact function recursively removes keys whose values are null or otherwise empty (empty strings, arrays, and objects) from the event data.

c. Fields to Camel Case Transform

File: /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml

Purpose: Converts all field names to camelCase to standardize field naming conventions across datasets.

Configuration:

transforms:
  116_transform_fields_to_camel_case:
    type: remap
    inputs:
      - "${_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT}"
    source: |
      # Convert all field names to camelCase
      . = rename_all(., strategy: "lower_camel")

d. Event Hash Field Transform

File: /etc/vector/vector_templates/129-transform-event-hash-field.yml

Purpose: Adds a unique hash (event_hash) to each event for deduplication, tracking, and data integrity verification.

Special Logic:

transforms:
  129_transform_event_hash_field:
    type: remap
    inputs:
      - "${_129_TRANSFORM_EVENT_HASH_FIELD_INPUT}"
    source: |
      # Generate a SHA-512/224 hash of the event
      json_str = to_string(encode_json(.))
      .event_hash = sha2(json_str, variant: "SHA-512/224")

  • Explanation: The entire event is serialized to a JSON string, and a SHA-512/224 hash is computed to create a unique identifier for the event.

e. Timestamp Progress Checkpoint Transform

File: /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml

Purpose: Adds a custom timestamp field to events to assist with debugging and tracking the data flow through the pipeline.

Special Logic:

transforms:
  152_transform_timestamp_progress_checkpoint:
    type: remap
    inputs:
      - "${_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT}"
    source: |
      # Add a progress checkpoint timestamp
      .["${_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_FIELD_NAME}"] = now()

  • Explanation: A timestamp field (e.g., timestamp_finalise) is added to record the time the event passed through this transform.
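
For example, using the field name from the configuration map (timestamp_finalise), an event leaving this transform carries an additional field similar to:

{
  "message": "User login successful",
  "timestamp_finalise": "2024-03-27T12:35:01.123456Z"
}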

f. Epoch Timestamp Conversion Transform

File: /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml

Purpose: Converts specified timestamp fields into epoch milliseconds for consistent time representation across events.

Special Logic:

enrichment_tables:
  timestamp_to_epoch_fields:
    type: "file"
    file:
      path: "${VECTOR_ENRICHMENT_PATH:?err}/${TS_TO_EPOCH_MS_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      iteration_group: "string"
      time_field: "string"
      time_field_epochms: "string"

transforms:
  157_transform_epoch_timestamp_conversion:
    type: remap
    inputs:
      - "${_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT:?err}"
    source: |
      # Load fields to convert from the enrichment table
      timestamp_to_epoch_fields = find_enrichment_table_records(
        "timestamp_to_epoch_fields",
        {"iteration_group": "${FIELD_SETTER_ITERATION_GROUP:?err}"},
        case_sensitive: false
      ) ?? []

      # Iterate over each field specified for conversion
      for_each(timestamp_to_epoch_fields) -> |_index, data| {
        source_field_path = split!(data.time_field, ".")
        destination_field_path = split!(data.time_field_epochms, ".")

        # Handle field paths containing dashes
        if match_array(source_field_path, r'-+') {
          source_field_path = map_values(source_field_path) -> |value| {
            replace(value, "-", ".")
          }
        }
        if match_array(destination_field_path, r'-+') {
          destination_field_path = map_values(destination_field_path) -> |value| {
            replace(value, "-", ".")
          }
        }

        source_field_value = get(., source_field_path) ?? null

        if !is_nullish(source_field_value) {
          # Parse string timestamps
          if is_string(source_field_value) {
            for_each($SUPPORTED_TIMESTAMP_FORMAT) -> |_index, format| {
              source_field_value = parse_timestamp(source_field_value, format) ?? source_field_value
            }
          }
          # Convert integer timestamps
          else if is_integer(source_field_value) {
            source_field_value = from_unix_timestamp!(to_int!(source_field_value), unit: "milliseconds")
          }
          # Convert to epoch milliseconds
          converted_timestamp, _error = to_unix_timestamp(value: source_field_value, unit: "milliseconds")
          . = set!(., destination_field_path, converted_timestamp)
        }
      }

  • Explanation: This transform reads a CSV file specifying which timestamp fields to convert. It parses each timestamp and converts it to epoch milliseconds, storing the result in a specified field.
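
As an illustration, a row in that CSV, matching the schema above, might look like the following; the actual field list is defined in the file referenced by TS_TO_EPOCH_MS_CSV and may differ:

iteration_group,time_field,time_field_epochms
finalise,event_time,event_time_epochms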

3. Sink Configuration

File: /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml

Purpose: Sends the processed data to the specified Kafka topic or other sinks for downstream processing or storage.

Key Points:

  • Dynamic Topic Assignment: Uses environment variables to specify sink topics.

  • TLS Configuration: Ensures secure communication with Kafka brokers.

  • Encoding: Configures the data to be encoded as JSON before sending.

  • Buffering and Reliability: Configures how data is buffered and ensures reliable delivery.

Example Configuration:

sinks:
  kafka_sink:
    type: kafka
    inputs:
      - "${_202_SINK_KAFKA_AWS_SAAS_INPUT}"
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "${KAFKA_SINK_TOPIC}"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    encoding:
      codec: json
    buffer:
      type: disk
      max_size: ${VECTOR_BUFFER_SIZE:-1073741824} # 1GB default
      when_full: block # Backpressure when buffer is full

Environment Variables and Configurations

The finalization step relies on several environment variables defined in the configuration map to allow for dynamic configuration without changing code.

Configuration Map:

VECTOR_DATA_DIR: "/vector-data-dir"
VECTOR_ENRICHMENT_PATH: "/etc/vector/vector_templates"
VECTOR_MTLS_PATH: "/etc/vector_tls"
KAFKA_MTLS_PATH: "/etc/vector_mtls"
KAFKA_SOURCE_TOPIC: "logs_nxlog_land"
KAFKA_SOURCE_TOPIC_SUFFIX: ""
KAFKA_SINK_TOPIC: "logs_nxlog_load"
KAFKA_CONSUMER_GROUP: "vector-logs-nxlog-finalize"
KAFKA_BROKERS: "broker1:9094,broker2:9094,broker3:9094"
_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_FIELD_NAME: "timestamp_finalise"
TS_TO_EPOCH_MS_CSV: "standard_enrichment_files/hypersec-enrichment-timestamp-to-epoch-fields.csv"
SUPPORTED_TIMESTAMP_FORMAT: '["%FT%X%.3fZ", "%FT%X%.6fZ", "%FT%X%.9fZ", "%F %X%.3f", "%F %X%.6f", "%F %X%.9f", "%FT%X%.3f", "%FT%X%.6f", "%FT%X%.9f", "%FT%XZ", "%FT%X", "%F %X", "%FT%X"]'
FIELD_SETTER_ITERATION_GROUP: "finalise"

_101_TRANSFORM_FLATTEN_MESSAGE_INPUT: "001_source_kafka_aws_saas"
_105_TRANSFORM_JUNK_FILTER_INPUT: "101_transform_flatten_message"
_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT: "105_transform_junk_filter"
_129_TRANSFORM_EVENT_HASH_FIELD_INPUT: "116_transform_fields_to_camel_case"
_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT: "129_transform_event_hash_field"
_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT: "152_transform_timestamp_progress_checkpoint"
_202_SINK_KAFKA_AWS_SAAS_INPUT: "157_transform_epoch_timestamp_conversion"

Explanation:

  • Data Directories and Paths:

    • VECTOR_DATA_DIR: Directory used for Vector's state and buffers.

    • VECTOR_ENRICHMENT_PATH: Path to enrichment tables used in transformations.

    • VECTOR_MTLS_PATH and KAFKA_MTLS_PATH: Paths to TLS certificates for secure communication.

  • Kafka Configuration:

    • KAFKA_BROKERS: Comma-separated list of Kafka broker addresses.

    • KAFKA_SOURCE_TOPIC and KAFKA_SINK_TOPIC: Source and sink Kafka topics.

    • KAFKA_CONSUMER_GROUP: Consumer group ID for Kafka source.

  • Transformation Inputs:

    • _101_TRANSFORM_FLATTEN_MESSAGE_INPUT, etc.: Specify the input of each transform, allowing the pipeline to chain transforms dynamically.

  • Timestamp and Enrichment Configuration:

    • TS_TO_EPOCH_MS_CSV: CSV file specifying timestamp fields for conversion.

    • SUPPORTED_TIMESTAMP_FORMAT: List of supported timestamp formats for parsing.

    • FIELD_SETTER_ITERATION_GROUP: Used to select records in enrichment tables.

Data Flow and Transform Order

The data flows through the transforms in the following order:

  1. Source: 001_source_kafka_aws_saas

  2. Flatten Message Transform: 101_transform_flatten_message

  3. Junk Filter Transform: 105_transform_junk_filter

  4. Fields to Camel Case Transform: 116_transform_fields_to_camel_case

  5. Event Hash Field Transform: 129_transform_event_hash_field

  6. Timestamp Progress Checkpoint Transform: 152_transform_timestamp_progress_checkpoint

  7. Epoch Timestamp Conversion Transform: 157_transform_epoch_timestamp_conversion

  8. Sink: 202_sink_kafka_aws_saas

The input and output of each transform are connected via environment variables, allowing for flexibility and ease of configuration.
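
As a concrete example of this chaining, the junk filter's inputs placeholder resolves against the configuration map as follows (template on top, interpolated result underneath):

# In 105-transform-junk-filter.yml (template form)
inputs:
  - "${_105_TRANSFORM_JUNK_FILTER_INPUT:?err}"

# After environment variable interpolation
inputs:
  - "101_transform_flatten_message"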

Examples

Example 1: Removing Empty Fields

Scenario:

An event contains several fields with null or empty values that need to be removed to clean up the data.

Input Event:

{
  "message": "User login successful",
  "user_id": null,
  "session_id": "",
  "timestamp": "2024-03-27T12:34:56Z"
}

Process:

  • The Junk Filter Transform (105_transform_junk_filter) removes fields with null or empty values.

Output Event:

{
  "message": "User login successful",
  "timestamp": "2024-03-27T12:34:56Z"
}

Example 2: Adding Event Hash

Scenario:

An event requires a unique hash for deduplication and tracking purposes.

Input Event:

{
  "message": "User login successful",
  "user_id": "12345",
  "timestamp": "2024-03-27T12:34:56Z"
}

Process:

  • The Event Hash Field Transform (129_transform_event_hash_field) computes a SHA-512/224 hash of the serialized event (the digest shown below is illustrative).

Output Event:

{
  "message": "User login successful",
  "user_id": "12345",
  "timestamp": "2024-03-27T12:34:56Z",
  "event_hash": "e9b4c6d7a1f2b3c4d5e6f7890abcdef1234567890abcdef1234567890abcdef"
}

Example 3: Timestamp Conversion

Scenario:

An event has a timestamp field in string format that needs to be converted to epoch milliseconds for consistent time representation.

Input Event:

{
  "event_time": "2024-03-27T12:34:56Z",
  "message": "System rebooted"
}

Process:

  • The Epoch Timestamp Conversion Transform (157_transform_epoch_timestamp_conversion) parses event_time, converts it to epoch milliseconds, and writes the result to event_time_epochms.

Output Event:

{
  "event_time": "2024-03-27T12:34:56Z",
  "event_time_epochms": 1711535696000,
  "message": "System rebooted"
}

Example 4: Fields to Camel Case

Scenario:

An event has field names in various cases that need to be standardized to camelCase.

Input Event:

{
  "User_ID": "12345",
  "user_name": "jdoe",
  "USER_EMAIL": "[email protected]"
}

Process:

  • The Fields to Camel Case Transform (116_transform_fields_to_camel_case) converts all field names to camelCase.

Output Event:

{
  "userID": "12345",
  "userName": "jdoe",
  "userEmail": "[email protected]"
}

Conclusion

The ingestion finalization step is essential for preparing data for downstream processing, analytics, and storage. By applying standard transformations, data cleansing, and enrichment, we ensure that data is consistent, clean, and enriched before it reaches its final destination. This step leverages Vector's powerful transformation capabilities and is highly configurable through environment variables and enrichment tables.

Key Takeaways:

  • Modular Design: The pipeline is composed of multiple transforms, each handling specific tasks, making it flexible and maintainable.

  • Data Quality Improvement: Removing junk data and standardizing fields significantly enhances data quality.

  • Data Enrichment: Adding fields like event_hash and converting timestamps adds value and utility to the data.

  • Flexibility and Configurability: Environment variables and enrichment tables allow for dynamic adjustments without code changes, accommodating various data sources and requirements.

  • Consistency: Standardizing field names and timestamp formats ensures consistency across all datasets, facilitating easier data analysis and processing.

For further details or assistance, please refer to the configuration files or contact the technical team.
