Filter Kafka records by classification headers

TL;DR
  1. Create a Schema Validation policy (produce phase) to parse JSON records.
  2. Nest a Modify Headers policy to classify records with a header based on content.
  3. Create a Skip Records policy (consume phase) to filter records based on the header and principal name.

Prerequisites

Install kafkactl. You’ll need it to interact with Kafka clusters.

Let’s define a context we can use to create Kafka topics:

cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9094
      - localhost:9095
      - localhost:9096
EOF

Start a Docker Compose cluster with multiple Kafka services.

First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:

cat <<EOF > docker-compose.yaml
name: kafka_cluster

networks:
  kafka:
    name: kafka_event_gateway

services:
  kafka1:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka2:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka3:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
EOF

Now, let’s start the local setup:

docker compose up -d
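
You can verify that all three brokers are running before continuing. For example:

# List the running Compose services
docker compose ps

# Optionally, confirm kafkactl can reach the cluster using the context defined earlier
kafkactl -C kafkactl.yaml --context direct get brokers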

If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.

  1. The following Konnect items are required to complete this tutorial:
    • Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
  2. Set the personal access token as an environment variable:

    export KONNECT_TOKEN='YOUR KONNECT TOKEN'
    

Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:

curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway

This sets up a Kong Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints the following environment variable export:

export EVENT_GATEWAY_ID=your-gateway-id

Copy and paste the command with your Event Gateway ID into your terminal to configure your session.

This quickstart script is meant for demo purposes only: it runs locally with mostly default parameters and a small number of exposed ports. If you want to run Kong Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.

Overview

In this guide, you’ll learn how to classify Kafka records at produce time and filter them at consume time based on user identity.

We’ll use a logging scenario where an app_logs topic contains log entries with different severity levels, aimed at two different groups of users:

  • The SRE team needs debug and trace logs, which are verbose and useful for troubleshooting issues.
  • Regular developers only need info, warn, and error logs.
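
Each log entry is a small JSON document with a level and a message, for example (these are the same records produced in the validation step later):

{"level": "debug", "message": "Loading configuration from /etc/app/config.yaml"}
{"level": "info", "message": "Application started"}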

The approach uses two policies:

  1. Produce phase: A Schema Validation policy parses JSON records, and a nested Modify Headers policy adds an x-internal: true header to debug and trace logs.
  2. Consume phase: A Skip Records policy filters out internal logs for users who aren’t on the SRE team.

Here’s how the data flows through the system:

 
flowchart LR
    P[Producer] --> SV

    subgraph produce [Event Gateway Produce policy chain]
        SV["Schema Validation<br>Parse JSON"] --> MH{"Modify Headers<br>level = debug/trace?"}
        MH -->|Yes| H1["Add<br>x-internal: true"]
        MH -->|No| H2[No header added]
    end

    subgraph consume [Event Gateway Consume policy chain]
        SR{"Skip Records<br>x-internal = true<br>AND<br>user ≠ sre_user?"}
        SR -->|Yes| DROP[Record skipped]
        SR -->|No| C[Send to consumer]
    end

    H1 --> K[Kafka Broker]
    H2 --> K
    K --> SR
    C --> CO[Consumer]

Performance tip: Classifying records at produce time is more efficient than at consume time. Parsing JSON once during production avoids repeated deserialization for each consumer group.

Create a Kafka topic

Create an app_logs topic in the Kafka cluster:

kafkactl -C kafkactl.yaml --context direct create topic app_logs
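
To confirm the topic exists, list the topics on the backend cluster:

kafkactl -C kafkactl.yaml --context direct get topics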

Create a backend cluster

Use the following command to create a backend cluster that connects to the Kafka servers you set up:

BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "backend_cluster",
       "bootstrap_servers": [
         "kafka1:9092",
         "kafka2:9092",
         "kafka3:9092"
       ],
       "authentication": {
         "type": "anonymous"
       },
       "tls": {
         "enabled": false
       }
     }' | jq -r ".id"
)
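
The command stores the new backend cluster's ID in the BACKEND_CLUSTER_ID variable, which the following steps rely on. Confirm it was captured before continuing:

echo "Backend cluster ID: $BACKEND_CLUSTER_ID"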

Create a virtual cluster

Create a virtual cluster with two users (principals):

  • sre_user: Can see all logs including debug and trace
  • dev_user: Only sees info, warn, and error logs

VIRTUAL_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "logs_vc",
       "destination": {
         "id": "'$BACKEND_CLUSTER_ID'"
       },
       "dns_label": "logs",
       "authentication": [
         {
           "type": "sasl_plain",
           "mediation": "terminate",
           "principals": [
             {
               "username": "sre_user",
               "password": "sre_password"
             },
             {
               "username": "dev_user",
               "password": "dev_password"
             }
           ]
         }
       ],
       "acl_mode": "passthrough"
     }' | jq -r ".id"
)

Create a listener with a forwarding policy

Create a listener to accept connections:

LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "logs_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19092-19095"
       ]
     }' | jq -r ".id"
)

Create a port mapping policy to forward traffic to the virtual cluster:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward_to_logs_vc",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$VIRTUAL_CLUSTER_ID'"
         }
       }
     }'

Create a Schema Validation policy

Create a Schema Validation policy that parses JSON records during the produce phase. This allows nested policies to access record content:

SCHEMA_VALIDATION_POLICY_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "schema_validation",
       "name": "parse_json_logs",
       "config": {
         "type": "json",
         "value_validation_action": "reject"
       }
     }' | jq -r ".id"
)

The value_validation_action: reject setting ensures data quality: if a producer sends a record that isn’t valid JSON, the entire batch containing that record is rejected and the producer receives an error.

Create a Modify Headers policy to classify logs

Create a Modify Headers policy nested under the Schema Validation policy. This policy adds an x-internal: true header when the log level is debug or trace:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "modify_headers",
       "name": "classify_internal_logs",
       "parent_policy_id": "'$SCHEMA_VALIDATION_POLICY_ID'",
       "condition": "record.value.content[\"level\"] == \"debug\" || record.value.content[\"level\"] == \"trace\"",
       "config": {
         "actions": [
           {
             "op": "set",
             "key": "x-internal",
             "value": "true"
           }
         ]
       }
     }'

Create a Skip Records policy to filter logs

Create a Skip Records policy that filters out internal logs for non-SRE users during the consume phase:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "skip_record",
       "name": "filter_internal_logs",
       "condition": "record.headers[\"x-internal\"] == \"true\" && context.auth.principal.name != \"sre_user\""
     }'
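
If you want to review the policies you've created so far, listing them with a GET request on the same collection paths should work, assuming the standard REST conventions of the Konnect API apply here:

# List the consume policies on the virtual cluster (assumes GET is supported on this collection)
curl -X GET "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
     --no-progress-meter --fail-with-body \
     -H "Authorization: Bearer $KONNECT_TOKEN" | jq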

Configure kafkactl

Create a kafkactl configuration with contexts for both users:

cat <<EOF > logs-cluster.yaml
contexts:
  sre:
    brokers:
      - localhost:19092
    sasl:
      enabled: true
      username: sre_user
      password: sre_password
  dev:
    brokers:
      - localhost:19092
    sasl:
      enabled: true
      username: dev_user
      password: dev_password
EOF
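
As a quick sanity check, you can list topics through the gateway. Assuming the local data plane exposes port 19092 as configured in the listener above, you should see the app_logs topic:

kafkactl -C logs-cluster.yaml --context sre get topics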

Validate

Now, let’s validate that you can produce and consume log records as the two different user profiles.

Produce log records

Produce log records with different severity levels:

echo '{"level": "info", "message": "Application started"}
{"level": "debug", "message": "Loading configuration from /etc/app/config.yaml"}
{"level": "error", "message": "Failed to connect to database"}
{"level": "trace", "message": "Entering function processRequest()"}
{"level": "warn", "message": "High memory usage detected"}' | kafkactl -C logs-cluster.yaml --context sre produce app_logs

We’ve produced 5 log records:

  • 2 internal logs (debug, trace) - will be classified with x-internal: true
  • 3 regular logs (info, error, warn) - no classification header

Consume as SRE user

Consume logs as the SRE user. You should see all 5 records:

kafkactl -C logs-cluster.yaml --context sre consume app_logs --from-beginning --exit --print-headers

The output includes all five logs, with the x-internal:true header on the debug and trace entries:

#{"level": "info", "message": "Application started"}
x-internal:true#{"level": "debug", "message": "Loading configuration from /etc/app/config.yaml"}
#{"level": "error", "message": "Failed to connect to database"}
x-internal:true#{"level": "trace", "message": "Entering function processRequest()"}
#{"level": "warn", "message": "High memory usage detected"}

Consume as developer user

Consume logs as the developer user. You should only see 3 records (debug and trace logs are filtered out):

kafkactl -C logs-cluster.yaml --context dev consume app_logs --from-beginning --exit --print-headers

The output excludes debug and trace logs:

#{"level": "info", "message": "Application started"}
#{"level": "error", "message": "Failed to connect to database"}
#{"level": "warn", "message": "High memory usage detected"}

The developer user only sees the logs relevant to their work, while the verbose debug and trace logs are automatically filtered out.
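
Optionally, you can also confirm the reject behavior of the Schema Validation policy described earlier: a record that isn't valid JSON should fail to produce rather than landing on the topic. A minimal negative test:

# The record below is not valid JSON, so the produce request should be rejected
echo 'not-json' | kafkactl -C logs-cluster.yaml --context dev produce app_logs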

Cleanup

When you’re done experimenting with this example, clean up the resources:

  1. If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.

  2. Stop and remove the containers:

    docker compose down
    

This will stop all services and remove the containers, but preserve your configuration files for future use.
