ClickHouse Loading
Introduction
The ingestion load to ClickHouse step is a crucial part of our data pipeline, responsible for loading processed data into ClickHouse, a high-performance, open-source columnar database. This step ensures that data is correctly routed to the appropriate tables based on subschemas, handles customer organization IDs (org_id), and applies necessary transformations for efficient storage and retrieval. By processing and organizing data at this stage, we facilitate fast query performance and accurate analytics downstream.
Key Concepts
Loading Data into ClickHouse
Efficient Data Storage: ClickHouse is optimized for handling large volumes of data. Properly structuring data before loading maximizes performance.
Subschema Routing: Data is categorized into subschemas based on specific criteria, ensuring it is stored in the correct tables.
Organization Identification: Events are associated with org_id to segregate data by customer, maintaining data integrity and security.
Transformations: Necessary data transformations are applied to conform to ClickHouse's data types and schema requirements.
Components
The ingestion load to ClickHouse step comprises several components, each serving a specific function in the data processing pipeline:
Source: Reads data from Kafka topics using a topic regex to match multiple topics.
Transforms: A series of transformations applied to the data:
Filter Transform: Filters events based on specified criteria.
Timestamp Load Transform: Adds a timestamp indicating when the data was loaded.
JSON Remap Transform: Converts specified JSON fields into strings.
Fields to Snake Case Transform: Converts field names to snake_case.
Custom Transformations: Applies additional modifications to align data with ClickHouse requirements.
Subschema Mapping Transform: Assigns events to subschemas based on conditions.
Dead Letter Queue (DLQ) Routing Transform: Routes events to DLQ if they fail certain checks.
Sinks: Writes the processed data to ClickHouse or routes it to Kafka DLQ:
ClickHouse Sink: Loads data into ClickHouse tables.
Kafka DLQ Sink: Routes failed events to a Kafka topic for further inspection.
Detailed Explanation
Architecture Overview
The ingestion load to ClickHouse step uses Vector—a high-performance observability data pipeline—to process data through a series of configurations defined in YAML files and environment variables. The configurations are designed to process data sequentially, ensuring data integrity and compliance with ClickHouse's schema.
Components Include:
Transforms:
138_transform_filter
115_transform_timestamp_load
108_transform_ch_json_remap
116_transform_fields_to_camel_case
109_transform_ch_custom_transformations
118_transform_ch_subschema_mapping
159_transform_dlq_event_category_filter
159_transform_dlq_event_category
Enrichment Tables:
filter_whitelist
json_field_map
subschema_mapping_csv
dlq_event_category
Environment Variables: Define paths, Kafka configurations, and inputs for each component.
1. Source Configuration
Purpose: Reads data from Kafka topics using a regular expression to match multiple topics.
Key Points:
Type: Kafka source.
Topics: Configured via environment variable KAFKA_SOURCE_TOPIC_REGEX, which allows matching multiple topics.
TLS Configuration: Ensures secure communication with Kafka brokers.
Consumer Group: Manages offsets and load balancing.
Example Configuration:
sources:
  kafka_source:
    type: kafka
    bootstrap_servers: "${KAFKA_BROKERS}"
    topics_regex: "${KAFKA_SOURCE_TOPIC_REGEX}"
    group_id: "${KAFKA_CONSUMER_GROUP}"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    encoding:
      codec: json
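To see which topic names the source would pick up, the pattern from KAFKA_SOURCE_TOPIC_REGEX (listed in the configuration map later in this page) can be tried out offline. The following is a minimal Python sketch; the helper name is_consumed is hypothetical, and Python's regex engine is assumed to behave like the broker client's for this simple pattern.

```python
import re

# Pattern copied from KAFKA_SOURCE_TOPIC_REGEX in the configuration map.
TOPIC_REGEX = re.compile(
    r"^logs_(beats_filebeat_dns_windows|beats_filebeat|beats_winlogbeat|"
    r"hypercol_internal|hypercol_metric|netflow|nxlog_windows|"
    r"syslog_linux_audit|syslog_linux)_load"
)


def is_consumed(topic: str) -> bool:
    """Return True if the topic name matches the source regex (hypothetical helper)."""
    return TOPIC_REGEX.search(topic) is not None


for name in ["logs_netflow_load", "logs_unknown_load", "logs_netflow_load_v2"]:
    print(name, is_consumed(name))
```

Note that the pattern has no trailing $ anchor, so a topic such as logs_netflow_load_v2 also matches in this sketch.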
2. Transformation Steps
The data undergoes several transformations to prepare it for loading into ClickHouse.
a. Filter Transform
File: Not specified (Configuration provided in the components)
Purpose: Filters events based on specified criteria from an enrichment table.
Enrichment Table: filter_whitelist
Configuration:
enrichment_tables:
  filter_whitelist:
    type: "file"
    file:
      path: "${VECTOR_ENRICHMENT_PATH:?err}/${FILTER_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      iterator: "string"
      match_column: "string"
      match_value: "string"
      action: "string"
transforms:
  138_transform_filter:
    type: remap
    inputs:
      - ${_138_TRANSFORM_FILTER_INPUT:?err}
    source: |
      remove_flag = false
      always_override_flag = false
      fields = find_enrichment_table_records("filter_whitelist", {"iterator": "${FILTER_ITERATION_GROUP:?err}"}, case_sensitive: false) ?? []
      for_each(fields) -> |_index, data| {
        is_matched = false
        field_path = split(to_string!(data.match_column), ".")
        if match_array(field_path, r'-+') {
          field_path = map_values(field_path) -> |value| {
            replace(value, "-", ".")
          }
        }
        value = get(., field_path) ?? null
        if value == data.match_value {
          is_matched = true
        } else {
          if includes(values(.), data.match_value) {
            is_matched = true
          }
        }
        if (is_matched) {
          if (data.action == "never_match") {
            remove_flag = true
          } else if (data.action == "must_match") {
            remove_flag = false
          } else if (data.action == "always_override") {
            always_override_flag = true
            remove_flag = false
          }
        } else {
          if (data.action == "must_match") {
            remove_flag = true
          }
        }
      }
      if remove_flag && !always_override_flag {
        abort
      }
Explanation:
Logic:
Iterates over the filter_whitelist enrichment table.
Checks if event fields match specified conditions.
Applies actions based on the action field:
never_match: Discards events that match the condition.
must_match: Keeps events that match; discards those that don't.
always_override: Keeps events regardless of other conditions.
Outcome: Events not meeting the criteria are discarded; others proceed to the next step.
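The interaction between the three actions is easier to follow outside VRL. The following is a rough Python sketch of the same decision logic, assuming the rows have already been selected for the current iterator group; the names should_drop and get_path are hypothetical and not part of the pipeline.

```python
def get_path(event, path):
    """Walk a list of keys through nested dicts; return None if any key is missing."""
    current = event
    for segment in path:
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


def should_drop(event, rules):
    """Mirror the remove/override flags of the filter transform (sketch only)."""
    remove = False
    override = False
    for rule in rules:
        # A "-" inside a path segment stands for a literal "." in the key name.
        path = [seg.replace("-", ".") for seg in rule["match_column"].split(".")]
        value = get_path(event, path)
        # Match on the addressed field, or on any top-level value of the event.
        matched = value == rule["match_value"] or rule["match_value"] in event.values()
        if matched:
            if rule["action"] == "never_match":
                remove = True
            elif rule["action"] == "must_match":
                remove = False
            elif rule["action"] == "always_override":
                override = True
                remove = False
        elif rule["action"] == "must_match":
            remove = True
    return remove and not override
```

Because the flags are updated row by row, the order of rows in the CSV matters: a later must_match hit can clear a flag set by an earlier never_match row, while always_override wins regardless of position.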
b. Timestamp Load Transform
Purpose: Adds a timestamp_load field indicating when the event was processed.
Configuration:
transforms:
  115_transform_timestamp_load:
    type: remap
    inputs:
      - ${_115_TRANSFORM_TIMESTAMP_LOAD_INPUT:?err}
    source: |
      . = set!(value: ., path: ["timestamp_load"], data: now())
Explanation:
Adds a timestamp_load field with the current UTC time.
Helps in tracking and debugging the data flow.
c. JSON Remap Transform
Enrichment Table: json_field_map
Purpose: Converts specified JSON fields into strings for compatibility with ClickHouse's data types.
Configuration:
enrichment_tables:
  json_field_map:
    type: "file"
    file:
      path: "${VECTOR_ENRICHMENT_PATH:?err}/${JSON_FIELDS:?err}"
      encoding:
        type: "csv"
    schema:
      iterator: "string"
      field_to_map_as_json_str: "string"
transforms:
  108_transform_ch_json_remap:
    type: remap
    inputs:
      - ${_108_TRANSFORM_CH_JSON_REMAP}
    source: |
      .tags_event_org_id = .tags.event.org_id
      .tags_event_category = .tags.event.category
      columns = find_enrichment_table_records("json_field_map", {"iterator": "finalise"}, case_sensitive: false) ?? []
      for_each(columns) -> |_index, column| {
        field_path = split!(column.field_to_map_as_json_str, ".")
        value = get(., field_path) ?? null
        if !is_nullish(value) {
          renamed_field = string!(column.field_to_map_as_json_str) + "_str"
          . = set!(value: ., path: [renamed_field], data: to_string(encode_json(value)))
          . = remove!(value: ., path: field_path)
        }
      }
Explanation:
Logic:
Reads the fields to convert from the json_field_map enrichment table.
For each field:
Converts the JSON field into a JSON string.
Stores it in a new field with a _str suffix.
Removes the original JSON field.
Outcome: Ensures that complex JSON structures are stored as strings in ClickHouse, avoiding schema conflicts.
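As an illustration of the per-field steps described above, here is a small Python sketch of the same idea. It assumes the list of dotted field paths has already been read from the CSV; stringify_json_fields, get_path and is_nullish are hypothetical helpers, and is_nullish only approximates the VRL function of the same name.

```python
import json


def get_path(event, path):
    """Walk a list of keys through nested dicts; return None if any key is missing."""
    current = event
    for segment in path:
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


def is_nullish(value):
    """Approximation: None, or a string that is blank or just "-"."""
    return value is None or (isinstance(value, str) and value.strip() in ("", "-"))


def stringify_json_fields(event, dotted_fields):
    """Store each listed field as a compact JSON string under '<field>_str'."""
    for dotted in dotted_fields:
        path = dotted.split(".")
        value = get_path(event, path)
        if is_nullish(value):
            continue
        # The new key is a single top-level key that still contains the dots.
        event[dotted + "_str"] = json.dumps(value, separators=(",", ":"))
        # Drop the original leaf so only the string form remains.
        del get_path(event, path[:-1])[path[-1]]
    return event


event = {"process": {"args": ["a", "b"], "pid": 7}}
print(stringify_json_fields(event, ["process.args"]))
```

In this sketch the new key keeps the dotted name (for example process.args_str); the dots would only become underscores in the later key-normalisation step.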
d. Fields to Snake Case Transform
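Purpose: Converts field names from camelCase or other formats to snake_case to comply with ClickHouse's naming conventions. Note that the component is named 116_transform_fields_to_camel_case in the configuration, although its output is snake_case.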
Configuration:
transforms:
  116_transform_fields_to_camel_case:
    type: remap
    inputs:
      - ${_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT:?err}
    source: |
      . = map_keys(., recursive: true) -> |key| {
        key = replace(key, r'\B(?P<char>[A-Z]+)', "_$$char")
        key = replace(key, "__", "_")
        downcase(key)
      }
Explanation:
Logic:
Recursively iterates over all keys in the event.
Inserts an underscore before uppercase letters (e.g., CamelCase to Camel_Case).
Replaces double underscores with single underscores.
Converts all keys to lowercase.
Outcome: Field names are standardized to snake_case, ensuring consistency and compliance with ClickHouse schema.
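The effect of the regex is easiest to check on a few sample keys. Below is a minimal Python sketch that applies the same three steps (underscore before uppercase runs, collapse double underscores, lowercase); the function names are hypothetical, and Python's re module is assumed to treat \B and named groups the same way for these ASCII inputs.

```python
import re


def to_snake_case(key: str) -> str:
    """Apply the same three steps as the transform to a single key."""
    # Insert "_" before a run of uppercase letters that is not at a word boundary.
    key = re.sub(r"\B(?P<char>[A-Z]+)", r"_\g<char>", key)
    # A key that already had "_" before the capital now has "__"; collapse it.
    key = key.replace("__", "_")
    return key.lower()


def convert_keys(value):
    """Recursively rename dict keys, descending into nested dicts and lists."""
    if isinstance(value, dict):
        return {to_snake_case(k): convert_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [convert_keys(v) for v in value]
    return value


for sample in ["CamelCase", "userID", "user_Name", "HTTPServer"]:
    print(sample, "->", to_snake_case(sample))
```

One consequence of matching a whole run of capitals is that acronyms followed by a word are split in an unexpected place: in this sketch HTTPServer becomes h_ttpserver rather than http_server, so such field names may need special handling.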
e. Custom Transformations
Purpose: Applies additional modifications to align events with ClickHouse's data types and schema.
Configuration:
transforms:
  109_transform_ch_custom_transformations:
    type: remap
    inputs:
      - ${_109_TRANSFORM_CH_CUSTOM_TRANSFORMATIONS_INPUT:?err}
    source: |
      . = map_values(., recursive: true) -> |value| {
        if is_timestamp(value) {
          value = to_unix_timestamp!(value, unit: "milliseconds")
        } else if match(to_string(value) ?? "", r'\d{4}-\d{2}-\d{2}+') {
          for_each($SUPPORTED_TIMESTAMP_FORMAT) -> |_index, format| {
            value = to_unix_timestamp(
              parse_timestamp(value, format) ??
              parse_timestamp(value, format + "%:z") ??
              parse_timestamp(value, format + "%::z") ??
              null, unit: "milliseconds") ?? value
          }
        } else if contains(to_string(value) ?? "", "$") {
          value = replace(to_string!(value), "$", "$$")
        }
        value
      }
      if (exists(.timestamp) && exists(.@timestamp)) { del(.timestamp) }
      . = map_keys(flatten(.), recursive: true) -> |key| {
        key = replace(replace(key, "@", ""), ".", "_")
        replace(key, "-", "_")
      }
Explanation:
Value Transformations:
Converts timestamp fields to Unix epoch milliseconds.
Parses string representations of dates into Unix epoch milliseconds.
Escapes dollar signs ($) to prevent ClickHouse parsing issues.
Key Transformations:
Removes the timestamp field if both timestamp and @timestamp exist.
Flattens nested structures.
Removes @ symbols, replaces dots (.) and dashes (-) with underscores (_).
Outcome: Ensures data types and field names are compatible with ClickHouse, preventing loading errors and improving query performance.
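The key transformations can be sketched in a few lines of Python to show what a nested event looks like afterwards. This is only an approximation of the VRL: flatten here descends into nested dicts only, and the names flatten and normalise_keys are hypothetical.

```python
def flatten(obj, prefix=""):
    """Join nested dict keys with "."; non-dict values (including lists) are kept as-is."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            out.update(flatten(value, name))
        else:
            out[name] = value
    return out


def normalise_keys(event):
    """Drop 'timestamp' when '@timestamp' is also present, flatten, then clean key names."""
    if "timestamp" in event and "@timestamp" in event:
        event = {k: v for k, v in event.items() if k != "timestamp"}
    return {
        key.replace("@", "").replace(".", "_").replace("-", "_"): value
        for key, value in flatten(event).items()
    }


event = {
    "timestamp": "a",
    "@timestamp": "b",
    "tags": {"event": {"org-id": "org_001"}},
}
print(normalise_keys(event))
```

Dropping timestamp first avoids a collision: once the @ is stripped, @timestamp becomes timestamp, so keeping both would leave two values competing for the same column name.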
f. Subschema Mapping Transform
Enrichment Table: subschema_mapping_csv
Purpose: Assigns events to subschemas (i.e., specific ClickHouse tables) based on conditions defined in an enrichment table.
Configuration:
enrichment_tables:
  subschema_mapping_csv:
    type: "file"
    file:
      path: "${VECTOR_ENRICHMENT_PATH:?err}/${EVENT_SUBSCHEMA_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      category: "string"
      field: "string"
      value: "string"
      subschema: "string"
transforms:
  118_transform_ch_subschema_mapping:
    type: remap
    inputs:
      - ${_118_TRANSFORM_CH_SUBSCHEMA_MAPPING_INPUT:?err}
    source: |
      mapping_result = find_enrichment_table_records("subschema_mapping_csv", {"category": .tags_event_category}, case_sensitive: false) ?? []
      for_each(mapping_result) -> |_index, data| {
        value = get(., split!(data.field, ".")) ?? null
        if is_nullish(data.value) {
          if !is_null(value) {
            . = set!(., ["tags_event_category"], data.subschema)
          }
        } else {
          if !is_null(value) && (value == data.value) {
            . = set!(., ["tags_event_category"], data.subschema)
          }
        }
      }
Explanation:
Logic:
Loads mapping conditions from subschema_mapping_csv.
For events matching the category, checks if specified field and value conditions are met.
Updates tags_event_category to the subschema value if conditions are satisfied.
Outcome: Events are correctly categorized into subschemas, ensuring they are routed to the appropriate ClickHouse tables.
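The two kinds of mapping rows (field must merely exist versus field must equal a value) can be illustrated with a short Python sketch. The rows are passed in as dicts with the same column names as the CSV schema; apply_subschema, get_path and is_nullish are hypothetical helpers, and the sample row is invented for illustration.

```python
def get_path(event, path):
    """Walk a list of keys through nested dicts; return None if any key is missing."""
    current = event
    for segment in path:
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


def is_nullish(value):
    """Approximation: None, or a string that is blank or just "-"."""
    return value is None or (isinstance(value, str) and value.strip() in ("", "-"))


def apply_subschema(event, rows):
    """Rewrite tags_event_category when a mapping row for the category applies."""
    category = event.get("tags_event_category")
    if category is None:
        return event
    # Rows are selected once, using the category the event arrived with.
    candidates = [r for r in rows if r["category"].lower() == str(category).lower()]
    for row in candidates:
        value = get_path(event, row["field"].split("."))
        if is_nullish(row["value"]):
            # Empty value column: the field only has to be present.
            if value is not None:
                event["tags_event_category"] = row["subschema"]
        elif value is not None and value == row["value"]:
            event["tags_event_category"] = row["subschema"]
    return event


rows = [{"category": "file_operations", "field": "file_type", "value": "pdf", "subschema": "pdf_uploads"}]
print(apply_subschema({"tags_event_category": "file_operations", "file_type": "pdf"}, rows))
```

If several rows for the same category apply, the last one in the table determines the final subschema in this sketch, since each hit overwrites the previous assignment.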
g. Dead Letter Queue (DLQ) Routing Transform
Enrichment Table: dlq_event_category
Purpose: Routes events to a Kafka DLQ if they fail certain checks, such as missing org_id or invalid event_category.
Configuration:
enrichment_tables:
  dlq_event_category:
    type: "file"
    file:
      path: "${VECTOR_ENRICHMENT_PATH:?err}/${DLQ_EVENT_CATEGORY_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      org_id: "string"
      event_category: "string"
transforms:
  159_transform_dlq_event_category_filter:
    type: remap
    inputs:
      - "${_159_TRANSFORM_DLQ_EVENT_CATEGORY_INPUT:?err}"
    source: |
      dlq_event_category_org_id = get_enrichment_table_record("dlq_event_category",
        {
          "org_id": get!(., ["tags_event_org_id"]),
        },
        case_sensitive: false) ?? {}
      dlq_event_category_event_category = get_enrichment_table_record("dlq_event_category",
        {
          "event_category": get!(., ["tags_event_category"]),
        },
        case_sensitive: false) ?? {}
      if is_empty(dlq_event_category_org_id) || is_empty(dlq_event_category_event_category) {
        csv_org_id = get(dlq_event_category_event_category, ["org_id"]) ?? null
        if csv_org_id != "*" {
          . = set!(., ["tags_event_is_dlq"], true)
        }
      }
  159_transform_dlq_event_category:
    inputs:
      - 159_transform_dlq_event_category_filter
    type: "route"
    route:
      route_kafka_sink:
        type: "vrl"
        source: exists(.tags_event_is_dlq) && exists(.tags_event_category)
      route_clickhouse_sink:
        type: "vrl"
        source: is_nullish(get(., ["tags_event_is_dlq"]) ?? null)
Explanation:
Logic:
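Looks up the event's org_id and event_category separately in the dlq_event_category enrichment table.
If either lookup returns no record, marks the event with tags_event_is_dlq = true, unless the row matched on event_category has org_id set to * (a wildcard that accepts any organization for that category).
Uses a routing transform to send DLQ events to a Kafka sink and valid events to ClickHouse.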
Outcome: Ensures only valid events are loaded into ClickHouse, while problematic events are captured for further analysis.
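The DLQ decision, including the * wildcard handled in the VRL above, can be sketched in Python as follows. Two assumptions are made here: a lookup that finds zero rows or more than one row is treated as "no record" (matching the documented single-row behaviour of get_enrichment_table_record combined with the ?? {} fallback), and a missing org_id or category is treated as no match. The names lookup_one and is_dlq and the sample rows are hypothetical.

```python
def lookup_one(rows, column, value):
    """Return the single row whose column equals value (case-insensitive), else {}."""
    if value is None:
        return {}
    matches = [r for r in rows if r[column].lower() == str(value).lower()]
    return matches[0] if len(matches) == 1 else {}


def is_dlq(event, rows):
    """Decide whether the event would be flagged with tags_event_is_dlq."""
    by_org = lookup_one(rows, "org_id", event.get("tags_event_org_id"))
    by_category = lookup_one(rows, "event_category", event.get("tags_event_category"))
    if not by_org or not by_category:
        # A category row with org_id "*" accepts events from any organization.
        return by_category.get("org_id") != "*"
    return False


rows = [
    {"org_id": "org_001", "event_category": "user_activity"},
    {"org_id": "*", "event_category": "heartbeat"},
]
print(is_dlq({"tags_event_org_id": "org_001", "tags_event_category": "user_activity"}, rows))
print(is_dlq({"tags_event_category": "security_alert"}, rows))
```

Under the single-row assumption, an org_id or category that appears in more than one CSV row would also count as "no record", which is worth keeping in mind when maintaining the enrichment file.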
3. Sink Configurations
a. ClickHouse Sink
Purpose: Loads the processed data into ClickHouse tables.
Configuration:
sinks:
  clickhouse_sink:
    type: clickhouse
    inputs:
      - "${_201_SINK_CLICKHOUSE_SAAS_INPUT}"
    endpoint: "https://<clickhouse-endpoint>"
    database: "<database-name>"
    table: "{{ tags_event_category }}"
    auth:
      strategy: basic
      username: "<username>"
      password: "<password>"
    compression: gzip
    batch:
      max_bytes: ${_201_SINK_CLICKHOUSE_SAAS_BATCH_MAX_BYTES}
      timeout_secs: ${_201_SINK_CLICKHOUSE_SAAS_BATCH_TIMEOUT_SECS}
Explanation:
Dynamic Table Routing: Uses tags_event_category to determine the ClickHouse table for each event.
Authentication: Uses basic authentication for ClickHouse.
Batching: Configures batch size and timeout for efficient data loading.
b. Kafka DLQ Sink
Purpose: Routes events that failed validation to a Kafka DLQ topic.
Configuration:
sinks:
  kafka_dlq_sink:
    type: kafka
    inputs:
      - "${_202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT}"
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "dlq_events"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    encoding:
      codec: json
    buffer:
      type: disk
      max_size: ${VECTOR_BUFFER_SIZE:-1073741824} # 1GB default
      when_full: block
Explanation:
Routing: Receives events marked as DLQ from the routing transform.
Reliable Delivery: Uses disk buffering to ensure events are not lost.
Secure Communication: Configured with TLS for Kafka brokers.
Environment Variables and Configurations
The ingestion load to ClickHouse step relies on several environment variables for dynamic configuration.
Configuration Map:
VECTOR_DATA_DIR: "/vector-data-dir"
VECTOR_ENRICHMENT_PATH: "/etc/vector/vector_templates"
VECTOR_MTLS_PATH: "/etc/vector_tls"
KAFKA_MTLS_PATH: "/etc/vector_mtls"
VECTOR_BUFFER_SIZE: "23000000000"
KAFKA_SOURCE_TOPIC_REGEX: "^logs_(beats_filebeat_dns_windows|beats_filebeat|beats_winlogbeat|hypercol_internal|hypercol_metric|netflow|nxlog_windows|syslog_linux_audit|syslog_linux)_load"
KAFKA_CONSUMER_GROUP: "vector-core-load-ch"
KAFKA_BROKERS: "broker1:9094,broker2:9094,broker3:9094"
JSON_FIELDS: "/standard_enrichment_files/hypersec-enrichment-json-fields.csv"
FILTER_CSV: "/standard_enrichment_files/filter-core-load-ch.csv"
SUPPORTED_TIMESTAMP_FORMAT: '["%FT%X%.3fZ", "%FT%X%.6fZ", "%FT%X%.9fZ", "%F %X%.3f", "%F %X%.6f", "%F %X%.9f", "%FT%X%.3f", "%FT%X%.6f", "%FT%X%.9f", "%FT%XZ", "%FT%X", "%F %X", "%FT%X"]'
FIELD_SETTER_ITERATION_GROUP: "finalise"
EVENT_SUBSCHEMA_CSV: "/standard_enrichment_files/core-load-ch-event-subschema-map.csv"
DLQ_EVENT_CATEGORY_CSV: "/standard_enrichment_files/hypersec-enrichment-dlq-event-category.csv"
FILTER_ITERATION_GROUP: "iterator"
# Transform Inputs
_101_TRANSFORM_FLATTEN_MESSAGE_INPUT: "003_source_kafka_aws_saas_regex"
_138_TRANSFORM_FILTER_INPUT: "101_transform_flatten_message"
_115_TRANSFORM_TIMESTAMP_LOAD_INPUT: "138_transform_filter"
_108_TRANSFORM_CH_JSON_REMAP: "115_transform_timestamp_load"
_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT: "108_transform_ch_json_remap"
_109_TRANSFORM_CH_CUSTOM_TRANSFORMATIONS_INPUT: "116_transform_fields_to_camel_case"
_118_TRANSFORM_CH_SUBSCHEMA_MAPPING_INPUT: "109_transform_ch_custom_transformations"
_159_TRANSFORM_DLQ_EVENT_CATEGORY_INPUT: "118_transform_ch_subschema_mapping"
_201_SINK_CLICKHOUSE_SAAS_INPUT: "159_transform_dlq_event_category.route_clickhouse_sink"
_202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT: "159_transform_dlq_event_category.route_kafka_sink"
# ClickHouse Sink Configurations
_201_SINK_CLICKHOUSE_SAAS_BATCH_MAX_BYTES: "10000000"
_201_SINK_CLICKHOUSE_SAAS_BATCH_TIMEOUT_SECS: "1"
Explanation:
Paths and Directories:
VECTOR_DATA_DIR: Directory for Vector's state and buffers.
VECTOR_ENRICHMENT_PATH: Path to enrichment tables.
VECTOR_MTLS_PATH and KAFKA_MTLS_PATH: Paths to TLS certificates.
Kafka Configuration:
KAFKA_BROKERS: Kafka broker addresses.
KAFKA_SOURCE_TOPIC_REGEX: Regex to match source topics.
KAFKA_CONSUMER_GROUP: Consumer group ID.
Enrichment Files:
JSON_FIELDS: CSV file for JSON field mappings.
FILTER_CSV: CSV file for filter criteria.
EVENT_SUBSCHEMA_CSV: CSV file for subschema mappings.
DLQ_EVENT_CATEGORY_CSV: CSV file for DLQ criteria.
Transform Inputs: Defines the input for each transform, allowing for dynamic chaining.
ClickHouse Sink Configurations:
Batch size and timeout settings for efficient data loading.
Data Flow and Transform Order
The data flows through the components in the following order:
Source: 003_source_kafka_aws_saas_regex
Flatten Message Transform: 101_transform_flatten_message
Filter Transform: 138_transform_filter
Timestamp Load Transform: 115_transform_timestamp_load
JSON Remap Transform: 108_transform_ch_json_remap
Fields to Snake Case Transform: 116_transform_fields_to_camel_case
Custom Transformations: 109_transform_ch_custom_transformations
Subschema Mapping Transform: 118_transform_ch_subschema_mapping
DLQ Routing Transform: 159_transform_dlq_event_category
Valid Events: Routed to 159_transform_dlq_event_category.route_clickhouse_sink
DLQ Events: Routed to 159_transform_dlq_event_category.route_kafka_sink
Sinks:
ClickHouse Sink: Reads from the input named by _201_SINK_CLICKHOUSE_SAAS_INPUT
Kafka DLQ Sink: Reads from the input named by _202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT
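The order listed here follows directly from the Transform Inputs variables in the configuration map, since each component names its upstream. As a sanity check, a small Python sketch can walk that mapping from the source; the dictionary below restates the values from the configuration map keyed by component name, and chain is a hypothetical helper.

```python
# component -> upstream component, restated from the Transform Inputs variables
INPUTS = {
    "101_transform_flatten_message": "003_source_kafka_aws_saas_regex",
    "138_transform_filter": "101_transform_flatten_message",
    "115_transform_timestamp_load": "138_transform_filter",
    "108_transform_ch_json_remap": "115_transform_timestamp_load",
    "116_transform_fields_to_camel_case": "108_transform_ch_json_remap",
    "109_transform_ch_custom_transformations": "116_transform_fields_to_camel_case",
    "118_transform_ch_subschema_mapping": "109_transform_ch_custom_transformations",
    "159_transform_dlq_event_category_filter": "118_transform_ch_subschema_mapping",
    "159_transform_dlq_event_category": "159_transform_dlq_event_category_filter",
}


def chain(inputs, source):
    """Follow upstream -> downstream links from the source; assumes a linear pipeline."""
    downstream = {upstream: component for component, upstream in inputs.items()}
    order = [source]
    while order[-1] in downstream:
        order.append(downstream[order[-1]])
    return order


for step in chain(INPUTS, "003_source_kafka_aws_saas_regex"):
    print(step)
```

A walk that stops early would indicate a mistyped input variable, which is a quick way to catch a broken link before deploying a changed configuration map.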
Examples
Example 1: Event Passes Filter and Loads into ClickHouse
Scenario:
An event meets all criteria and is successfully loaded into ClickHouse.
Event Data:
{
  "message": "User login successful",
  "user_id": "12345",
  "timestamp": "2024-03-27T12:34:56Z",
  "tags": {
    "event": {
      "category": "user_activity",
      "org_id": "org_001"
    }
  }
}
Process:
Filter Transform: Event passes the filter criteria.
Timestamp Load Transform: Adds timestamp_load field.
JSON Remap Transform: Converts any specified JSON fields.
Fields to Snake Case Transform: Converts field names to snake_case.
Custom Transformations: Converts timestamps to epoch milliseconds.
Subschema Mapping Transform: Determines the subschema based on tags_event_category.
DLQ Routing Transform: Event passes checks and is routed to ClickHouse sink.
ClickHouse Sink: Event is loaded into the appropriate ClickHouse table.
Outcome:
Event is successfully stored in ClickHouse under the user_activity table.
Example 2: Event Fails Filter and Is Discarded
Scenario:
An event does not meet the filter criteria and is discarded.
Event Data:
{
  "message": "System error occurred",
  "error_code": "500",
  "timestamp": "2024-03-27T12:35:00Z",
  "tags": {
    "event": {
      "category": "system_error",
      "org_id": "org_002"
    }
  }
}
Process:
Filter Transform: Event matches a never_match condition.
Action: Event is discarded (abort).
Outcome:
Event is not processed further and is not stored in ClickHouse or routed to DLQ.
Example 3: Event Routed to DLQ Due to Missing org_id
Scenario:
An event is missing a valid org_id and is routed to the Kafka DLQ.
Event Data:
{
  "message": "Unauthorized access attempt",
  "user_id": "unknown",
  "timestamp": "2024-03-27T12:36:00Z",
  "tags": {
    "event": {
      "category": "security_alert"
      // Missing "org_id"
    }
  }
}
Process:
Filter Transform: Event passes the filter criteria.
DLQ Routing Transform:
Checks org_id in dlq_event_category enrichment table.
Since org_id is missing, event is marked with tags_event_is_dlq = true.
Event is routed to the Kafka DLQ sink.
Outcome:
Event is not loaded into ClickHouse but is sent to the DLQ for further investigation.
Example 4: Subschema Mapping Alters Event Category
Scenario:
An event's tags_event_category is modified based on subschema mapping conditions.
Event Data:
{
  "message": "File uploaded",
  "user_id": "12345",
  "file_type": "pdf",
  "timestamp": "2024-03-27T12:37:00Z",
  "tags": {
    "event": {
      "category": "file_operations",
      "org_id": "org_003"
    }
  }
}
Subschema Mapping Condition:
If tags.event.category is file_operations and file_type is pdf, set tags_event_category to pdf_uploads.
Process:
Subschema Mapping Transform:
Condition is met.
tags_event_category is updated to pdf_uploads.
ClickHouse Sink:
Event is routed to the pdf_uploads table in ClickHouse.
Outcome:
Event is stored in the pdf_uploads table, facilitating specialized handling or analysis for PDF uploads.
Conclusion
The ingestion load to ClickHouse step is a vital component that ensures data integrity, compliance with schema requirements, and efficient data storage in ClickHouse. By applying meticulous transformations and validations, we guarantee that only high-quality, properly formatted data is loaded into our analytical databases.
Key Takeaways:
Data Quality Assurance: Filters and validations prevent bad data from polluting ClickHouse tables.
Schema Compliance: Transformations align data with ClickHouse's requirements, avoiding loading errors.
Dynamic Routing: Subschema mapping and DLQ routing enable flexible data handling based on event content.
Operational Efficiency: Batching and proper configuration optimize data loading performance.
Flexibility and Configurability: Environment variables and enrichment tables allow for easy adjustments without code changes.
For further details or assistance, please refer to the configuration files or contact the technical team.