Publish request and response logs to an Apache Kafka topic. This plugin does not support message compression. For more information, see Kafka topics.
Kong also provides a Kafka plugin for request transformations. See Kafka Upstream.
Note: If the max_batch_size argument > 1, a request is logged as an array of JSON objects. Otherwise, every request is logged separately in a JSON object, separated by a newline \n.
A sample log object:
{ "response": { "size": 9982, "headers": { "access-control-allow-origin": "*", "content-length": "9593", "date": "Thu, 19 Sep 2024 22:10:39 GMT", "content-type": "text/html; charset=utf-8", "via": "1.1 kong/3.8.0.0-enterprise-edition", "connection": "close", "server": "gunicorn/19.9.0", "access-control-allow-credentials": "true", "x-kong-upstream-latency": "171", "x-kong-proxy-latency": "1", "x-kong-request-id": "2f6946328ffc4946b8c9120704a4a155" }, "status": 200 }, "route": { "updated_at": 1726782477, "tags": [], "response_buffering": true, "path_handling": "v0", "protocols": [ "http", "https" ], "service": { "id": "fb4eecf8-dec2-40ef-b779-16de7e2384c7" }, "https_redirect_status_code": 426, "regex_priority": 0, "name": "example_route", "id": "0f1a4101-3327-4274-b1e4-484a4ab0c030", "strip_path": true, "preserve_host": false, "created_at": 1726782477, "request_buffering": true, "ws_id": "f381e34e-5c25-4e65-b91b-3c0a86cfc393", "paths": [ "/example-route" ] }, "workspace": "f381e34e-5c25-4e65-b91b-3c0a86cfc393", "workspace_name": "default", "tries": [ { "balancer_start": 1726783839539, "balancer_start_ns": 1.7267838395395e+18, "ip": "34.237.204.224", "balancer_latency": 0, "port": 80, "balancer_latency_ns": 27904 } ], "client_ip": "192.168.65.1", "request": { "id": "2f6946328ffc4946b8c9120704a4a155", "headers": { "accept": "*/*", "user-agent": "HTTPie/3.2.3", "host": "localhost:8000", "connection": "keep-alive", "accept-encoding": "gzip, deflate" }, "uri": "/example-route", "size": 139, "method": "GET", "querystring": {}, "url": "http://localhost:8000/example-route" }, "upstream_uri": "/", "started_at": 1726783839538, "source": "upstream", "upstream_status": "200", "latencies": { "kong": 1, "proxy": 171, "request": 173, "receive": 1 }, "service": { "write_timeout": 60000, "read_timeout": 60000, "updated_at": 1726782459, "host": "httpbin.konghq.com", "name": "example_service", "id": "fb4eecf8-dec2-40ef-b779-16de7e2384c7", "port": 80, "enabled": true, "created_at": 1726782459, "protocol": "http", "ws_id": "f381e34e-5c25-4e65-b91b-3c0a86cfc393", "connect_timeout": 60000, "retries": 5 } }
This plugin uses the lua-resty-kafka client.
When encoding request bodies, several things happen:
- For requests with a content-type of application/x-www-form-urlencoded, multipart/form-data, or application/json, this plugin passes the raw request body in the body attribute, and tries to return a parsed version of those arguments in body_args. If this parsing fails, the plugin returns an error message and the message isn't sent.
- If the content-type is not text/plain, text/html, application/xml, text/xml, or application/soap+xml, then the body is base64-encoded to ensure that the message can be sent as JSON. In that case, the message has an extra attribute called body_base64 set to true.
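For example, a message logging a request with a binary content-type would carry the encoded body in the two attributes described above. This is a hypothetical fragment with illustrative values, not a complete log message:

  "body": "aGVsbG8sIGJpbmFyeSBkYXRh",
  "body_base64": true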
The custom_fields_by_lua configuration allows for the dynamic modification of
log fields using Lua code. Below is a snippet of an example configuration that
removes the route field from the logs:
curl -i -X POST http://localhost:8001/plugins \
  --data name=kafka-log \
  --data config.custom_fields_by_lua.route="return nil"
Similarly, new fields can be added:
curl -i -X POST http://localhost:8001/plugins \
  --data name=kafka-log \
  --data config.custom_fields_by_lua.header="return kong.request.get_header('h1')"
Dot characters (.) in the field key create nested fields. You can use a backslash \ to escape a dot if you want to keep it in the field name.
For example, if you configure a field with both a regular dot and an escaped dot:
curl -i -X POST http://localhost:8001/plugins/ \
  ...
  --data name=kafka-log \
  --data config.custom_fields_by_lua.[my_entry.log\.field]="return 'foo'"
The field will look like this in the log:
"my_entry": {
"log.field": "foo"
}
All logging plugins use the same table for logging.
If you set custom_fields_by_lua in one plugin, all logging plugins that execute after that plugin will also use the same configuration.
For example, if you configure fields via custom_fields_by_lua in Kafka Log, those same fields will appear in Syslog, since Kafka Log executes first.
If you want all logging plugins to use the same configuration, we recommend using the Pre-function plugin to call kong.log.set_serialize_value so that the function is applied predictably and is easier to manage.
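For example, a minimal sketch using the Pre-function plugin's access phase to set a field that every logging plugin will then serialize (the field name environment and its value staging are illustrative):

curl -i -X POST http://localhost:8001/plugins \
  --data name=pre-function \
  --data "config.access[1]=kong.log.set_serialize_value('environment', 'staging')"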
If you don’t want all logging plugins to use the same configuration, you need to manually disable the relevant fields in each plugin.
For example, if you configure a field in File Log that you don’t want appearing in Kafka Log, set that field to return nil in the Kafka Log plugin:
curl -i -X POST http://localhost:8001/plugins/ \
  ...
  --data name=kafka-log \
  --data config.custom_fields_by_lua.my_file_log_field="return nil"
See the plugin execution order reference for more details on plugin ordering.
Lua code runs in a restricted sandbox environment, whose behavior is governed
by the untrusted_lua configuration properties.
Sandboxing consists of several limitations in the way the Lua code can be executed, for heightened security.
The following functions are not available because they can be used to abuse the system:
- string.rep: Can be used to allocate millions of bytes in one operation.
- {set|get}metatable: Can be used to modify the metatables of global objects (strings, numbers).
- collectgarbage: Can be abused to kill the performance of other workers.
- _G: Is the root node which has access to all functions. It is masked by a temporary table.
- load{file|string}: Is deemed unsafe because it can grant access to the global environment.
- raw{get|set|equal}: Potentially unsafe because sandboxing relies on some metatable manipulation.
- string.dump: Can display confidential server information (such as implementation of functions).
- math.randomseed: Can affect the host system. Kong Gateway already seeds the random number generator properly.
- os.* (except os.clock, os.difftime, and os.time): os.execute can significantly alter the host system.
- io.*: Provides access to the hard drive.
- dofile and require: Provide access to the hard drive.

The exclusion of require means that plugins must only use PDK functions (kong.*). The ngx.* abstraction is also available, but it is not guaranteed to be present in future versions of the plugin.
In addition to the above restrictions:
- All provided modules (such as string or table) are read-only and can't be modified.
- kong.cache points to a cache instance that is dedicated to the Serverless Functions plugins. It does not provide access to the global Kong Gateway cache. It only exposes the get method; explicit write operations like set or invalidate are not available.

Further, because this code runs in the context of the log phase, only PDK methods that can run in that phase can be used.
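For example, a custom field that relies only on PDK calls permitted in the log phase, such as kong.response.get_status(), stays within these limits (the field name status is just an illustration):

curl -i -X POST http://localhost:8001/plugins \
  --data name=kafka-log \
  --data config.custom_fields_by_lua.status="return kong.response.get_status()"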
The Kafka Log plugin supports integration with Confluent Schema Registry for AVRO and JSON schemas.
Schema registries provide a centralized repository for managing and validating schemas for data formats like AVRO and JSON. Integrating with a schema registry allows the plugin to validate and serialize/deserialize messages in a standardized format.
Using a schema registry with Kong Gateway provides several benefits:
To learn more about Kong’s supported schema registry, see:
When a producer plugin is configured with a schema registry, the following workflow occurs:
sequenceDiagram
autonumber
participant Client
participant Kong as Kafka Log plugin
participant Registry as Schema Registry
participant Kafka
activate Client
activate Kong
Client->>Kong: Send request
deactivate Client
activate Registry
Kong->>Registry: Fetch schema from registry
Registry-->>Kong: Return schema
deactivate Registry
Kong->>Kong: Validate message against schema
Kong->>Kong: Serialize using schema
activate Kafka
Kong->>Kafka: Forward to Kafka
deactivate Kong
deactivate Kafka
If validation fails, the request is rejected with an error message.
To configure Schema Registry with the Kafka Log plugin, use the config.schema_registry parameter in your plugin configuration.
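A minimal sketch, assuming a Confluent Schema Registry reachable at http://schema-registry:8081; the nested field name below is an assumption for illustration, so check the plugin's parameter reference for the exact configuration schema:

curl -i -X POST http://localhost:8001/plugins \
  --data name=kafka-log \
  --data config.schema_registry.confluent.url=http://schema-registry:8081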
For sample configuration values, see: