Filter messages with Lua custom filter code
Configure the Confluent Consume plugin to filter messages with Lua custom filter code. In this example, the Confluent Consume plugin filters by empty values.
Prerequisites
-
You have a Kafka cluster
-
You have a Kafka topic in the cluster
Environment variables
-
BOOTSTRAP_SERVER_HOST: The bootstrap server host. -
KAFKA_TOPIC: The name of the Kafka topic to consume from. -
API_KEY: The API key to use for authentication. -
API_SECRET: The API secret to use for authentication.
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: confluent-consume
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topics:
- name: ${{ env "DECK_KAFKA_TOPIC" }}
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
cluster_api_key: ${{ env "DECK_API_KEY" }}
cluster_api_secret: ${{ env "DECK_API_SECRET" }}
Make the following request:
curl -i -X POST http://localhost:8001/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
name: confluent-consume
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
labels:
global: 'true'
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topics:
- name: '$KAFKA_TOPIC'
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == 'table' and message or { value = message }
if m.value == nil or (type(m.value)=='string' and #m.value==0) then
return nil
end
return m
end
cluster_api_key: '$API_KEY'
cluster_api_secret: '$API_SECRET'
plugin: confluent-consume
" | kubectl apply -f -
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "server-sent-events"
message_by_lua_functions = [<<EOF
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
EOF]
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: confluent-consume
service: serviceName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topics:
- name: ${{ env "DECK_KAFKA_TOPIC" }}
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
cluster_api_key: ${{ env "DECK_API_KEY" }}
cluster_api_secret: ${{ env "DECK_API_SECRET" }}
Make sure to replace the following placeholders with your own values:
-
serviceName|Id: Theidornameof the service the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/services/{serviceName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
serviceName|Id: Theidornameof the service the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/services/{serviceId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
serviceId: Theidof the service the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: confluent-consume
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topics:
- name: '$KAFKA_TOPIC'
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == 'table' and message or { value = message }
if m.value == nil or (type(m.value)=='string' and #m.value==0) then
return nil
end
return m
end
cluster_api_key: '$API_KEY'
cluster_api_secret: '$API_SECRET'
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the service resource:
kubectl annotate -n kong service SERVICE_NAME konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "server-sent-events"
message_by_lua_functions = [<<EOF
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
EOF]
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
service = {
id = konnect_gateway_service.my_service.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: confluent-consume
route: routeName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topics:
- name: ${{ env "DECK_KAFKA_TOPIC" }}
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
cluster_api_key: ${{ env "DECK_API_KEY" }}
cluster_api_secret: ${{ env "DECK_API_SECRET" }}
Make sure to replace the following placeholders with your own values:
-
routeName|Id: Theidornameof the route the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/routes/{routeName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
routeName|Id: Theidornameof the route the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/routes/{routeId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
routeId: Theidof the route the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: confluent-consume
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topics:
- name: '$KAFKA_TOPIC'
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == 'table' and message or { value = message }
if m.value == nil or (type(m.value)=='string' and #m.value==0) then
return nil
end
return m
end
cluster_api_key: '$API_KEY'
cluster_api_secret: '$API_SECRET'
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the httproute or ingress resource:
kubectl annotate -n kong httproute konghq.com/plugins=confluent-consume
kubectl annotate -n kong ingress konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "server-sent-events"
message_by_lua_functions = [<<EOF
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
EOF]
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
route = {
id = konnect_gateway_route.my_route.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: confluent-consume
consumer: consumerName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topics:
- name: ${{ env "DECK_KAFKA_TOPIC" }}
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
cluster_api_key: ${{ env "DECK_API_KEY" }}
cluster_api_secret: ${{ env "DECK_API_SECRET" }}
Make sure to replace the following placeholders with your own values:
-
consumerName|Id: Theidornameof the consumer the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/consumers/{consumerName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
consumerName|Id: Theidornameof the consumer the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/consumers/{consumerId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "server-sent-events",
"message_by_lua_functions": [
"return function(message)\n local m = type(message) == \"table\" and message or { value = message }\n if m.value == nil or (type(m.value)==\"string\" and #m.value==0) then\n return nil\n end\n return m\nend\n"
],
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'"
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
consumerId: Theidof the consumer the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: confluent-consume
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topics:
- name: '$KAFKA_TOPIC'
mode: server-sent-events
message_by_lua_functions:
- |
return function(message)
local m = type(message) == 'table' and message or { value = message }
if m.value == nil or (type(m.value)=='string' and #m.value==0) then
return nil
end
return m
end
cluster_api_key: '$API_KEY'
cluster_api_secret: '$API_SECRET'
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the KongConsumer resource:
kubectl annotate -n kong CONSUMER_NAME konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "server-sent-events"
message_by_lua_functions = [<<EOF
return function(message)
local m = type(message) == "table" and message or { value = message }
if m.value == nil or (type(m.value)=="string" and #m.value==0) then
return nil
end
return m
end
EOF]
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
consumer = {
id = konnect_gateway_consumer.my_consumer.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "api_secret" {
type = string
}