ClickHouse Loading

Introduction

The ingestion load to ClickHouse step is a crucial part of our data pipeline, responsible for loading processed data into ClickHouse—a high-performance, open-source columnar database. This step ensures that data is correctly routed to the appropriate tables based on subschemas, handles customer organization IDs (org_id), and applies necessary transformations for efficient storage and retrieval. By meticulously processing and organizing data at this stage, we facilitate fast query performance and accurate analytics downstream.

Key Concepts

Loading Data into ClickHouse

  • Efficient Data Storage: ClickHouse is optimized for handling large volumes of data. Properly structuring data before loading maximizes performance.

  • Subschema Routing: Data is categorized into subschemas based on specific criteria, ensuring it is stored in the correct tables.

  • Organization Identification: Events are associated with org_id to segregate data by customer, maintaining data integrity and security.

  • Transformations: Necessary data transformations are applied to conform to ClickHouse's data types and schema requirements.

Components

The ingestion load to ClickHouse step comprises several components, each serving a specific function in the data processing pipeline:

  1. Source: Reads data from Kafka topics using a topic regex to match multiple topics.

  2. Transforms: A series of transformations applied to the data:

    • Filter Transform: Filters events based on specified criteria.

    • Timestamp Load Transform: Adds a timestamp indicating when the data was loaded.

    • JSON Remap Transform: Converts specified JSON fields into strings.

    • Fields to Snake Case Transform: Converts field names to snake_case.

    • Custom Transformations: Applies additional modifications to align data with ClickHouse requirements.

    • Subschema Mapping Transform: Assigns events to subschemas based on conditions.

    • Dead Letter Queue (DLQ) Routing Transform: Routes events to DLQ if they fail certain checks.

  3. Sinks: Writes the processed data to ClickHouse or routes it to Kafka DLQ:

    • ClickHouse Sink: Loads data into ClickHouse tables.

    • Kafka DLQ Sink: Routes failed events to a Kafka topic for further inspection.

Detailed Explanation

Architecture Overview

The ingestion load to ClickHouse step uses Vector—a high-performance observability data pipeline—to process data through a series of configurations defined in YAML files and environment variables. The configurations are designed to process data sequentially, ensuring data integrity and compliance with ClickHouse's schema.

Components Include:

  • Transforms:

    • 138_transform_filter

    • 115_transform_timestamp_load

    • 108_transform_ch_json_remap

    • 116_transform_fields_to_snake_case

    • 109_transform_ch_custom_transformations

    • 118_transform_ch_subschema_mapping

    • 159_transform_dlq_event_category_filter

    • 159_transform_dlq_event_category

  • Enrichment Tables:

    • filter_whitelist

    • json_field_map

    • subschema_mapping_csv

    • dlq_event_category

  • Environment Variables: Define paths, Kafka configurations, and inputs for each component.

1. Source Configuration

Purpose: Reads data from Kafka topics using a regular expression to match multiple topics.

Key Points:

  • Type: Kafka source.

  • Topics: Configured via environment variable KAFKA_SOURCE_TOPIC_REGEX, which allows matching multiple topics.

  • TLS Configuration: Ensures secure communication with Kafka brokers.

  • Consumer Group: Manages offsets and load balancing.

Example Configuration:

sources:
  kafka_source:
    type: kafka
    bootstrap_servers: "${KAFKA_BROKERS}"
    topics_regex: "${KAFKA_SOURCE_TOPIC_REGEX}"
    group_id: "${KAFKA_CONSUMER_GROUP}"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    encoding:
      codec: json

2. Transformation Steps

The data undergoes several transformations to prepare it for loading into ClickHouse.

a. Filter Transform

File: Not specified (Configuration provided in the components)

Purpose: Filters events based on specified criteria from an enrichment table.

Enrichment Table: filter_whitelist

Configuration:

enrichment_tables:
  filter_whitelist:
    type: "file"
    file:
      path:  "${VECTOR_ENRICHMENT_PATH:?err}/${FILTER_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      iterator: "string"
      match_column: "string"
      match_value: "string"
      action: "string"

transforms:
  138_transform_filter:
    type: remap
    inputs:
      - ${_138_TRANSFORM_FILTER_INPUT:?err}
    source: |
      remove_flag = false
      always_override_flag = false
      fields = find_enrichment_table_records("filter_whitelist", {"iterator": "${FILTER_ITERATION_GROUP:?err}"}, case_sensitive: false) ?? []

      for_each(fields) -> |_index, data| {
        is_matched = false
        field_path = split(to_string!(data.match_column), ".")
        if match_array(field_path, r'-+') {
          field_path = map_values(field_path) -> |value| {
            replace(value, "-", ".")
          }
        }
        value = get(., field_path) ?? null

        if value == data.match_value {
          is_matched = true
        } else {
          if includes(values(.), data.match_value) {
            is_matched = true
          }
        }

        if (is_matched) {
          if (data.action == "never_match") {
            remove_flag = true
          } else if (data.action == "must_match") {
            remove_flag = false
          } else if (data.action == "always_override") {
            always_override_flag = true
            remove_flag = false
          }
        } else {
          if (data.action == "must_match") {
            remove_flag = true
          }
        }
      }

      if remove_flag && !always_override_flag {
        abort
      }

Explanation:

  • Logic:

    • Iterates over the filter_whitelist enrichment table.

    • Checks if event fields match specified conditions.

    • Applies actions based on the action field:

      • never_match: Discards events that match the condition.

      • must_match: Keeps events that match; discards those that don't.

      • always_override: Keeps events regardless of other conditions.

  • Outcome: Events not meeting the criteria are discarded; others proceed to the next step.

b. Timestamp Load Transform

Purpose: Adds a timestamp_load field indicating when the event was processed.

Configuration:

transforms:
  115_transform_timestamp_load:
    type: remap
    inputs:
      - ${_115_TRANSFORM_TIMESTAMP_LOAD_INPUT:?err}
    source: |
      . = set!(value:.,  path: ["timestamp_load"], data: now())

Explanation:

  • Adds a timestamp_load field with the current UTC time.

  • Helps in tracking and debugging the data flow.

c. JSON Remap Transform

Enrichment Table: json_field_map

Purpose: Converts specified JSON fields into strings for compatibility with ClickHouse's data types.

Configuration:

enrichment_tables:
  json_field_map:
    type: "file"
    file:
      path:  "${VECTOR_ENRICHMENT_PATH:?err}/${JSON_FIELDS:?err}"
      encoding:
        type: "csv"
    schema:
      iterator: "string"
      field_to_map_as_json_str: "string"

transforms:
  108_transform_ch_json_remap:
    type: remap
    inputs:
      - ${_108_TRANSFORM_CH_JSON_REMAP}
    source: |
      .tags_event_org_id = .tags.event.org_id
      .tags_event_category = .tags.event.category
      columns = find_enrichment_table_records("json_field_map", {"iterator": "finalise"}, case_sensitive: false) ?? []
      for_each(columns) -> |_index, column| {
        field_path = split!(column.field_to_map_as_json_str, ".")
        value = get(., field_path) ?? null

        if !is_nullish(value) {
          renamed_field = string!(column.field_to_map_as_json_str) + "_str"
          . = set!(value: ., path: [renamed_field], data: to_string(encode_json(value)))
          . = remove!(value: ., path: field_path)
        }
      }

Explanation:

  • Logic:

    • Reads the fields to convert from the json_field_map enrichment table.

    • For each field:

      • Converts the JSON field into a JSON string.

      • Stores it in a new field with a _str suffix.

      • Removes the original JSON field.

  • Outcome: Ensures that complex JSON structures are stored as strings in ClickHouse, avoiding schema conflicts.

d. Fields to Snake Case Transform

Purpose: Converts field names from camelCase or other formats to snake_case to comply with ClickHouse's naming conventions.

Configuration:

transforms:
  116_transform_fields_to_camel_case:
    type: remap
    inputs:
      - ${_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT:?err}
    source: |
      . = map_keys(., recursive: true) -> |key| {
        key = replace(key, r'\B(?P<char>[A-Z]+)', "_$$char")
        key = replace(key, "__", "_")
        downcase(key)
      }

Explanation:

  • Logic:

    • Recursively iterates over all keys in the event.

    • Inserts an underscore before uppercase letters (e.g., CamelCase to Camel_Case).

    • Replaces double underscores with single underscores.

    • Converts all keys to lowercase.

  • Outcome: Field names are standardized to snake_case, ensuring consistency and compliance with ClickHouse schema.

e. Custom Transformations

Purpose: Applies additional modifications to align events with ClickHouse's data types and schema.

Configuration:

transforms:
  109_transform_ch_custom_transformations:
    type: remap
    inputs:
      - ${_109_TRANSFORM_CH_CUSTOM_TRANSFORMATIONS_INPUT:?err}
    source: |
      . = map_values(., recursive: true) -> |value| {
        if is_timestamp(value) {
          value = to_unix_timestamp!(value, unit: "milliseconds")
        } else if match(to_string(value) ?? "", r'\d{4}-\d{2}-\d{2}+') {
          for_each($SUPPORTED_TIMESTAMP_FORMAT) -> |_index, format| {
            value = to_unix_timestamp(
              parse_timestamp(value, format) ??
              parse_timestamp(value, format + "%:z") ??
              parse_timestamp(value, format + "%::z") ??
              null, unit: "milliseconds") ?? value
          }
        } else if contains(to_string(value) ?? "", "$") {
          value = replace(to_string!(value), "$", "$$")
        }
        value
      }

      if (exists(.timestamp) && exists(.@timestamp)) { del(.timestamp) }

      . = map_keys(flatten(.), recursive: true) -> |key| {
        key = replace(replace(key, "@", ""), ".", "_")
        replace(key, "-", "_")
      }

Explanation:

  • Value Transformations:

    • Converts timestamp fields to Unix epoch milliseconds.

    • Parses string representations of dates into Unix epoch milliseconds.

    • Escapes dollar signs ($) to prevent ClickHouse parsing issues.

  • Key Transformations:

    • Removes the timestamp field if both timestamp and @timestamp exist.

    • Flattens nested structures.

    • Removes @ symbols, replaces dots (.) and dashes (-) with underscores (_).

  • Outcome: Ensures data types and field names are compatible with ClickHouse, preventing loading errors and improving query performance.

f. Subschema Mapping Transform

Enrichment Table: subschema_mapping_csv

Purpose: Assigns events to subschemas (i.e., specific ClickHouse tables) based on conditions defined in an enrichment table.

Configuration:

enrichment_tables:
  subschema_mapping_csv:
    type: "file"
    file:
      path:  "${VECTOR_ENRICHMENT_PATH:?err}/${EVENT_SUBSCHEMA_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      category: "string"
      field: "string"
      value: "string"
      subschema: "string"

transforms:
  118_transform_ch_subschema_mapping:
    type: remap
    inputs:
      - ${_118_TRANSFORM_CH_SUBSCHEMA_MAPPING_INPUT:?err}
    source: |
      mapping_result = find_enrichment_table_records("subschema_mapping_csv", {"category": .tags_event_category}, case_sensitive: false) ?? []
      for_each(mapping_result) -> |_index, data| {
        value = get(., split!(data.field, ".")) ?? null
        if is_nullish(data.value) {
          if !is_null(value) {
            . = set!(., ["tags_event_category"], data.subschema)
          }
        } else {
          if !is_null(value) && (value == data.value) {
            . = set!(., ["tags_event_category"], data.subschema)
          }
        }
      }

Explanation:

  • Logic:

    • Loads mapping conditions from subschema_mapping_csv.

    • For events matching the category, checks if specified field and value conditions are met.

    • Updates tags_event_category to the subschema value if conditions are satisfied.

  • Outcome: Events are correctly categorized into subschemas, ensuring they are routed to the appropriate ClickHouse tables.

g. Dead Letter Queue (DLQ) Routing Transform

Enrichment Table: dlq_event_category

Purpose: Routes events to a Kafka DLQ if they fail certain checks, such as missing org_id or invalid event_category.

Configuration:

enrichment_tables:
  dlq_event_category:
    type: "file"
    file:
      path:  "${VECTOR_ENRICHMENT_PATH:?err}/${DLQ_EVENT_CATEGORY_CSV:?err}"
      encoding:
        type: "csv"
    schema:
      org_id: "string"
      event_category: "string"

transforms:
  159_transform_dlq_event_category_filter:
    type: remap
    inputs:
      - "${_159_TRANSFORM_DLQ_EVENT_CATEGORY_INPUT:?err}"
    source: |
      dlq_event_category_org_id = get_enrichment_table_record("dlq_event_category",
        {
          "org_id": get!(., ["tags_event_org_id"]),
        },
        case_sensitive: false) ?? {}

      dlq_event_category_event_category = get_enrichment_table_record("dlq_event_category",
        {
          "event_category": get!(., ["tags_event_category"]),
        },
        case_sensitive: false) ?? {}

      if is_empty(dlq_event_category_org_id) || is_empty(dlq_event_category_event_category) {
        csv_org_id = get(dlq_event_category_event_category, ["org_id"]) ?? null
        if csv_org_id != "*" {
          . = set!(., ["tags_event_is_dlq"], true)
        }
      }

  159_transform_dlq_event_category:
    inputs:
      - 159_transform_dlq_event_category_filter
    type: "route"
    route:
      route_kafka_sink:
        type: "vrl"
        source: exists(.tags_event_is_dlq) && exists(.tags_event_category)
      route_clickhouse_sink:
        type: "vrl"
        source: is_nullish(get(., ["tags_event_is_dlq"]) ?? null)

Explanation:

  • Logic:

    • Checks if the org_id and event_category of an event exist in the dlq_event_category enrichment table.

    • If not, marks the event with tags_event_is_dlq = true.

    • Uses a routing transform to send DLQ events to a Kafka sink and valid events to ClickHouse.

  • Outcome: Ensures only valid events are loaded into ClickHouse, while problematic events are captured for further analysis.

3. Sink Configurations

a. ClickHouse Sink

Purpose: Loads the processed data into ClickHouse tables.

Configuration:

sinks:
  clickhouse_sink:
    type: clickhouse
    inputs:
      - "${_201_SINK_CLICKHOUSE_SAAS_INPUT}"
    endpoint: "https://<clickhouse-endpoint>"
    database: "<database-name>"
    table: "{{ tags_event_category }}"
    auth:
      strategy: basic
      username: "<username>"
      password: "<password>"
    compression: gzip
    batch:
      max_bytes: ${_201_SINK_CLICKHOUSE_SAAS_BATCH_MAX_BYTES}
      timeout_secs: ${_201_SINK_CLICKHOUSE_SAAS_BATCH_TIMEOUT_SECS}

Explanation:

  • Dynamic Table Routing: Uses tags_event_category to determine the ClickHouse table for each event.

  • Authentication: Uses basic authentication for ClickHouse.

  • Batching: Configures batch size and timeout for efficient data loading.

b. Kafka DLQ Sink

Purpose: Routes events that failed validation to a Kafka DLQ topic.

Configuration:

sinks:
  kafka_dlq_sink:
    type: kafka
    inputs:
      - "${_202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT}"
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "dlq_events"
    tls:
      enabled: true
      ca_file: "${KAFKA_MTLS_PATH}/ca.crt"
      key_file: "${KAFKA_MTLS_PATH}/client.key"
      crt_file: "${KAFKA_MTLS_PATH}/client.crt"
    encoding:
      codec: json
    buffer:
      type: disk
      max_size: ${VECTOR_BUFFER_SIZE:-1073741824} # 1GB default
      when_full: block

Explanation:

  • Routing: Receives events marked as DLQ from the routing transform.

  • Reliable Delivery: Uses disk buffering to ensure events are not lost.

  • Secure Communication: Configured with TLS for Kafka brokers.

Environment Variables and Configurations

The ingestion load to ClickHouse step relies on several environment variables for dynamic configuration.

Configuration Map:

VECTOR_DATA_DIR: "/vector-data-dir"
VECTOR_ENRICHMENT_PATH: "/etc/vector/vector_templates"
VECTOR_MTLS_PATH: "/etc/vector_tls"
KAFKA_MTLS_PATH: "/etc/vector_mtls"
VECTOR_BUFFER_SIZE: "23000000000"
KAFKA_SOURCE_TOPIC_REGEX: "^logs_(beats_filebeat_dns_windows|beats_filebeat|beats_winlogbeat|hypercol_internal|hypercol_metric|netflow|nxlog_windows|syslog_linux_audit|syslog_linux)_load"
KAFKA_CONSUMER_GROUP: "vector-core-load-ch"
KAFKA_BROKERS: "broker1:9094,broker2:9094,broker3:9094"
JSON_FIELDS: "/standard_enrichment_files/hypersec-enrichment-json-fields.csv"
FILTER_CSV: "/standard_enrichment_files/filter-core-load-ch.csv"
SUPPORTED_TIMESTAMP_FORMAT: '["%FT%X%.3fZ", "%FT%X%.6fZ", "%FT%X%.9fZ", "%F %X%.3f", "%F %X%.6f", "%F %X%.9f", "%FT%X%.3f", "%FT%X%.6f", "%FT%X%.9f", "%FT%XZ", "%FT%X", "%F %X", "%FT%X"]'
FIELD_SETTER_ITERATION_GROUP: "finalise"
EVENT_SUBSCHEMA_CSV: "/standard_enrichment_files/core-load-ch-event-subschema-map.csv"
DLQ_EVENT_CATEGORY_CSV: "/standard_enrichment_files/hypersec-enrichment-dlq-event-category.csv"
FILTER_ITERATION_GROUP: "iterator"

# Transform Inputs
_101_TRANSFORM_FLATTEN_MESSAGE_INPUT: "003_source_kafka_aws_saas_regex"
_138_TRANSFORM_FILTER_INPUT: "101_transform_flatten_message"
_115_TRANSFORM_TIMESTAMP_LOAD_INPUT: "138_transform_filter"
_108_TRANSFORM_CH_JSON_REMAP: "115_transform_timestamp_load"
_116_TRANSFORM_FIELDS_TO_CAMEL_CASE_INPUT: "108_transform_ch_json_remap"
_109_TRANSFORM_CH_CUSTOM_TRANSFORMATIONS_INPUT: "116_transform_fields_to_camel_case"
_118_TRANSFORM_CH_SUBSCHEMA_MAPPING_INPUT: "109_transform_ch_custom_transformations"
_159_TRANSFORM_DLQ_EVENT_CATEGORY_INPUT: "118_transform_ch_subschema_mapping"
_201_SINK_CLICKHOUSE_SAAS_INPUT: "159_transform_dlq_event_category.route_clickhouse_sink"
_202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT: "159_transform_dlq_event_category.route_kafka_sink"

# ClickHouse Sink Configurations
_201_SINK_CLICKHOUSE_SAAS_BATCH_MAX_BYTES: "10000000"
_201_SINK_CLICKHOUSE_SAAS_BATCH_TIMEOUT_SECS: "1"

Explanation:

  • Paths and Directories:

    • VECTOR_DATA_DIR: Directory for Vector's state and buffers.

    • VECTOR_ENRICHMENT_PATH: Path to enrichment tables.

    • VECTOR_MTLS_PATH and KAFKA_MTLS_PATH: Paths to TLS certificates.

  • Kafka Configuration:

    • KAFKA_BROKERS: Kafka broker addresses.

    • KAFKA_SOURCE_TOPIC_REGEX: Regex to match source topics.

    • KAFKA_CONSUMER_GROUP: Consumer group ID.

  • Enrichment Files:

    • JSON_FIELDS: CSV file for JSON field mappings.

    • FILTER_CSV: CSV file for filter criteria.

    • EVENT_SUBSCHEMA_CSV: CSV file for subschema mappings.

    • DLQ_EVENT_CATEGORY_CSV: CSV file for DLQ criteria.

  • Transform Inputs: Defines the input for each transform, allowing for dynamic chaining.

  • ClickHouse Sink Configurations:

    • Batch size and timeout settings for efficient data loading.

Data Flow and Transform Order

The data flows through the components in the following order:

  1. Source: 003_source_kafka_aws_saas_regex

  2. Flatten Message Transform: 101_transform_flatten_message

  3. Filter Transform: 138_transform_filter

  4. Timestamp Load Transform: 115_transform_timestamp_load

  5. JSON Remap Transform: 108_transform_ch_json_remap

  6. Fields to Snake Case Transform: 116_transform_fields_to_camel_case

  7. Custom Transformations: 109_transform_ch_custom_transformations

  8. Subschema Mapping Transform: 118_transform_ch_subschema_mapping

  9. DLQ Routing Transform: 159_transform_dlq_event_category

    • Valid Events: Routed to 159_transform_dlq_event_category.route_clickhouse_sink

    • DLQ Events: Routed to 159_transform_dlq_event_category.route_kafka_sink

  10. Sinks:

    • ClickHouse Sink: _201_SINK_CLICKHOUSE_SAAS_INPUT

    • Kafka DLQ Sink: _202_SINK_DLQ_KAFKA_AWS_SAAS_INPUT

Examples

Example 1: Event Passes Filter and Loads into ClickHouse

Scenario:

An event meets all criteria and is successfully loaded into ClickHouse.

Event Data:

{
  "message": "User login successful",
  "user_id": "12345",
  "timestamp": "2024-03-27T12:34:56Z",
  "tags": {
    "event": {
      "category": "user_activity",
      "org_id": "org_001"
    }
  }
}

Process:

  • Filter Transform: Event passes the filter criteria.

  • Timestamp Load Transform: Adds timestamp_load field.

  • JSON Remap Transform: Converts any specified JSON fields.

  • Fields to Snake Case Transform: Converts field names to snake_case.

  • Custom Transformations: Converts timestamps to epoch milliseconds.

  • Subschema Mapping Transform: Determines the subschema based on tags_event_category.

  • DLQ Routing Transform: Event passes checks and is routed to ClickHouse sink.

  • ClickHouse Sink: Event is loaded into the appropriate ClickHouse table.

Outcome:

Event is successfully stored in ClickHouse under the user_activity table.

Example 2: Event Fails Filter and Is Discarded

Scenario:

An event does not meet the filter criteria and is discarded.

Event Data:

{
  "message": "System error occurred",
  "error_code": "500",
  "timestamp": "2024-03-27T12:35:00Z",
  "tags": {
    "event": {
      "category": "system_error",
      "org_id": "org_002"
    }
  }
}

Process:

  • Filter Transform: Event matches a never_match condition.

  • Action: Event is discarded (abort).

Outcome:

Event is not processed further and is not stored in ClickHouse or routed to DLQ.

Example 3: Event Routed to DLQ Due to Missing org_id

Scenario:

An event is missing a valid org_id and is routed to the Kafka DLQ.

Event Data:

{
  "message": "Unauthorized access attempt",
  "user_id": "unknown",
  "timestamp": "2024-03-27T12:36:00Z",
  "tags": {
    "event": {
      "category": "security_alert"
      // Missing "org_id"
    }
  }
}

Process:

  • Filter Transform: Event passes the filter criteria.

  • DLQ Routing Transform:

    • Checks org_id in dlq_event_category enrichment table.

    • Since org_id is missing, event is marked with tags_event_is_dlq = true.

    • Event is routed to the Kafka DLQ sink.

Outcome:

Event is not loaded into ClickHouse but is sent to the DLQ for further investigation.

Example 4: Subschema Mapping Alters Event Category

Scenario:

An event's tags_event_category is modified based on subschema mapping conditions.

Event Data:

{
  "message": "File uploaded",
  "user_id": "12345",
  "file_type": "pdf",
  "timestamp": "2024-03-27T12:37:00Z",
  "tags": {
    "event": {
      "category": "file_operations",
      "org_id": "org_003"
    }
  }
}

Subschema Mapping Condition:

  • If tags.event.category is file_operations and file_type is pdf, set tags_event_category to pdf_uploads.

Process:

  • Subschema Mapping Transform:

    • Condition is met.

    • tags_event_category is updated to pdf_uploads.

  • ClickHouse Sink:

    • Event is routed to the pdf_uploads table in ClickHouse.

Outcome:

Event is stored in the pdf_uploads table, facilitating specialized handling or analysis for PDF uploads.

Conclusion

The ingestion load to ClickHouse step is a vital component that ensures data integrity, compliance with schema requirements, and efficient data storage in ClickHouse. By applying meticulous transformations and validations, we guarantee that only high-quality, properly formatted data is loaded into our analytical databases.

Key Takeaways:

  • Data Quality Assurance: Filters and validations prevent bad data from polluting ClickHouse tables.

  • Schema Compliance: Transformations align data with ClickHouse's requirements, avoiding loading errors.

  • Dynamic Routing: Subschema mapping and DLQ routing enable flexible data handling based on event content.

  • Operational Efficiency: Batching and proper configuration optimize data loading performance.

  • Flexibility and Configurability: Environment variables and enrichment tables allow for easy adjustments without code changes.

For further details or assistance, please refer to the configuration files or contact the technical team.

Last updated