From 727d7aa32fcefe6859413b59e14619ac076bab98 Mon Sep 17 00:00:00 2001 From: Vaibhav-C-S Date: Sat, 20 Jun 2026 07:58:01 +0530 Subject: [PATCH 1/5] fluentbit(kafka): add raw format and rawLogKey support Signed-off-by: Vaibhav-C-S --- .../v1alpha2/plugins/output/kafka_types.go | 7 ++++++- .../fluentbit.fluent.io_clusteroutputs.yaml | 7 ++++++- .../templates/fluentbit.fluent.io_outputs.yaml | 7 ++++++- .../crds/fluentbit.fluent.io_clusteroutputs.yaml | 7 ++++++- .../crds/fluentbit.fluent.io_outputs.yaml | 7 ++++++- .../bases/fluentbit.fluent.io_clusteroutputs.yaml | 7 ++++++- config/crd/bases/fluentbit.fluent.io_outputs.yaml | 7 ++++++- docs/plugins/fluentbit/output/kafka.md | 3 ++- manifests/setup/setup.yaml | 14 ++++++++++++-- 9 files changed, 56 insertions(+), 10 deletions(-) diff --git a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go index 61e0bd2bc..f37455965 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go @@ -12,8 +12,10 @@ import ( // Kafka output plugin allows to ingest your records into an Apache Kafka service.
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka** type Kafka struct { - // Specify data format, options available: json, msgpack. + // Specify data format, options available: json, msgpack, raw. Format string `json:"format,omitempty"` + // When using the raw format, the value of raw_log_key in the record is sent to Kafka as the payload. + RawLogKey string `json:"rawLogKey,omitempty"` // Optional key to store the message MessageKey string `json:"messageKey,omitempty"` // If set, the value of Message_Key_Field in the record will indicate the message key. @@ -60,6 +62,9 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) { if k.Format != "" { kvs.Insert("Format", k.Format) } + if k.RawLogKey != "" { + kvs.Insert("Raw_Log_Key", k.RawLogKey) + } if k.MessageKey != "" { kvs.Insert("Message_Key", k.MessageKey) } diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml index 214a9c96b..f43b1470e 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml @@ -2258,7 +2258,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2277,6 +2278,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml index 76088d713..a591a8b1f 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml @@ -2258,7 +2258,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2277,6 +2278,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml index 2abf993df..5e1089f8e 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -2256,7 +2256,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2275,6 +2276,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml index dcee8f101..c80fd7fc9 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml @@ -2256,7 +2256,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2275,6 +2276,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index 8841ee8f2..f30339007 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -2257,7 +2257,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2276,6 +2277,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index 34c4f25fa..a8b6552a9 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -2257,7 +2257,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -2276,6 +2277,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/docs/plugins/fluentbit/output/kafka.md b/docs/plugins/fluentbit/output/kafka.md index 65ac6917a..367dedfe8 100644 --- a/docs/plugins/fluentbit/output/kafka.md +++ b/docs/plugins/fluentbit/output/kafka.md @@ -5,7 +5,8 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service. | Field | Description | Scheme | | ----- | ----------- | ------ | -| format | Specify data format, options available: json, msgpack. | string | +| format | Specify data format, options available: json, msgpack, raw. | string | +| rawLogKey | When using the raw format, the value of raw_log_key in the record is sent to Kafka as the payload. | string | | messageKey | Optional key to store the message | string | | messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string | | timestampKey | Set the key to store the record timestamp | string | diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index fa044f4a2..60e0e0dbe 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -6457,7 +6457,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -6476,6 +6477,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string @@ -37123,7 +37128,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' type: string messageKey: description: Optional key to store the message @@ -37142,6 +37148,10 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: When using the raw format, the value of raw_log_key + in the record is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string From 2937c43652ff8ef9809b8f82f19df1b009c0fb5d Mon Sep 17 00:00:00 2001 From: Vaibhav-C-S Date: Sat, 20 Jun 2026 08:38:54 +0530 Subject: [PATCH 2/5] fluentbit(kafka): add raw format and rawLogKey support Signed-off-by: Vaibhav-C-S --- docs/plugins/fluentbit/output/kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/plugins/fluentbit/output/kafka.md b/docs/plugins/fluentbit/output/kafka.md index 367dedfe8..f905ca980 100644 --- a/docs/plugins/fluentbit/output/kafka.md +++ b/docs/plugins/fluentbit/output/kafka.md @@ -6,7 +6,7 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service. | Field | Description | Scheme | | ----- | ----------- | ------ | | format | Specify data format, options available: json, msgpack, raw. | string | -| rawLogKey | When using the raw format, the value of raw_log_key in the record is sent to Kafka as the payload. | string | +| rawLogKey | When using the `raw` format, the value of `raw_log_key` in the record is sent to Kafka as the payload. | string | | messageKey | Optional key to store the message | string | | messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string | | timestampKey | Set the key to store the record timestamp | string | From 279964f5be59d3baad30a6aac7cbe816c3a5a739 Mon Sep 17 00:00:00 2001 From: Vaibhav Cheruvu <123824888+Vaibhav-C-S@users.noreply.github.com> Date: Mon, 22 Jun 2026 23:15:41 +0530 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Vaibhav Cheruvu <123824888+Vaibhav-C-S@users.noreply.github.com> --- apis/fluentbit/v1alpha2/plugins/output/kafka_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go index f37455965..beb0d2089 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go @@ -14,7 +14,7 @@ import ( type Kafka struct { // Specify data format, options available: json, msgpack, raw. Format string `json:"format,omitempty"` - // When using the raw format, the value of raw_log_key in the record is sent to Kafka as the payload. + // When using the raw format, the value of the record field specified by RawLogKey is sent to Kafka as the payload. RawLogKey string `json:"rawLogKey,omitempty"` // Optional key to store the message MessageKey string `json:"messageKey,omitempty"` From 9ffafbac1ef886a9dad58308e5ea3b36b71fc722 Mon Sep 17 00:00:00 2001 From: Vaibhav-C-S Date: Mon, 22 Jun 2026 23:24:17 +0530 Subject: [PATCH 4/5] fluentbit(kafka): add raw format support Signed-off-by: Vaibhav-C-S --- .../templates/fluentbit.fluent.io_clusteroutputs.yaml | 4 ++-- .../templates/fluentbit.fluent.io_outputs.yaml | 4 ++-- .../crds/fluentbit.fluent.io_clusteroutputs.yaml | 4 ++-- .../fluent-operator/crds/fluentbit.fluent.io_outputs.yaml | 4 ++-- config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml | 4 ++-- config/crd/bases/fluentbit.fluent.io_outputs.yaml | 4 ++-- docs/plugins/fluentbit/output/kafka.md | 2 +- manifests/setup/setup.yaml | 8 ++++---- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml index f43b1470e..715268190 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml @@ -2279,8 +2279,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml index a591a8b1f..f80db4271 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml @@ -2279,8 +2279,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml index 5e1089f8e..5f69755fb 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -2277,8 +2277,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml index c80fd7fc9..154ef72ee 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml @@ -2277,8 +2277,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index f30339007..027b3ac05 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -2278,8 +2278,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index a8b6552a9..c2fe12bc4 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -2278,8 +2278,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: diff --git a/docs/plugins/fluentbit/output/kafka.md b/docs/plugins/fluentbit/output/kafka.md index f905ca980..1f25d3fab 100644 --- a/docs/plugins/fluentbit/output/kafka.md +++ b/docs/plugins/fluentbit/output/kafka.md @@ -6,7 +6,7 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service. | Field | Description | Scheme | | ----- | ----------- | ------ | | format | Specify data format, options available: json, msgpack, raw. | string | -| rawLogKey | When using the `raw` format, the value of `raw_log_key` in the record is sent to Kafka as the payload. | string | +| rawLogKey | When using the `raw` format, the value of the record field specified by `RawLogKey` is sent to Kafka as the payload. | string | | messageKey | Optional key to store the message | string | | messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string | | timestampKey | Set the key to store the record timestamp | string | diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index 60e0e0dbe..43bf9645d 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -6478,8 +6478,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: @@ -37149,8 +37149,8 @@ spec: format: int64 type: integer rawLogKey: - description: When using the raw format, the value of raw_log_key - in the record is sent to Kafka as the payload. + description: When using the raw format, the value of the record + field specified by RawLogKey is sent to Kafka as the payload. type: string rdkafka: additionalProperties: From f12844433b6817d71bc9610a8b5d71f94d7eb776 Mon Sep 17 00:00:00 2001 From: Vaibhav-C-S Date: Mon, 22 Jun 2026 23:32:51 +0530 Subject: [PATCH 5/5] fluentbit(kafka): add raw format support Signed-off-by: Vaibhav-C-S --- docs/plugins/fluentbit/output/kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/plugins/fluentbit/output/kafka.md b/docs/plugins/fluentbit/output/kafka.md index 1f25d3fab..c356ccaff 100644 --- a/docs/plugins/fluentbit/output/kafka.md +++ b/docs/plugins/fluentbit/output/kafka.md @@ -6,7 +6,7 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service. | Field | Description | Scheme | | ----- | ----------- | ------ | | format | Specify data format, options available: json, msgpack, raw. | string | -| rawLogKey | When using the `raw` format, the value of the record field specified by `RawLogKey` is sent to Kafka as the payload. | string | +| rawLogKey | When using the raw format, the value of the record field specified by RawLogKey is sent to Kafka as the payload. | string | | messageKey | Optional key to store the message | string | | messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string | | timestampKey | Set the key to store the record timestamp | string |