Pipeline Finalization
Introduction
The ingestion finalization step is a critical component of our data pipeline that performs the generic transformations, data cleansing, and enrichment required for every data sink. This step ensures that all data is consistent, clean, and enriched before it is forwarded to its final destination. By standardizing data at this stage, we facilitate efficient downstream processing, analytics, and storage.
Key Concepts
Ingestion Finalization
The ingestion finalization step is responsible for:
Generic Transformations: Applying standard transformations to data across all types to ensure consistency.
Data Cleansing: Removing unnecessary or junk data to improve data quality.
Enrichment: Adding additional information or context to data records to enhance their value.
Components
The finalization step comprises several components, each handling a specific part of the data processing pipeline:
Source: Reads data from Kafka topics.
Transforms: A series of transformations applied to the data.
Flatten Message: Flattens nested JSON structures.
Junk Filter: Removes fields with empty or null values.
Fields to Camel Case: Converts field names to camelCase.
Event Hash Field: Adds a unique hash to each event.
Timestamp Progress Checkpoint: Adds custom timestamps for tracking.
Epoch Timestamp Conversion: Converts timestamp fields to epoch milliseconds.
Sink: Writes the processed data back to Kafka or other destinations.
Detailed Explanation
Architecture Overview
The ingestion finalization step uses Vector—a high-performance observability data pipeline—to process data through a series of configurations defined in YAML files:
/etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml
/etc/vector/vector_templates/001-source-kafka-aws-saas.yml
/etc/vector/vector_templates/101-transform-flatten-message.yml
/etc/vector/vector_templates/105-transform-junk-filter.yml
/etc/vector/vector_templates/116-transform-fields-to-camel-case.yml
/etc/vector/vector_templates/129-transform-event-hash-field.yml
/etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml
/etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml
/etc/vector/vector_templates/202-sink-kafka-aws-saas.yml
These configurations are chained together using the --config flag, allowing Vector to process data sequentially through each step.
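As an illustrative sketch of this chaining (the exact service entrypoint and any wrapper scripts are deployment-specific), Vector accepts the --config flag multiple times and merges the listed files into a single topology:
vector --config /etc/vector/vector_templates/hs-xdr-vector-ct-all-main.yml \
  --config /etc/vector/vector_templates/001-source-kafka-aws-saas.yml \
  --config /etc/vector/vector_templates/101-transform-flatten-message.yml \
  --config /etc/vector/vector_templates/105-transform-junk-filter.yml \
  --config /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml \
  --config /etc/vector/vector_templates/129-transform-event-hash-field.yml \
  --config /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml \
  --config /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml \
  --config /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml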
1. Source Configuration
File: /etc/vector/vector_templates/001-source-kafka-aws-saas.yml
The source configuration sets up Vector to consume data from Kafka topics.
Key Points:
Type: Kafka source.
Topics: Configured via environment variables (e.g., KAFKA_SOURCE_TOPIC).
TLS Configuration: Ensures secure communication with Kafka brokers.
Consumer Group: Manages offsets and load balancing.
Example Configuration:
sources:
kafka_source:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
topics:
- "${KAFKA_SOURCE_TOPIC}"
group_id: "${KAFKA_CONSUMER_GROUP}"
tls:
enabled: true
ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
key_file: "${KAFKA_MTLS_PATH}/client.key"
crt_file: "${KAFKA_MTLS_PATH}/client.crt"
encoding:
codec: json
2. Transformations
The finalization step includes several transformations applied in sequence to process and enrich the data.
a. Flatten Message Transform
File: /etc/vector/vector_templates/101-transform-flatten-message.yml
Purpose: Flattens nested JSON structures within the message to simplify data processing and analysis.
Configuration:
transforms:
101_transform_flatten_message:
type: remap
inputs:
- "${_101_TRANSFORM_FLATTEN_MESSAGE_INPUT}"
source: |
# Flatten nested structures with an underscore delimiter
. = flatten(., separator: "_")
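To illustrate the effect (hypothetical event; field names are for demonstration only), a nested record such as:
{
  "user": {
    "name": "jdoe",
    "geo": { "country": "AU" }
  }
}
is emitted as a single-level object with underscore-joined keys:
{
  "user_name": "jdoe",
  "user_geo_country": "AU"
}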
b. Junk Filter Transform
File: /etc/vector/vector_templates/105-transform-junk-filter.yml
Purpose: Removes all fields with empty or null values to clean the data.
Special Logic:
transforms:
105_transform_junk_filter:
type: remap
inputs:
- "${_105_TRANSFORM_JUNK_FILTER_INPUT:?err}"
source: |
# Remove fields with nullish values recursively
. = compact(., recursive: true, nullish: true)
Explanation: The compact function recursively removes keys with null, empty string, or empty array values from the event data.
c. Fields to Camel Case Transform
File: /etc/vector/vector_templates/116-transform-fields-to-camel-case.yml
Purpose: Converts all field names to camelCase to standardize field naming conventions across datasets.
Configuration:
transforms:
116_transform_fields_to_camel_case:
type: remap
inputs:
- "${_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT}"
source: |
# Convert all field names to camelCase
. = rename_all(., strategy: "lower_camel")
d. Event Hash Field Transform
File: /etc/vector/vector_templates/129-transform-event-hash-field.yml
Purpose: Adds a unique hash (event_hash) to each event for deduplication, tracking, and data integrity verification.
Special Logic:
transforms:
129_transform_event_hash_field:
type: remap
inputs:
- "${_129_TRANSFORM_EVENT_HASH_FIELD_INPUT}"
source: |
# Generate a SHA-512/224 hash of the event
json_str = to_string(encode_json(.))
.event_hash = sha2(json_str, variant: "SHA-512/224")
Explanation: The entire event is serialized to a JSON string, and a SHA-512/224 hash of that string is computed to create a unique identifier for the event.
e. Timestamp Progress Checkpoint Transform
File: /etc/vector/vector_templates/152-transform-timestamp-progress-checkpoint.yml
Purpose: Adds a custom timestamp field to events to assist with debugging and tracking the data flow through the pipeline.
Special Logic:
transforms:
152_transform_timestamp_progress_checkpoint:
type: remap
inputs:
- "${_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT}"
source: |
# Add a progress checkpoint timestamp
.["${_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_FIELD_NAME}"] = now()Explanation: A timestamp field (e.g.,
timestamp_finalise) is added to record the time the event passed through this transform.
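For illustration only (hypothetical event; the actual value is the processing time assigned by now()), an event leaving this transform with the field name from the configuration map would look like:
{
  "message": "User login successful",
  "timestamp_finalise": "2024-03-27T12:35:01.123456789Z"
}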
f. Epoch Timestamp Conversion Transform
File: /etc/vector/vector_templates/157-transform-epoch-timestamp-conversion.yml
Purpose: Converts specified timestamp fields into epoch milliseconds for consistent time representation across events.
Special Logic:
enrichment_tables:
timestamp_to_epoch_fields:
type: "file"
file:
path: "${VECTOR_ENRICHMENT_PATH:?err}/${TS_TO_EPOCH_MS_CSV:?err}"
encoding:
type: "csv"
schema:
iteration_group: "string"
time_field: "string"
time_field_epochms: "string"
transforms:
157_transform_epoch_timestamp_conversion:
type: remap
inputs:
- "${_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT:?err}"
source: |
# Load fields to convert from the enrichment table
timestamp_to_epoch_fields = find_enrichment_table_records(
"timestamp_to_epoch_fields",
{"iteration_group": "${FIELD_SETTER_ITERATION_GROUP:?err}"},
case_sensitive: false
) ?? []
# Iterate over each field specified for conversion
for_each(timestamp_to_epoch_fields) -> |_index, data| {
source_field_path = split!(data.time_field, ".")
destination_field_path = split!(data.time_field_epochms, ".")
# Handle field paths containing dashes
if match_array(source_field_path, r'-+') {
source_field_path = map_values(source_field_path) -> |value| {
replace(value, "-", ".")
}
}
if match_array(destination_field_path, r'-+') {
destination_field_path = map_values(destination_field_path) -> |value| {
replace(value, "-", ".")
}
}
source_field_value = get(., source_field_path) ?? null
if !is_nullish(source_field_value) {
# Parse string timestamps
if is_string(source_field_value) {
for_each($SUPPORTED_TIMESTAMP_FORMAT) -> |_index, format| {
source_field_value = parse_timestamp(source_field_value, format) ?? source_field_value
}
}
# Convert integer timestamps
else if is_integer(source_field_value) {
source_field_value = from_unix_timestamp!(to_int!(source_field_value), unit: "milliseconds")
}
# Convert to epoch milliseconds
converted_timestamp, _error = to_unix_timestamp(value: source_field_value, unit: "milliseconds")
. = set!(., destination_field_path, converted_timestamp)
}
}
Explanation: This transform reads a CSV enrichment table specifying which timestamp fields to convert. It parses each timestamp and converts it to epoch milliseconds, storing the result in the specified destination field.
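A hypothetical enrichment table for this step (the real contents live in the file referenced by TS_TO_EPOCH_MS_CSV; these rows are illustrative only) would follow the declared CSV schema:
iteration_group,time_field,time_field_epochms
finalise,event_time,event_time_epochms
finalise,metadata.event_time,metadata.event_time_epochms
Only rows whose iteration_group matches FIELD_SETTER_ITERATION_GROUP (here, finalise) are applied; each row names the source timestamp field and the destination field that receives the epoch-milliseconds value.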
3. Sink Configuration
File: /etc/vector/vector_templates/202-sink-kafka-aws-saas.yml
Purpose: Sends the processed data to the specified Kafka topic or other sinks for downstream processing or storage.
Key Points:
Dynamic Topic Assignment: Uses environment variables to specify sink topics.
TLS Configuration: Ensures secure communication with Kafka brokers.
Encoding: Configures the data to be encoded as JSON before sending.
Buffering and Reliability: Configures how data is buffered and ensures reliable delivery.
Example Configuration:
sinks:
kafka_sink:
type: kafka
inputs:
- "${_202_SINK_KAFKA_AWS_SAAS_INPUT}"
bootstrap_servers: "${KAFKA_BROKERS}"
topic: "${KAFKA_SINK_TOPIC}"
tls:
enabled: true
ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
key_file: "${KAFKA_MTLS_PATH}/client.key"
crt_file: "${KAFKA_MTLS_PATH}/client.crt"
encoding:
codec: json
buffer:
type: disk
max_size: ${VECTOR_BUFFER_SIZE:-1073741824} # 1GB default
when_full: block # Backpressure when buffer is full
Environment Variables and Configurations
The finalization step relies on several environment variables defined in the configuration map to allow for dynamic configuration without changing code.
Configuration Map:
VECTOR_DATA_DIR: "/vector-data-dir"
VECTOR_ENRICHMENT_PATH: "/etc/vector/vector_templates"
VECTOR_MTLS_PATH: "/etc/vector_tls"
KAFKA_MTLS_PATH: "/etc/vector_mtls"
KAFKA_SOURCE_TOPIC: "logs_nxlog_land"
KAFKA_SOURCE_TOPIC_SUFFIX: ""
KAFKA_SINK_TOPIC: "logs_nxlog_load"
KAFKA_CONSUMER_GROUP: "vector-logs-nxlog-finalize"
KAFKA_BROKERS: "broker1:9094,broker2:9094,broker3:9094"
_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_FIELD_NAME: "timestamp_finalise"
TS_TO_EPOCH_MS_CSV: "standard_enrichment_files/hypersec-enrichment-timestamp-to-epoch-fields.csv"
SUPPORTED_TIMESTAMP_FORMAT: '["%FT%X%.3fZ", "%FT%X%.6fZ", "%FT%X%.9fZ", "%F %X%.3f", "%F %X%.6f", "%F %X%.9f", "%FT%X%.3f", "%FT%X%.6f", "%FT%X%.9f", "%FT%XZ", "%FT%X", "%F %X", "%FT%X"]'
FIELD_SETTER_ITERATION_GROUP: "finalise"
_101_TRANSFORM_FLATTEN_MESSAGE_INPUT: "001_source_kafka_aws_saas"
_105_TRANSFORM_JUNK_FILTER_INPUT: "101_transform_flatten_message"
_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT: "105_transform_junk_filter"
_129_TRANSFORM_EVENT_HASH_FIELD_INPUT: "116_transform_fields_to_camel_case"
_152_TRANSFORM_TIMESTAMP_PROGRESS_CHECKPOINT_INPUT: "129_transform_event_hash_field"
_157_TRANSFORM_EPOCH_TIMESTAMP_CONVERSION_INPUT: "152_transform_timestamp_progress_checkpoint"
_202_SINK_KAFKA_AWS_SAAS_INPUT: "157_transform_epoch_timestamp_conversion"
Explanation:
Data Directories and Paths:
VECTOR_DATA_DIR: Directory used for Vector's state and buffers.
VECTOR_ENRICHMENT_PATH: Path to enrichment tables used in transformations.
VECTOR_MTLS_PATH and KAFKA_MTLS_PATH: Paths to TLS certificates for secure communication.
Kafka Configuration:
KAFKA_BROKERS: Comma-separated list of Kafka broker addresses.
KAFKA_SOURCE_TOPIC and KAFKA_SINK_TOPIC: Source and sink Kafka topics.
KAFKA_CONSUMER_GROUP: Consumer group ID for the Kafka source.
Transformation Inputs:
_101_TRANSFORM_FLATTEN_MESSAGE_INPUT, etc.: Specifies the input for each transform, allowing the pipeline to chain transforms dynamically.
Timestamp and Enrichment Configuration:
TS_TO_EPOCH_MS_CSV: CSV file specifying timestamp fields for conversion.
SUPPORTED_TIMESTAMP_FORMAT: List of supported timestamp formats for parsing.
FIELD_SETTER_ITERATION_GROUP: Used to select records in enrichment tables.
Data Flow and Transform Order
The data flows through the transforms in the following order:
1. Source: 001_source_kafka_aws_saas
2. Flatten Message Transform: 101_transform_flatten_message
3. Junk Filter Transform: 105_transform_junk_filter
4. Fields to Camel Case Transform: 116_transform_fields_to_camel_case
5. Event Hash Field Transform: 129_transform_event_hash_field
6. Timestamp Progress Checkpoint Transform: 152_transform_timestamp_progress_checkpoint
7. Epoch Timestamp Conversion Transform: 157_transform_epoch_timestamp_conversion
8. Sink: 202_sink_kafka_aws_saas
The input and output of each transform are connected via environment variables, allowing for flexibility and ease of configuration.
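For example, with the configuration map above, the junk filter's input declaration resolves at startup so that it consumes the flatten transform's output (a sketch of the interpolation; the resolved value is shown in the comment):
transforms:
  105_transform_junk_filter:
    type: remap
    inputs:
      - "${_105_TRANSFORM_JUNK_FILTER_INPUT:?err}" # resolves to "101_transform_flatten_message"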
Examples
Example 1: Removing Empty Fields
Scenario:
An event contains several fields with null or empty values that need to be removed to clean up the data.
Input Event:
{
"message": "User login successful",
"user_id": null,
"session_id": "",
"timestamp": "2024-03-27T12:34:56Z"
}
Process:
The Junk Filter Transform (105_transform_junk_filter) removes fields with null or empty values.
Output Event:
{
"message": "User login successful",
"timestamp": "2024-03-27T12:34:56Z"
}
Example 2: Adding Event Hash
Scenario:
An event requires a unique hash for deduplication and tracking purposes.
Input Event:
{
"message": "User login successful",
"user_id": "12345",
"timestamp": "2024-03-27T12:34:56Z"
}
Process:
The Event Hash Field Transform (129_transform_event_hash_field) computes a hash of the event and stores it in event_hash.
Output Event:
{
"message": "User login successful",
"user_id": "12345",
"timestamp": "2024-03-27T12:34:56Z",
"event_hash": "e9b4c6d7a1f2b3c4d5e6f7890abcdef1234567890abcdef1234567890abcdef"
}
Example 3: Timestamp Conversion
Scenario:
An event has a timestamp field in string format that needs to be converted to epoch milliseconds for consistent time representation.
Input Event:
{
"event_time": "2024-03-27T12:34:56Z",
"message": "System rebooted"
}
Process:
The Epoch Timestamp Conversion Transform (157_transform_epoch_timestamp_conversion) parses event_time and converts it to event_time_epochms.
Output Event:
{
"event_time": "2024-03-27T12:34:56Z",
"event_time_epochms": 1711535696000,
"message": "System rebooted"
}
Example 4: Fields to Camel Case
Scenario:
An event has field names in various cases that need to be standardized to camelCase.
Input Event:
{
"User_ID": "12345",
"user_name": "jdoe",
"USER_EMAIL": "[email protected]"
}
Process:
The Fields to Camel Case Transform (116_transform_fields_to_camel_case) converts all field names to camelCase.
Output Event:
{
"userID": "12345",
"userName": "jdoe",
"userEmail": "[email protected]"
}
Conclusion
The ingestion finalization step is essential for preparing data for downstream processing, analytics, and storage. By applying standard transformations, data cleansing, and enrichment, we ensure that data is consistent, clean, and enriched before it reaches its final destination. This step leverages Vector's powerful transformation capabilities and is highly configurable through environment variables and enrichment tables.
Key Takeaways:
Modular Design: The pipeline is composed of multiple transforms, each handling specific tasks, making it flexible and maintainable.
Data Quality Improvement: Removing junk data and standardizing fields significantly enhances data quality.
Data Enrichment: Adding fields like event_hash and converting timestamps adds value and utility to the data.
Flexibility and Configurability: Environment variables and enrichment tables allow for dynamic adjustments without code changes, accommodating various data sources and requirements.
Consistency: Standardizing field names and timestamp formats ensures consistency across all datasets, facilitating easier data analysis and processing.
For further details or assistance, please refer to the configuration files or contact the technical team.