Confluent Consume

Enterprise only | Premium Partner
Made by: Kong Inc.
Supported Gateway Topologies: hybrid, db-less, traditional
Supported Konnect Deployments: hybrid, cloud-gateways, serverless
Compatible Protocols: grpc, grpcs, http, https, ws, wss
Minimum Version: Kong Gateway 3.10

This plugin consumes messages from Confluent Cloud Kafka topics and makes them available through HTTP endpoints. For more information, see the Confluent Cloud documentation.

Note: This plugin has the following known limitations:

  • Message compression is not supported.
  • The message format is not customizable.
  • Kong Gateway does not support Kafka 4.0.

Kong also provides a plugin for publishing messages to Confluent Cloud.

Implementation details

The plugin supports the following modes of operation:

  • http-get: Consume messages via HTTP GET requests (default)
  • server-sent-events: Stream messages using server-sent events
  • websocket v3.12+: Stream messages over a WebSocket connection
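Under stated assumptions, the default http-get mode can be exercised with a minimal Python poller. The route path and the response shape used below are illustrative assumptions, not the plugin's documented contract (the actual message format is fixed by the plugin and not customizable):

```python
# Hedged polling sketch for http-get mode. The route path is a
# hypothetical example, and the decoded payload shape (a JSON list of
# message objects) is an assumption for illustration.
import json
import urllib.request

def decode_batch(body: bytes) -> list:
    """Decode one HTTP GET response body into a list of message objects."""
    return json.loads(body.decode("utf-8"))

def fetch_batch(url: str = "http://localhost:8000/messages") -> list:
    """Poll the plugin-enabled route once and return the decoded messages."""
    with urllib.request.urlopen(url) as resp:
        return decode_batch(resp.read())
```

Each `fetch_batch` call is one HTTP round trip, which is exactly the polling overhead that the streaming modes below avoid.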

WebSocket mode v3.12+

In websocket mode, the plugin maintains a bi-directional WebSocket connection with the client, allowing for continuous delivery of Confluent Cloud messages to the client.

Here’s how it works:

  1. The client establishes a WebSocket connection to a Route where the Confluent Consume plugin is enabled and mode is set to websocket.
  2. Kong Gateway continuously streams messages to the client as JSON text frames.
  3. Optionally, the client sends acknowledgments (client-acks) for each message or batch to enable at-least-once delivery semantics.

This approach provides real-time message flow without the limitations of HTTP polling.

 
sequenceDiagram
    participant Client
    participant Kong as Kong Gateway
    participant Broker as Message Broker

    Client->>Kong: Establish WebSocket connection
    Kong->>Broker: Connect to broker

    loop Continuous message delivery
        Broker->>Kong: Broker message
        Kong->>Client: Stream JSON text frame

        opt client-acks
            Client->>Kong: Acknowledge message/batch
        end
    end

  

Figure 1: The diagram shows the bi-directional WebSocket flow where the Confluent Consume plugin is running in websocket mode, and messages are streamed as JSON text frames.
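The client side of the flow above can be sketched in Python. The client-ack frame format below is an illustrative assumption, not the plugin's documented contract; check the plugin's configuration reference for the actual ack shape:

```python
# Hedged WebSocket consumer sketch. `consume` is written against any
# connection object that async-iterates text frames and has an async
# send(), such as a connection from the third-party `websockets` package.
import asyncio
import json

def build_ack(message: dict) -> str:
    """Build a hypothetical client-ack frame for one consumed message."""
    return json.dumps({"ack": {
        "topic": message.get("topic"),
        "partition": message.get("partition"),
        "offset": message.get("offset"),
    }})

async def consume(ws, handle) -> None:
    """Drive the consume loop over an open connection."""
    async for frame in ws:                    # each message arrives as a JSON text frame
        message = json.loads(frame)
        handle(message)
        await ws.send(build_ack(message))     # client-ack for at-least-once semantics

# Usage against a live gateway (third-party `websockets` package):
#   async def main():
#       async with websockets.connect("ws://localhost:8000/route") as ws:
#           await consume(ws, print)
#   asyncio.run(main())
```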

This mode provides parity with HTTP-based consumption, including support for:

  • Message keys
  • Topic filtering
  • Confluent Cloud authentication and TLS
  • Auto or manual offset commits

Message delivery guarantees

When running multiple data plane nodes, message consumption is not coordinated between nodes: each node tracks its own consumer state. In high-load scenarios, the same message may therefore be delivered multiple times across different data plane nodes.

To minimize duplicate message delivery in a multi-node setup, consider:

  • Using a single data plane node for consuming messages from specific topics
  • Implementing idempotency handling in your consuming application
  • Monitoring Consumer Group offsets across your data plane nodes
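Idempotency handling in the consuming application can be as simple as de-duplicating on the message's coordinates. As a sketch (the in-memory seen-set stands in for shared storage such as a database, which a multi-instance consumer would need):

```python
# Hedged idempotency sketch: de-duplicate on (topic, partition, offset)
# so a message redelivered via another data plane node is processed once.
from typing import Callable

class IdempotentConsumer:
    def __init__(self, handle: Callable[[dict], None]) -> None:
        self._handle = handle
        self._seen: set = set()   # production: shared, persistent storage

    def process(self, message: dict) -> bool:
        """Invoke the handler once per message; return False for duplicates."""
        key = (message.get("topic"), message.get("partition"), message.get("offset"))
        if key in self._seen:
            return False
        self._seen.add(key)
        self._handle(message)
        return True
```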

Schema registry support v3.11+

The Confluent Consume plugin supports integration with Confluent Schema Registry for AVRO and JSON schemas.

Schema registries provide a centralized repository for managing and validating schemas for data formats like AVRO and JSON. Integrating with a schema registry allows the plugin to validate and serialize/deserialize messages in a standardized format.

Using a schema registry with Kong Gateway provides several benefits:

  • Data validation: Ensures messages conform to a predefined schema before being processed.
  • Schema evolution: Manages schema changes and versioning.
  • Interoperability: Enables seamless communication between different services using standardized data formats.
  • Reduced overhead: Minimizes the need for custom validation logic in your applications.

To learn more about Kong’s supported schema registry, see:

How schema registry validation works

When a consumer plugin is configured with a schema registry, the following workflow occurs:

 
sequenceDiagram
autonumber
    participant Kafka
    participant Kong as Confluent Consume plugin
    participant Registry as Schema Registry
    participant Client
    
    activate Kafka
    activate Kong
    Kafka->>Kong: Send message
    deactivate Kafka
    Kong->>Kong: Extract schema ID
    activate Registry
    Kong->>Registry: Fetch schema from registry
    Registry-->>Kong: Return schema
    deactivate Registry
    Kong->>Kong: Deserialize using schema
    activate Client
    Kong->>Client: Return response to client
    deactivate Kong
    deactivate Client
  
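Step 2 of the workflow ("Extract schema ID") relies on the Confluent wire format, in which each serialized record starts with a magic byte (0x00) followed by a 4-byte big-endian schema ID. The plugin performs this extraction internally; the sketch below only illustrates the framing:

```python
# Sketch of Confluent wire-format framing:
#   byte 0      : magic byte, always 0x00
#   bytes 1..4  : schema ID, big-endian unsigned 32-bit integer
#   bytes 5..   : the serialized (e.g. AVRO) payload
import struct

def extract_schema_id(record: bytes) -> tuple:
    """Split a Confluent-framed record into (schema_id, payload)."""
    if len(record) < 5 or record[0] != 0:
        raise ValueError("not a Confluent wire-format record")
    (schema_id,) = struct.unpack(">I", record[1:5])
    return schema_id, record[5:]
```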

Configure schema registry

To configure Schema Registry with the Confluent Consume plugin, use the config.schema_registry parameter in your plugin configuration.
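As a hedged sketch, the plugin could be enabled on a route through the Admin API with a body carrying the config.schema_registry parameter. The nested field names under schema_registry below are illustrative assumptions; consult the plugin's configuration reference for the exact schema:

```python
# Builds (but does not send) an Admin API request enabling the plugin on
# a route. Only "config.schema_registry" is taken from the docs; the
# nesting inside it is a hypothetical placeholder.
import json
import urllib.request

def enable_plugin(admin_url: str, route_id: str, registry_url: str) -> urllib.request.Request:
    body = {
        "name": "confluent-consume",
        "config": {
            "schema_registry": {                      # config.schema_registry
                "confluent": {"url": registry_url},   # hypothetical nesting
            },
        },
    }
    return urllib.request.Request(
        url=f"{admin_url}/routes/{route_id}/plugins",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```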

For sample configuration values, see:

Filter and transform messages v3.12+

You can use the config.message_by_lua_functions parameter to specify custom Lua code that will filter or transform Kafka messages.
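As an illustrative sketch only (the exact function signature and return contract that config.message_by_lua_functions expects are assumptions here), a Lua function that both filters and transforms messages might look like:

```lua
-- Hedged sketch: argument shape and return contract are assumptions;
-- consult the plugin reference for the real signature.
return function(message)
  if message.key == nil then
    return nil                    -- assumed: returning nil drops the message
  end
  message.value = string.upper(message.value)  -- transform the payload
  return message                  -- assumed: returned message is delivered
end
```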

For examples, see the following:
