diff --git a/CMakeLists.txt b/CMakeLists.txt index 05fac0673d3..b8d92333926 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,6 +299,7 @@ if(FLB_ALL) set(FLB_OUT_NULL 1) set(FLB_OUT_PLOT 1) set(FLB_OUT_FILE 1) + set(FLB_OUT_LOGROTATE 1) set(FLB_OUT_RETRY 1) set(FLB_OUT_TD 1) set(FLB_OUT_STDOUT 1) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 703073ff3f8..8f5ec837012 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -117,6 +117,7 @@ DEFINE_OPTION(FLB_OUT_DATADOG "Enable DataDog output plugin" DEFINE_OPTION(FLB_OUT_ES "Enable Elasticsearch output plugin" ON) DEFINE_OPTION(FLB_OUT_EXIT "Enable Exit output plugin" ON) DEFINE_OPTION(FLB_OUT_FILE "Enable file output plugin" ON) +DEFINE_OPTION(FLB_OUT_LOGROTATE "Enable logrotate output plugin" ON) DEFINE_OPTION(FLB_OUT_FLOWCOUNTER "Enable flowcount output plugin" ON) DEFINE_OPTION(FLB_OUT_FORWARD "Enable Forward output plugin" ON) DEFINE_OPTION(FLB_OUT_GELF "Enable GELF output plugin" ON) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 2791fffb4d9..48a13aa5c71 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -100,6 +100,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_OUT_NATS No) set(FLB_OUT_PLOT No) set(FLB_OUT_FILE Yes) + set(FLB_OUT_LOGROTATE Yes) set(FLB_OUT_TD No) set(FLB_OUT_RETRY No) set(FLB_OUT_SPLUNK Yes) diff --git a/conf/fluent-bit-logrotate.conf b/conf/fluent-bit-logrotate.conf new file mode 100644 index 00000000000..e1b18482e63 --- /dev/null +++ b/conf/fluent-bit-logrotate.conf @@ -0,0 +1,21 @@ +[SERVICE] + Flush 1 + Log_Level info + Parsers_File parsers.conf + +[INPUT] + Name dummy + Tag test.logrotate + Dummy {"message": "test log message", "level": "info"} + Rate 10 + +[OUTPUT] + Name logrotate + Match test.logrotate + Path /tmp/logs + File test.log + Format json + Max_Size 10M + Max_Files 5 + Gzip On + Mkdir On diff --git a/include/fluent-bit/flb_aws_credentials.h b/include/fluent-bit/flb_aws_credentials.h index 36652ead45a..cacaa492ad4 100644 --- a/include/fluent-bit/flb_aws_credentials.h +++ b/include/fluent-bit/flb_aws_credentials.h @@ -34,6 +34,14 @@ /* 5 second timeout for credential related http requests */ #define FLB_AWS_CREDENTIAL_NET_TIMEOUT 5 +/* IoT Credentials Environment Variables */ +#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" +#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" +#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" +#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" +#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" +#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" + /* * A structure that wraps the sensitive data needed to sign an AWS request */ @@ -225,6 +233,11 @@ struct flb_aws_provider *flb_eks_provider_create(struct flb_config *config, flb_aws_client_generator *generator); +/* + * IoT Provider + */ +struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, + struct flb_aws_client_generator *generator); /* * STS Assume Role Provider. diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 65d417d7cbb..fd7d12af5a0 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -363,6 +363,7 @@ REGISTER_OUT_PLUGIN("out_datadog") REGISTER_OUT_PLUGIN("out_es") REGISTER_OUT_PLUGIN("out_exit") REGISTER_OUT_PLUGIN("out_file") +REGISTER_OUT_PLUGIN("out_logrotate") REGISTER_OUT_PLUGIN("out_forward") REGISTER_OUT_PLUGIN("out_http") REGISTER_OUT_PLUGIN("out_influxdb") diff --git a/plugins/out_logrotate/CMakeLists.txt b/plugins/out_logrotate/CMakeLists.txt new file mode 100644 index 00000000000..870d922320b --- /dev/null +++ b/plugins/out_logrotate/CMakeLists.txt @@ -0,0 +1,8 @@ +set(src + logrotate.c) + +if(MSVC) + FLB_PLUGIN(out_logrotate "${src}" "Shlwapi") +else() + FLB_PLUGIN(out_logrotate "${src}" "") +endif() diff --git a/plugins/out_logrotate/logrotate.c b/plugins/out_logrotate/logrotate.c new file mode 100644 index 00000000000..4d6dfa404d1 --- /dev/null +++ b/plugins/out_logrotate/logrotate.c @@ -0,0 +1,1613 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include /* dirname */ +#endif +#include /* PRIu64 */ +#include /* PATH_MAX */ +#include +#include +#include + +#ifdef FLB_SYSTEM_WINDOWS +#include +#include +#include +#endif + +#include "logrotate.h" + +#ifdef FLB_SYSTEM_WINDOWS +#define NEWLINE "\r\n" +#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) +#else +#define NEWLINE "\n" +#endif + +#ifdef FLB_SYSTEM_WINDOWS +#define FLB_PATH_SEPARATOR "\\" +#else +#define FLB_PATH_SEPARATOR "/" +#endif + +/* Constants for streaming gzip compression */ +#define GZIP_CHUNK_SIZE (64 * 1024) /* 64KB chunks for memory efficiency */ +#define GZIP_HEADER_SIZE 10 +#define GZIP_FOOTER_SIZE 8 + +struct logrotate_file_size { + flb_sds_t filename; + size_t size; + flb_lock_t lock; /* Mutex to protect file operations */ + struct mk_list _head; +}; + +struct flb_logrotate_conf { + const char *out_path; + const char *out_file; + const char *delimiter; + const char *label_delimiter; + const char *template; + int format; + int csv_column_names; + int mkdir; + size_t max_size; /* Max file size */ + int max_files; /* Maximum number of rotated files to keep */ + int gzip; /* Whether to gzip rotated files */ + struct mk_list file_sizes; /* Linked list to store file size per filename */ + struct flb_output_instance *ins; +}; + +static char *check_delimiter(const char *str) { + if (str == NULL) { + return NULL; + } + + if (!strcasecmp(str, "\\t") || !strcasecmp(str, "tab")) { + return "\t"; + } else if (!strcasecmp(str, "space")) { + return " "; + } else if (!strcasecmp(str, "comma")) { + return ","; + } + + return NULL; +} + +static int cb_logrotate_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) { + int ret; + const char *tmp; + char *ret_str; + (void)config; + (void)data; + struct flb_logrotate_conf *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_logrotate_conf)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; /* default */ + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + ctx->template = NULL; + + /* Initialize linked list to store file sizes per filename */ + mk_list_init(&ctx->file_sizes); + + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + if (ctx->max_files <= 0) { + flb_plg_error(ctx->ins, "invalid max_files=%d; must be >= 1", + ctx->max_files); + flb_free(ctx); + return -1; + } + + /* Optional, file format */ + tmp = flb_output_get_property("Format", ins); + if (tmp) { + if (!strcasecmp(tmp, "csv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_CSV; + ctx->delimiter = ","; + } else if (!strcasecmp(tmp, "ltsv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_LTSV; + ctx->delimiter = "\t"; + ctx->label_delimiter = ":"; + } else if (!strcasecmp(tmp, "plain")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_PLAIN; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } else if (!strcasecmp(tmp, "msgpack")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_MSGPACK; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } else if (!strcasecmp(tmp, "template")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_TEMPLATE; + } else if (!strcasecmp(tmp, "out_logrotate")) { + /* for explicit setting */ + ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; + } else { + flb_plg_error(ctx->ins, "unknown format %s. abort.", tmp); + flb_free(ctx); + return -1; + } + } + + tmp = flb_output_get_property("delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->delimiter = ret_str; + } + + tmp = flb_output_get_property("label_delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->label_delimiter = ret_str; + } + + /* Set the context */ + flb_output_set_context(ins, ctx); + + /* Log resolved configuration values */ + flb_plg_info(ctx->ins, + "logrotate plugin initialized with: max_size=%zu, max_files=%d, " + "gzip=%s, path=%s", + ctx->max_size, ctx->max_files, + ctx->gzip == FLB_TRUE ? "true" : "false", + ctx->out_path ? ctx->out_path : "not set"); + + if (ctx->max_files <= 0) { + flb_plg_error(ctx->ins, "invalid max_files=%d; must be >= 1", + ctx->max_files); + flb_free(ctx); + return -1; + } + + return 0; +} + +static int csv_output(FILE *fp, int column_names, struct flb_time *tm, + msgpack_object *obj, struct flb_logrotate_conf *ctx) { + int i; + int map_size; + msgpack_object_kv *kv = NULL; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = obj->via.map.size; + + if (column_names == FLB_TRUE) { + fprintf(fp, "timestamp%s", ctx->delimiter); + for (i = 0; i < map_size; i++) { + msgpack_object_print(fp, (kv + i)->key); + if (i + 1 < map_size) { + fprintf(fp, "%s", ctx->delimiter); + } + } + fprintf(fp, NEWLINE); + } + + fprintf(fp, "%lld.%.09ld%s", (long long)tm->tm.tv_sec, tm->tm.tv_nsec, + ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv + i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv + (map_size - 1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int ltsv_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) { + msgpack_object_kv *kv = NULL; + int i; + int map_size; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = obj->via.map.size; + fprintf(fp, "\"time\"%s%f%s", ctx->label_delimiter, flb_time_to_double(tm), + ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv + i)->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv + i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv + (map_size - 1))->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv + (map_size - 1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int template_output_write(struct flb_logrotate_conf *ctx, FILE *fp, + struct flb_time *tm, msgpack_object *obj, + const char *key, int size) { + int i; + msgpack_object_kv *kv; + + /* + * Right now we treat "{time}" specially and fill the placeholder + * with the metadata timestamp (formatted as float). + */ + if (!strncmp(key, "time", size)) { + fprintf(fp, "%f", flb_time_to_double(tm)); + return 0; + } + + if (obj->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "invalid object type (type=%i)", obj->type); + return -1; + } + + for (i = 0; i < obj->via.map.size; i++) { + kv = obj->via.map.ptr + i; + + if (size != kv->key.via.str.size) { + continue; + } + + if (!memcmp(key, kv->key.via.str.ptr, size)) { + if (kv->val.type == MSGPACK_OBJECT_STR) { + fwrite(kv->val.via.str.ptr, 1, kv->val.via.str.size, fp); + } else { + msgpack_object_print(fp, kv->val); + } + return 0; + } + } + return -1; +} + +/* + * Python-like string templating for out_logrotate. + * + * This accepts a format string like "my name is {name}" and fills + * placeholders using corresponding values in a record. + * + * e.g. {"name":"Tom"} => "my name is Tom" + */ +static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) { + int i; + int len = strlen(ctx->template); + int keysize; + const char *key; + const char *pos; + const char *inbrace = NULL; /* points to the last open brace */ + + for (i = 0; i < len; i++) { + pos = ctx->template + i; + if (*pos == '{') { + if (inbrace) { + /* + * This means that we find another open brace inside + * braces (e.g. "{a{b}"). Ignore the previous one. + */ + fwrite(inbrace, 1, pos - inbrace, fp); + } + inbrace = pos; + } else if (*pos == '}' && inbrace) { + key = inbrace + 1; + keysize = pos - inbrace - 1; + + if (template_output_write(ctx, fp, tm, obj, key, keysize)) { + fwrite(inbrace, 1, pos - inbrace + 1, fp); + } + inbrace = NULL; + } else { + if (!inbrace) { + fputc(*pos, fp); + } + } + } + + /* Handle an unclosed brace like "{abc" */ + if (inbrace) { + fputs(inbrace, fp); + } + fputs(NEWLINE, fp); + return 0; +} + +static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, + int escape_unicode) { + char *buf; + + buf = flb_msgpack_to_json_str(alloc_size, obj, escape_unicode); + if (buf) { + fprintf(fp, "%s" NEWLINE, buf); + flb_free(buf); + } + return 0; +} + +static void print_metrics_text(struct flb_output_instance *ins, FILE *fp, + const void *data, size_t bytes) { + int ret; + size_t off = 0; + cfl_sds_t text; + struct cmt *cmt = NULL; + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *)data, bytes, &off); + if (ret != 0) { + flb_plg_error(ins, "could not process metrics payload"); + return; + } + + /* convert to text representation */ + text = cmt_encode_text_create(cmt); + + /* destroy cmt context */ + cmt_destroy(cmt); + + fprintf(fp, "%s", text); + cmt_encode_text_destroy(text); +} + +static int mkpath(struct flb_output_instance *ins, const char *dir) { + struct stat st; + char *dup_dir = NULL; +#ifdef FLB_SYSTEM_MACOS + char *parent_dir = NULL; +#endif +#ifdef FLB_SYSTEM_WINDOWS + char parent_path[MAX_PATH]; + DWORD err; + char *p; + char *sep; +#endif + int ret; + + if (!dir) { + errno = EINVAL; + return -1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return -1; + } + + if (stat(dir, &st) == 0) { + if (S_ISDIR(st.st_mode)) { + return 0; + } + flb_plg_error(ins, "%s is not a directory", dir); + errno = ENOTDIR; + return -1; + } + +#ifdef FLB_SYSTEM_WINDOWS + if (strncpy_s(parent_path, MAX_PATH, dir, _TRUNCATE) != 0) { + flb_plg_error(ins, "path is too long: %s", dir); + errno = ENAMETOOLONG; + return -1; + } + + p = parent_path; + + /* Skip the drive letter if present (e.g., "C:") */ + if (p[1] == ':') { + p += 2; + } + + /* Normalize all forward slashes to backslashes */ + while (*p != '\0') { + if (*p == '/') { + *p = '\\'; + } + p++; + } + + flb_plg_debug(ins, "processing path '%s'", parent_path); + sep = strstr(parent_path, FLB_PATH_SEPARATOR); + if (sep != NULL && PathRemoveFileSpecA(parent_path)) { + flb_plg_debug(ins, "creating directory (recursive) %s", parent_path); + ret = mkpath(ins, parent_path); + if (ret != 0) { + /* If creating the parent failed, we cannot continue. */ + return -1; + } + } + + flb_plg_debug(ins, "attempting to create final directory '%s'", dir); + if (!CreateDirectoryA(dir, NULL)) { + err = GetLastError(); + + if (err != ERROR_ALREADY_EXISTS) { + flb_plg_error(ins, "could not create directory '%s' (error=%lu)", dir, + err); + return -1; + } + } + + return 0; +#elif FLB_SYSTEM_MACOS + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * charachter is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string. + */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR(st.st_mode)) { + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + flb_free(dup_dir); + return ret; + } + } + + ret = mkpath(ins, dirname(dup_dir)); + if (ret != 0) { + flb_free(dup_dir); + return ret; + } + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + if (ret == -1 && errno == EEXIST) { + if (stat(dup_dir, &st) == 0 && S_ISDIR(st.st_mode)) { + ret = 0; + } + } + flb_free(dup_dir); + return ret; +#else + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + ret = mkpath(ins, dirname(dup_dir)); + flb_free(dup_dir); + if (ret != 0) { + return ret; + } + flb_plg_debug(ins, "creating directory %s", dir); + ret = mkdir(dir, 0755); + if (ret == -1 && errno == EEXIST) { + if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode)) { + ret = 0; + } + } + return ret; +#endif +} + +/* Helper function to find a file size entry by filename */ +static struct logrotate_file_size * +find_file_size_entry(struct flb_logrotate_conf *ctx, const char *filename) { + struct mk_list *head; + struct logrotate_file_size *entry; + + mk_list_foreach(head, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + if (entry->filename && strcmp(entry->filename, filename) == 0) { + return entry; + } + } + return NULL; +} + +/* Helper function to update or create file size entry */ +static int update_file_size(struct flb_logrotate_conf *ctx, + const char *filename, size_t size) { + struct logrotate_file_size *entry; + flb_sds_t filename_copy; + + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + /* Update existing entry */ + entry->size = size; + return 0; + } + + /* Create new entry */ + entry = flb_calloc(1, sizeof(struct logrotate_file_size)); + if (!entry) { + flb_errno(); + return -1; + } + + filename_copy = flb_sds_create(filename); + if (!filename_copy) { + flb_free(entry); + flb_errno(); + return -1; + } + + entry->filename = filename_copy; + entry->size = size; + + /* Initialize mutex for this file entry */ + if (flb_lock_init(&entry->lock) != 0) { + flb_plg_error(ctx->ins, "failed to initialize mutex for file %s", filename); + flb_sds_destroy(filename_copy); + flb_free(entry); + return -1; + } + + mk_list_add(&entry->_head, &ctx->file_sizes); + + return 0; +} + +/* Helper function to remove file size entry */ +static void remove_file_size(struct flb_logrotate_conf *ctx, + const char *filename) { + struct logrotate_file_size *entry; + + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } +} + +/* Function to update file size counter for a specific file */ +static void update_file_size_counter(struct flb_logrotate_conf *ctx, + const char *filename, FILE *fp) { + struct stat st; + size_t file_size; + struct logrotate_file_size *entry; + int ret; + + if (fstat(fileno(fp), &st) != 0 || st.st_size < 0) { + return; + } + + file_size = (size_t)st.st_size; + + /* Find or create file size entry */ + entry = find_file_size_entry(ctx, filename); + if (entry == NULL) { + /* Entry doesn't exist, create it */ + ret = update_file_size(ctx, filename, file_size); + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to create file size entry for %s", + filename); + return; + } + /* Find the entry we just created */ + entry = find_file_size_entry(ctx, filename); + if (entry == NULL) { + flb_plg_warn(ctx->ins, + "failed to find file size entry for %s after creation", + filename); + return; + } + } + + /* Acquire lock before updating size */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_warn(ctx->ins, "failed to acquire lock for file %s", filename); + return; + } + + /* Update size atomically */ + entry->size = file_size; + + /* Release lock */ + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); +} + +/* Function to generate timestamp for rotated file */ +static void generate_timestamp(char *timestamp, size_t size) { + time_t now = time(NULL); + struct tm tm_info; +#ifdef FLB_SYSTEM_WINDOWS + localtime_s(&tm_info, &now); +#else + localtime_r(&now, &tm_info); +#endif + strftime(timestamp, size, "%Y%m%d_%H%M%S", &tm_info); +} + +/* Helper function to write gzip header (based on flb_gzip.c) */ +static void write_gzip_header(FILE *fp) { + uint8_t header[GZIP_HEADER_SIZE] = { + 0x1F, 0x8B, /* Magic bytes */ + 0x08, /* Compression method (deflate) */ + 0x00, /* Flags */ + 0x00, 0x00, 0x00, 0x00, /* Timestamp */ + 0x00, /* Compression flags */ + 0xFF /* OS (unknown) */ + }; + fwrite(header, 1, GZIP_HEADER_SIZE, fp); +} + +/* Helper function to write gzip footer */ +static void write_gzip_footer(FILE *fp, mz_ulong crc, size_t original_size) { + uint8_t footer[GZIP_FOOTER_SIZE]; + + /* Write CRC32 */ + footer[0] = crc & 0xFF; + footer[1] = (crc >> 8) & 0xFF; + footer[2] = (crc >> 16) & 0xFF; + footer[3] = (crc >> 24) & 0xFF; + + /* Write original size */ + footer[4] = original_size & 0xFF; + footer[5] = (original_size >> 8) & 0xFF; + footer[6] = (original_size >> 16) & 0xFF; + footer[7] = (original_size >> 24) & 0xFF; + + fwrite(footer, 1, GZIP_FOOTER_SIZE, fp); +} + +/* Function to compress a file using streaming gzip (memory-safe for large + * files) */ +static int gzip_compress_file(const char *input_filename, + const char *output_filename, + struct flb_output_instance *ins) { + FILE *src_fp = NULL, *dst_fp = NULL; + char *input_buffer = NULL, *output_buffer = NULL; + size_t bytes_read, output_buffer_size; + size_t total_input_size = 0; + mz_ulong crc = MZ_CRC32_INIT; + z_stream strm; + int ret = 0, flush, status; + int deflate_initialized = 0; + + /* Open source file */ + src_fp = fopen(input_filename, "rb"); + if (!src_fp) { + flb_plg_error(ins, "failed to open source file for gzip: %s", + input_filename); + return -1; + } + + /* Open destination file */ + dst_fp = fopen(output_filename, "wb"); + if (!dst_fp) { + flb_plg_error(ins, "failed to create gzip file: %s", output_filename); + fclose(src_fp); + return -1; + } + + /* Allocate input and output buffers */ + input_buffer = flb_malloc(GZIP_CHUNK_SIZE); + output_buffer_size = compressBound(GZIP_CHUNK_SIZE); + output_buffer = flb_malloc(output_buffer_size); + + if (!input_buffer || !output_buffer) { + flb_plg_error(ins, "failed to allocate compression buffers"); + ret = -1; + goto cleanup; + } + + /* Write gzip header */ + write_gzip_header(dst_fp); + + /* Initialize deflate stream (raw deflate without gzip wrapper) */ + memset(&strm, 0, sizeof(strm)); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + status = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -Z_DEFAULT_WINDOW_BITS, 9, Z_DEFAULT_STRATEGY); + if (status != Z_OK) { + flb_plg_error(ins, "failed to initialize deflate stream"); + ret = -1; + goto cleanup; + } + deflate_initialized = 1; + + /* Process file in chunks */ + do { + bytes_read = fread(input_buffer, 1, GZIP_CHUNK_SIZE, src_fp); + if (bytes_read > 0) { + /* Update CRC and total size */ + crc = mz_crc32(crc, (const unsigned char *)input_buffer, bytes_read); + total_input_size += bytes_read; + + /* Set up deflate input */ + strm.next_in = (Bytef *)input_buffer; + strm.avail_in = bytes_read; + + /* Determine flush mode based on EOF after this read */ + flush = feof(src_fp) ? Z_FINISH : Z_NO_FLUSH; + + /* Compress chunk */ + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, flush); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, "deflate stream error during compression"); + ret = -1; + goto deflate_cleanup; + } + + /* Write compressed data */ + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (strm.avail_out == 0); + + /* Verify all input was consumed */ + if (strm.avail_in != 0) { + flb_plg_error(ins, "deflate did not consume all input data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (bytes_read > 0 && status != Z_STREAM_END); + + /* + * If the file size is a multiple of GZIP_CHUNK_SIZE, the loop above finishes + * because bytes_read == 0, but Z_FINISH was never called (flush was + * Z_NO_FLUSH). We must ensure the stream is finished. + */ + if (status != Z_STREAM_END) { + strm.next_in = Z_NULL; + strm.avail_in = 0; + + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, Z_FINISH); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, "deflate stream error during final flush"); + ret = -1; + goto deflate_cleanup; + } + + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data (final flush)"); + ret = -1; + goto deflate_cleanup; + } + } + } while (status != Z_STREAM_END); + } + + /* Verify compression completed successfully */ + if (status != Z_STREAM_END) { + flb_plg_error(ins, "compression did not complete properly"); + ret = -1; + } else { + /* Write gzip footer (CRC32 + original size) */ + write_gzip_footer(dst_fp, crc, total_input_size); + } + +deflate_cleanup: + if (deflate_initialized) { + deflateEnd(&strm); + deflate_initialized = 0; + } + +cleanup: + if (input_buffer) { + flb_free(input_buffer); + input_buffer = NULL; + } + if (output_buffer) { + flb_free(output_buffer); + output_buffer = NULL; + } + if (src_fp) { + fclose(src_fp); + src_fp = NULL; + } + if (dst_fp) { + fclose(dst_fp); + dst_fp = NULL; + } + + return ret; +} + +/* Function to rotate file */ +static int rotate_file(struct flb_logrotate_conf *ctx, const char *filename) { + char timestamp[32]; + char rotated_filename[PATH_MAX]; + char gzip_filename[PATH_MAX]; + size_t file_size = 0; + struct logrotate_file_size *entry; + int ret = 0; + int lock_acquired = 0; + + /* Get file size entry and acquire lock */ + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + file_size = entry->size; + /* Acquire lock before rotation operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", filename); + return -1; + } + lock_acquired = 1; + + /* Check if rotation is still needed (race condition check) */ + if (file_size < ctx->max_size) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + return 0; + } + } + + /* Log rotation event */ + flb_plg_info(ctx->ins, "rotating file: %s (current size: %zu bytes)", + filename, file_size); + + /* Generate timestamp */ + generate_timestamp(timestamp, sizeof(timestamp)); + + /* Create rotated filename with timestamp */ + snprintf(rotated_filename, PATH_MAX - 1, "%s.%s", filename, timestamp); + + /* Rename current file to rotated filename */ + if (rename(filename, rotated_filename) != 0) { + flb_plg_error(ctx->ins, "failed to rename file from %s to %s", filename, + rotated_filename); + ret = -1; + goto cleanup; + } + + /* If gzip is enabled, compress the rotated file */ + if (ctx->gzip == FLB_TRUE) { + snprintf(gzip_filename, PATH_MAX - 1, "%s.gz", rotated_filename); + flb_plg_debug(ctx->ins, "compressing file: %s to %s", rotated_filename, + gzip_filename); + ret = gzip_compress_file(rotated_filename, gzip_filename, ctx->ins); + if (ret == 0) { + /* Remove the uncompressed file */ + unlink(rotated_filename); + flb_plg_debug(ctx->ins, "rotated and compressed file: %s", gzip_filename); + } else { + /* Remove the failed gzip file */ + unlink(gzip_filename); + ret = -1; + goto cleanup; + } + } else { + flb_plg_debug(ctx->ins, "rotated file: %s (no compression)", + rotated_filename); + } + +cleanup: + /* Release lock if we acquired it */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + return ret; +} + +/* + * Function to validate if a filename matches the rotation pattern format + * Valid formats: + * - base_filename.YYYYMMDD_HHMMSS (15 chars after pattern) + * - base_filename.YYYYMMDD_HHMMSS.gz (18 chars after pattern) + */ +static int is_valid_rotation_filename(const char *filename, + const char *pattern) { + size_t pattern_len = strlen(pattern); + size_t filename_len = strlen(filename); + const char *suffix; + size_t suffix_len; + int i; + + /* Check that filename starts with pattern */ + if (strncmp(filename, pattern, pattern_len) != 0) { + return 0; + } + + /* Get the suffix after the pattern */ + suffix = filename + pattern_len; + suffix_len = filename_len - pattern_len; + + /* Must be exactly 15 or 18 characters */ + if (suffix_len != 15 && suffix_len != 18) { + return 0; + } + + /* For 18 characters, must end with .gz */ + if (suffix_len == 18) { + if (strcmp(suffix + 15, ".gz") != 0) { + return 0; + } + } + + /* Validate timestamp format: YYYYMMDD_HHMMSS + * - 8 digits (YYYYMMDD) + * - underscore at position 8 + * - 6 digits (HHMMSS) + */ + for (i = 0; i < 8; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + if (suffix[8] != '_') { + return 0; + } + for (i = 9; i < 15; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + + return 1; +} + +/* Function to clean up old rotated files */ +static int cleanup_old_files(struct flb_logrotate_conf *ctx, + const char *directory, const char *base_filename) { + char pattern[PATH_MAX]; + char full_path[PATH_MAX]; + char search_path[PATH_MAX]; + char **files = NULL; + int file_count = 0; + int max_files = ctx->max_files; + int i, j; + + /* Create pattern to match rotated files */ + snprintf(pattern, PATH_MAX - 1, "%s.", base_filename); + +#ifdef FLB_SYSTEM_WINDOWS + HANDLE hFind; + WIN32_FIND_DATA findData; + + /* Create search path: directory\* */ + snprintf(search_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "*", directory); + + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern)) { + file_count++; + } + } while (FindNextFileA(hFind, &findData) != 0); + + if (file_count <= max_files) { + FindClose(hFind); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + FindClose(hFind); + return -1; + } + + /* Collect file names - restart search */ + FindClose(hFind); + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + flb_free(files); + return -1; + } + + i = 0; + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern) && + i < file_count) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", directory, + findData.cFileName); + files[i] = flb_strdup(full_path); + i++; + } + } while (FindNextFileA(hFind, &findData) != 0 && i < file_count); + + FindClose(hFind); +#else + DIR *dir; + struct dirent *entry; + + dir = opendir(directory); + if (!dir) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + while ((entry = readdir(dir)) != NULL) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + file_count++; + } + } + + if (file_count <= max_files) { + closedir(dir); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + closedir(dir); + return -1; + } + + /* Collect file names */ + rewinddir(dir); + i = 0; + while ((entry = readdir(dir)) != NULL && i < file_count) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", directory, + entry->d_name); + files[i] = flb_strdup(full_path); + i++; + } + } + closedir(dir); +#endif + + /* Sort files by modification time (oldest first) */ + for (i = 0; i < file_count - 1; i++) { + for (j = i + 1; j < file_count; j++) { + struct stat st1; + struct stat st2; + if (stat(files[i], &st1) == 0 && stat(files[j], &st2) == 0) { + if (st1.st_mtime > st2.st_mtime) { + char *temp = files[i]; + files[i] = files[j]; + files[j] = temp; + } + } + } + } + + /* Remove oldest files */ + if (file_count > max_files) { + flb_plg_info( + ctx->ins, + "cleaning up old rotated files: removing %d files (keeping %d)", + file_count - max_files, max_files); + } + for (i = 0; i < file_count - max_files; i++) { + if (unlink(files[i]) == 0) { + flb_plg_debug(ctx->ins, "removed old rotated file: %s", files[i]); + } + flb_free(files[i]); + } + + /* Free remaining file names */ + for (i = file_count - max_files; i < file_count; i++) { + flb_free(files[i]); + } + + flb_free(files); + return 0; +} + +static void cb_logrotate_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, + void *out_context, struct flb_config *config) { + int ret; + int column_names; + FILE *fp; + size_t off = 0; + size_t last_off = 0; + size_t alloc_size = 0; + size_t total; + size_t file_size = 0; + char out_file[PATH_MAX]; + char *buf; + char *out_file_copy; + char directory[PATH_MAX]; + char base_filename[PATH_MAX]; + long file_pos; + bool have_directory; + struct flb_logrotate_conf *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct logrotate_file_size *entry = NULL; + struct stat st; + int lock_acquired = 0; + + (void)config; + + /* Set the right output file */ + if (ctx->out_path) { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, ctx->out_file); + } else { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, event_chunk->tag); + } + } else { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s", ctx->out_file); + } else { + snprintf(out_file, PATH_MAX - 1, "%s", event_chunk->tag); + } + } + + /* Find or create file size entry and acquire lock */ + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Entry doesn't exist yet, create it with initial size 0 */ + if (update_file_size(ctx, out_file, 0) == 0) { + entry = find_file_size_entry(ctx, out_file); + } + } + + if (entry != NULL) { + /* Acquire lock before any file operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", out_file); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + lock_acquired = 1; + } + + /* Check if file needs rotation based on current size counter */ + if (entry != NULL) { + file_size = entry->size; + } else { + /* Entry doesn't exist, check using stat */ + if (stat(out_file, &st) == 0 && st.st_size >= 0) { + file_size = (size_t)st.st_size; + } + } + + if (file_size >= ctx->max_size) { + have_directory = false; + directory[0] = '\0'; + /* Extract directory and base filename for cleanup */ + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + strncpy(directory, out_file_copy, PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#else + strncpy(directory, dirname(out_file_copy), PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#endif + flb_free(out_file_copy); + have_directory = true; + } + + /* Get base filename for cleanup */ + { + char *last_sep = strrchr(out_file, FLB_PATH_SEPARATOR[0]); + if (last_sep) { + strncpy(base_filename, last_sep + 1, PATH_MAX - 1); + } else { + strncpy(base_filename, out_file, PATH_MAX - 1); + } + base_filename[PATH_MAX - 1] = '\0'; + } + + /* Release lock before rotation (rotate_file will acquire its own) */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + lock_acquired = 0; + } + + /* Rotate the file */ + if (rotate_file(ctx, out_file) == 0) { + /* Remove file size entry from list after rotation */ + remove_file_size(ctx, out_file); + entry = NULL; /* Entry was removed */ + /* Clean up old rotated files */ + if (have_directory) { + cleanup_old_files(ctx, directory, base_filename); + } + } + + /* Re-acquire lock after rotation */ + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Create new entry after rotation */ + if (update_file_size(ctx, out_file, 0) == 0) { + entry = find_file_size_entry(ctx, out_file); + } + } + if (entry != NULL) { + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to re-acquire lock for file %s", + out_file); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + lock_acquired = 1; + } + } + + /* Open output file with default name as the Tag */ + fp = fopen(out_file, "ab+"); + if (ctx->mkdir == FLB_TRUE && fp == NULL && errno == ENOENT) { + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + ret = mkpath(ctx->ins, out_file_copy); +#else + ret = mkpath(ctx->ins, dirname(out_file_copy)); +#endif + flb_free(out_file_copy); + if (ret == 0) { + fp = fopen(out_file, "ab+"); + } + } + } + if (fp == NULL) { + flb_errno(); + flb_plg_error(ctx->ins, "error opening: %s", out_file); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Initialize file size counter if this is a new file */ + if (entry == NULL) { + /* File not in list, initialize it */ + update_file_size_counter(ctx, out_file, fp); + /* Re-find entry after initialization */ + entry = find_file_size_entry(ctx, out_file); + if (entry != NULL && !lock_acquired) { + /* Acquire lock if we didn't have it before */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) == 0) { + lock_acquired = 1; + } + } + } + + /* + * Get current file stream position, we gather this in case 'csv' format + * needs to write the column names. + */ + file_pos = ftell(fp); + + /* Check if the event type is metrics, handle the payload differently */ + if (event_chunk->type == FLB_INPUT_METRICS) { + print_metrics_text(ctx->ins, fp, event_chunk->data, event_chunk->size); + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + + /* + * Msgpack output format used to create unit tests files, useful for + * Fluent Bit developers. + */ + if (ctx->format == FLB_OUT_LOGROTATE_FMT_MSGPACK) { + off = 0; + total = 0; + + do { + ret = fwrite((char *)event_chunk->data + off, 1, event_chunk->size - off, + fp); + if (ret < 0) { + flb_errno(); + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_RETRY); + } + total += ret; + } while (total < event_chunk->size); + + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *)event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); + + /* Update file size counter before closing - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* + * Upon flush, for each array, lookup the time and the first field + * of the map to use as a data point. + */ + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == + FLB_EVENT_DECODER_SUCCESS) { + alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ + last_off = off; + + switch (ctx->format) { + case FLB_OUT_LOGROTATE_FMT_JSON: + buf = flb_msgpack_to_json_str(alloc_size, log_event.body, + config->json_escape_unicode); + if (buf) { + fprintf(fp, "%s: [%" PRIu64 ".%09lu, %s]" NEWLINE, event_chunk->tag, + (uint64_t)log_event.timestamp.tm.tv_sec, + log_event.timestamp.tm.tv_nsec, buf); + flb_free(buf); + } else { + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } else { + update_file_size_counter(ctx, out_file, fp); + } + flb_log_event_decoder_destroy(&log_decoder); + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_RETRY); + } + break; + case FLB_OUT_LOGROTATE_FMT_CSV: + if (ctx->csv_column_names == FLB_TRUE && file_pos == 0) { + column_names = FLB_TRUE; + file_pos = 1; + } else { + column_names = FLB_FALSE; + } + csv_output(fp, column_names, &log_event.timestamp, log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_LTSV: + ltsv_output(fp, &log_event.timestamp, log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_PLAIN: + plain_output(fp, log_event.body, alloc_size, config->json_escape_unicode); + break; + case FLB_OUT_LOGROTATE_FMT_TEMPLATE: + template_output(fp, &log_event.timestamp, log_event.body, ctx); + break; + } + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + FLB_OUTPUT_RETURN(FLB_OK); +} + +static int cb_logrotate_exit(void *data, struct flb_config *config) { + struct flb_logrotate_conf *ctx = data; + struct mk_list *head; + struct mk_list *tmp; + struct logrotate_file_size *entry; + + if (!ctx) { + return 0; + } + + /* Free all file size entries from linked list */ + mk_list_foreach_safe(head, tmp, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } + + flb_free(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + {FLB_CONFIG_MAP_STR, "path", NULL, 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, out_path), + "Absolute path to store the files. This parameter is optional"}, + + {FLB_CONFIG_MAP_STR, "file", NULL, 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, out_file), + "Name of the target file to write the records. If 'path' is specified, " + "the value is prefixed"}, + + {FLB_CONFIG_MAP_STR, "format", NULL, 0, FLB_FALSE, 0, + "Specify the output data format, the available options are: plain (json), " + "csv, ltsv and template. If no value is set the outgoing data is " + "formatted " + "using the tag and the record in json"}, + + {FLB_CONFIG_MAP_STR, "delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom delimiter for the records"}, + + {FLB_CONFIG_MAP_STR, "label_delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom label delimiter, to be used with 'ltsv' format"}, + + {FLB_CONFIG_MAP_STR, "template", "{time} {message}", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, template), + "Set a custom template format for the data"}, + + {FLB_CONFIG_MAP_BOOL, "csv_column_names", "false", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, csv_column_names), + "Add column names (keys) in the first line of the target file"}, + + {FLB_CONFIG_MAP_BOOL, "mkdir", "false", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, mkdir), + "Recursively create output directory if it does not exist. Permissions " + "set to 0755"}, + + {FLB_CONFIG_MAP_SIZE, "max_size", "100000000", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, max_size), + "Maximum size of file before rotation (default: 100M)"}, + + {FLB_CONFIG_MAP_INT, "max_files", "7", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, max_files), + "Maximum number of rotated files to keep (default: 7)"}, + + {FLB_CONFIG_MAP_BOOL, "gzip", "true", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, gzip), + "Whether to gzip rotated files (default: true)"}, + + /* EOF */ + {0}}; + +struct flb_output_plugin out_logrotate_plugin = { + .name = "logrotate", + .description = "Generate log file with rotation", + .cb_init = cb_logrotate_init, + .cb_flush = cb_logrotate_flush, + .cb_exit = cb_logrotate_exit, + .flags = 0, + .workers = 1, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + .config_map = config_map, +}; diff --git a/plugins/out_logrotate/logrotate.h b/plugins/out_logrotate/logrotate.h new file mode 100644 index 00000000000..82d77de732f --- /dev/null +++ b/plugins/out_logrotate/logrotate.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_LOGROTATE +#define FLB_OUT_LOGROTATE + +enum { + FLB_OUT_LOGROTATE_FMT_JSON, + FLB_OUT_LOGROTATE_FMT_CSV, + FLB_OUT_LOGROTATE_FMT_LTSV, + FLB_OUT_LOGROTATE_FMT_PLAIN, + FLB_OUT_LOGROTATE_FMT_MSGPACK, + FLB_OUT_LOGROTATE_FMT_TEMPLATE, +}; + +#endif diff --git a/src/aws/CMakeLists.txt b/src/aws/CMakeLists.txt index 941e811b633..de7d491cb47 100644 --- a/src/aws/CMakeLists.txt +++ b/src/aws/CMakeLists.txt @@ -15,6 +15,7 @@ set(src "flb_aws_imds.c" "flb_aws_credentials_http.c" "flb_aws_credentials_profile.c" + "flb_aws_credentials_iot.c" ) message(STATUS "=== AWS Credentials ===") diff --git a/src/aws/flb_aws_credentials.c b/src/aws/flb_aws_credentials.c index 75f13b111f2..94e2515279a 100644 --- a/src/aws/flb_aws_credentials.c +++ b/src/aws/flb_aws_credentials.c @@ -51,7 +51,6 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config int eks_irsa, char *profile); - /* * The standard credential provider chain: * 1. Environment variables @@ -59,6 +58,7 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config * 3. EKS OIDC * 4. EC2 IMDS * 5. ECS HTTP credentials endpoint + * 6. IoT credentials endpoint * * This provider will evaluate each provider in order, returning the result * from the first provider that returns valid credentials. @@ -566,6 +566,14 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config mk_list_add(&sub_provider->_head, &implementation->sub_providers); + /* IoT Provider - check early since it requires specific environment variables */ + sub_provider = flb_iot_provider_create(config, generator); + if (sub_provider) { + /* IoT provider can fail if we are not running in IoT */ + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized IoT Provider in standard chain"); + } + flb_debug("[aws_credentials] creating profile %s provider", profile); sub_provider = flb_profile_provider_create(profile); if (sub_provider) { diff --git a/src/aws/flb_aws_credentials_iot.c b/src/aws/flb_aws_credentials_iot.c new file mode 100644 index 00000000000..0f27f688bef --- /dev/null +++ b/src/aws/flb_aws_credentials_iot.c @@ -0,0 +1,655 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/* IoT Provider */ +struct flb_aws_provider_iot { + struct flb_aws_credentials *creds; + time_t next_refresh; + + struct flb_aws_client *client; + + /* IoT specific configuration */ + char *key_file; + char *cert_file; + char *ca_cert_file; + char *credentials_endpoint; + char *thing_name; + char *role_alias; + + /* TLS configuration for IoT certificates */ + struct flb_tls *tls; + + /* Static header for thing name */ + struct flb_aws_header thing_name_header; +}; + +/* Forward declarations */ +static int iot_credentials_request(struct flb_aws_provider_iot *implementation); +static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, size_t response_len, time_t *expiration); + +struct flb_aws_credentials *get_credentials_fn_iot(struct flb_aws_provider *provider) +{ + struct flb_aws_credentials *creds = NULL; + int refresh = FLB_FALSE; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Requesting credentials from the " + "IoT provider.."); + + /* a negative next_refresh means that auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + if (try_lock_provider(provider)) { + flb_debug("[aws_credentials] IoT Provider: Refreshing credential " + "cache."); + iot_credentials_request(implementation); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + goto error; + } + } else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_iot(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Refresh called on the IoT provider"); + + if (try_lock_provider(provider)) { + ret = iot_credentials_request(implementation); + unlock_provider(provider); + } + return ret; +} + +int init_fn_iot(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Init called on the IoT provider"); + + implementation->client->debug_only = FLB_TRUE; + + if (try_lock_provider(provider)) { + ret = iot_credentials_request(implementation); + unlock_provider(provider); + } + + implementation->client->debug_only = FLB_FALSE; + return ret; +} + +void sync_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Sync called on the IoT provider"); + /* Remove async flag */ + flb_stream_disable_async_mode(&implementation->client->upstream->base); +} + +void async_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Async called on the IoT provider"); + /* Add async flag */ + flb_stream_enable_async_mode(&implementation->client->upstream->base); +} + +void upstream_set_fn_iot(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] upstream_set called on the IoT provider"); + /* Associate output and upstream */ + flb_output_upstream_set(implementation->client->upstream, ins); +} + +void destroy_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->client) { + flb_aws_client_destroy(implementation->client); + } + + if (implementation->tls) { + flb_tls_destroy(implementation->tls); + } + + if (implementation->key_file) { + flb_free(implementation->key_file); + } + if (implementation->cert_file) { + flb_free(implementation->cert_file); + } + if (implementation->ca_cert_file) { + flb_free(implementation->ca_cert_file); + } + if (implementation->credentials_endpoint) { + flb_free(implementation->credentials_endpoint); + } + if (implementation->thing_name) { + flb_free(implementation->thing_name); + } + if (implementation->role_alias) { + flb_free(implementation->role_alias); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable iot_provider_vtable = { + .get_credentials = get_credentials_fn_iot, + .init = init_fn_iot, + .refresh = refresh_fn_iot, + .destroy = destroy_fn_iot, + .sync = sync_fn_iot, + .async = async_fn_iot, + .upstream_set = upstream_set_fn_iot, +}; + +struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, + struct flb_aws_client_generator *generator) +{ + struct flb_aws_provider_iot *implementation = NULL; + struct flb_aws_provider *provider = NULL; + struct flb_upstream *upstream = NULL; + char *endpoint_path = NULL; + flb_sds_t protocol = NULL; + flb_sds_t host = NULL; + flb_sds_t port_sds = NULL; + int port = 443; + int ret; + + /* Check if IoT environment variables are set */ + char *key_file = getenv(AWS_IOT_KEY_FILE); + char *cert_file = getenv(AWS_IOT_CERT_FILE); + char *ca_cert_file = getenv(AWS_IOT_CA_CERT_FILE); + char *credentials_endpoint = getenv(AWS_IOT_CREDENTIALS_ENDPOINT); + char *thing_name = getenv(AWS_IOT_THING_NAME); + char *role_alias = getenv(AWS_IOT_ROLE_ALIAS); + + if (!key_file || !cert_file || !ca_cert_file || !credentials_endpoint || + !thing_name || !role_alias) { + flb_debug("[aws_credentials] Not initializing IoT provider because " + "required environment variables are not set"); + return NULL; + } + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_iot)); + if (!implementation) { + flb_free(provider); + flb_errno(); + return NULL; + } + + provider->provider_vtable = &iot_provider_vtable; + provider->implementation = implementation; + + /* Store IoT configuration */ + implementation->key_file = flb_strdup(key_file); + implementation->cert_file = flb_strdup(cert_file); + implementation->ca_cert_file = flb_strdup(ca_cert_file); + implementation->credentials_endpoint = flb_strdup(credentials_endpoint); + implementation->thing_name = flb_strdup(thing_name); + implementation->role_alias = flb_strdup(role_alias); + + /* Ensure credentials_endpoint has http or https scheme, default to https:// if missing */ + if (strncmp(credentials_endpoint, "http://", 7) != 0 && + strncmp(credentials_endpoint, "https://", 8) != 0) { + flb_sds_t tmp = flb_sds_create_size(strlen(credentials_endpoint) + 8 + 1); + if (!tmp) { + flb_error("[aws_credentials] Failed to allocate memory for credentials_endpoint"); + goto error; + } + flb_sds_cat(tmp, "https://", 8); + flb_sds_cat(tmp, credentials_endpoint, strlen(credentials_endpoint)); + flb_free(implementation->credentials_endpoint); + implementation->credentials_endpoint = tmp; + credentials_endpoint = implementation->credentials_endpoint; + } + + /* Parse the credentials endpoint URL */ + ret = flb_utils_url_split_sds(credentials_endpoint, &protocol, &host, &port_sds, &endpoint_path); + if (ret < 0) { + flb_error("[aws_credentials] Invalid IoT credentials endpoint URL: %s", credentials_endpoint); + goto error; + } + + if (port_sds != NULL) { + port = atoi(port_sds); + if (port == 0) { + flb_error("[aws_credentials] Invalid port in IoT credentials endpoint: %s", port_sds); + goto error; + } + } + + /* Create TLS configuration for IoT certificates */ + flb_debug("[aws_credentials] Creating TLS instance with cert: %s, key: %s, ca: %s", + implementation->cert_file, implementation->key_file, implementation->ca_cert_file); + + implementation->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + FLB_TRUE, /* debug - enable TLS debug */ + NULL, /* vhost */ + NULL, /* ca_path */ + implementation->ca_cert_file, + implementation->cert_file, + implementation->key_file, + NULL); /* key_passwd */ + if (!implementation->tls) { + flb_error("[aws_credentials] Failed to create TLS instance for IoT Provider"); + goto error; + } + + flb_debug("[aws_credentials] TLS instance created successfully"); + + /* Create upstream connection */ + flb_debug("[aws_credentials] Creating upstream connection to %s:%d", host, port); + upstream = flb_upstream_create(config, host, port, FLB_IO_TLS, implementation->tls); + if (!upstream) { + flb_error("[aws_credentials] IoT Provider: connection initialization error"); + goto error; + } + + flb_debug("[aws_credentials] Upstream connection created successfully"); + + upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT; + + implementation->client = generator->create(); + if (!implementation->client) { + flb_aws_provider_destroy(provider); + flb_upstream_destroy(upstream); + flb_error("[aws_credentials] IoT Provider: client creation error"); + return NULL; + } + + implementation->client->name = "iot_provider_client"; + implementation->client->has_auth = FLB_FALSE; + implementation->client->provider = NULL; + implementation->client->region = NULL; + implementation->client->service = NULL; + implementation->client->port = port; + implementation->client->flags = 0; + implementation->client->proxy = NULL; + implementation->client->upstream = upstream; + + flb_debug("[aws_credentials] IoT client configured: name=%s, port=%d, has_auth=%d", + implementation->client->name, implementation->client->port, implementation->client->has_auth); + + /* Set up the thing name header */ + implementation->thing_name_header.key = "x-amzn-iot-thingname"; + implementation->thing_name_header.key_len = 22; + implementation->thing_name_header.val = implementation->thing_name; + implementation->thing_name_header.val_len = strlen(implementation->thing_name); + + flb_debug("[aws_credentials] Setting IoT thing name header: %s = %s", + implementation->thing_name_header.key, implementation->thing_name_header.val); + + /* Set the static headers for the client */ + implementation->client->static_headers = &implementation->thing_name_header; + implementation->client->static_headers_len = 1; + + cleanup: + flb_sds_destroy(protocol); + flb_sds_destroy(host); + flb_sds_destroy(port_sds); + flb_sds_destroy(endpoint_path); + return provider; + error: + flb_aws_provider_destroy(provider); + provider = NULL; + goto cleanup; +} + +static int iot_credentials_request(struct flb_aws_provider_iot *implementation) +{ + struct flb_aws_credentials *creds = NULL; + struct flb_http_client *c = NULL; + time_t expiration; + flb_sds_t uri = NULL; + int ret; + + flb_debug("[aws_credentials] Calling IoT credentials endpoint.."); + + /* Construct the URI for the IoT credentials request */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_errno(); + return -1; + } + + uri = flb_sds_printf(&uri, "/role-aliases/%s/credentials", implementation->role_alias); + if (!uri) { + return -1; + } + + /* Make the HTTP request */ + flb_debug("[aws_credentials] Making IoT credentials request to: %s", uri); + flb_debug("[aws_credentials] Client headers count: %d", implementation->client->static_headers_len); + if (implementation->client->static_headers_len > 0) { + flb_debug("[aws_credentials] Client header: %s = %s", + implementation->client->static_headers[0].key, + implementation->client->static_headers[0].val); + } + + c = implementation->client->client_vtable->request(implementation->client, FLB_HTTP_GET, + uri, NULL, 0, NULL, 0); + + flb_sds_destroy(uri); + + if (!c) { + flb_error("[aws_credentials] IoT credentials request failed - no response"); + return -1; + } + + flb_debug("[aws_credentials] IoT credentials response status: %d", c->resp.status); + flb_debug("[aws_credentials] IoT credentials response size: %zu", c->resp.payload_size); + + if (c->resp.status != 200) { + flb_error("[aws_credentials] IoT credentials request failed with status: %d", c->resp.status); + if (c->resp.payload_size > 0) { + flb_aws_print_error_code(c->resp.payload, c->resp.payload_size, + "IoTCredentialsProvider"); + } + flb_http_client_destroy(c); + return -1; + } + + /* Debug: Log the actual response from IoT credentials endpoint */ + flb_debug("[aws_credentials] IoT credentials response (size: %zu): %.*s", + c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); + + /* Parse the credentials response - IoT endpoint may have different format */ + creds = flb_parse_iot_credentials(c->resp.payload, c->resp.payload_size, &expiration); + if (!creds) { + flb_debug("[aws_credentials] Failed to parse IoT credentials response"); + flb_http_client_destroy(c); + return -1; + } + + /* Destroy existing credentials */ + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + + implementation->creds = creds; + implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + flb_http_client_destroy(c); + + return 0; +} + +/* + * Parse IoT credentials response. + * AWS IoT credentials endpoint returns a JSON response with credentials. + * The format may be different from standard AWS credentials endpoints. + */ +static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, size_t response_len, time_t *expiration) +{ + jsmntok_t *tokens = NULL; + const jsmntok_t *t = NULL; + char *current_token = NULL; + jsmn_parser parser; + int tokens_size = 50; + size_t size; + int ret; + struct flb_aws_credentials *creds = NULL; + int i = 0; + int len; + flb_sds_t tmp; + + /* + * Remove/reset existing value of expiration. + * Expiration should be in the response, but it is not + * strictly speaking needed. Fluent Bit logs a warning if it is missing. + */ + *expiration = -1; + + jsmn_init(&parser); + + size = sizeof(jsmntok_t) * tokens_size; + tokens = flb_calloc(1, size); + if (!tokens) { + goto error; + } + + ret = jsmn_parse(&parser, response, response_len, tokens, tokens_size); + + if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) { + flb_error("[aws_credentials] Could not parse IoT credentials response - invalid JSON."); + goto error; + } + + /* Shouldn't happen, but just in case, check for too many tokens error */ + if (ret == JSMN_ERROR_NOMEM) { + flb_error("[aws_credentials] Could not parse IoT credentials response - response contained more tokens than expected."); + goto error; + } + + /* return value is number of tokens parsed */ + tokens_size = ret; + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + goto error; + } + + /* + * jsmn will create an array of tokens like: + * key, value, key, value + * For IoT credentials, the structure is: + * {"credentials": {"accessKeyId": "...", "secretAccessKey": "...", ...}} + */ + while (i < (tokens_size - 1)) { + t = &tokens[i]; + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { + break; + } + + if (t->type == JSMN_STRING) { + current_token = &response[t->start]; + len = t->end - t->start; + + /* Check for credentials wrapper object */ + if (strncmp(current_token, "credentials", len) == 0) { + /* Skip the credentials object - we'll process its contents */ + i++; + continue; + } + + /* Check for AccessKeyId field (case insensitive) */ + if (strncmp(current_token, "accessKeyId", len) == 0 || + strncmp(current_token, "AccessKeyId", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->access_key_id != NULL) { + flb_error("[aws_credentials] Trying to double allocate access_key_id"); + goto error; + } + creds->access_key_id = flb_sds_create_len(current_token, len); + if (!creds->access_key_id) { + flb_errno(); + goto error; + } + continue; + } + /* Check for SecretAccessKey field (case insensitive) */ + if (strncmp(current_token, "secretAccessKey", len) == 0 || + strncmp(current_token, "SecretAccessKey", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->secret_access_key != NULL) { + flb_error("[aws_credentials] Trying to double allocate secret_access_key"); + goto error; + } + creds->secret_access_key = flb_sds_create_len(current_token, len); + if (!creds->secret_access_key) { + flb_errno(); + goto error; + } + continue; + } + /* Check for Token field (session token) - case insensitive */ + if (strncmp(current_token, "sessionToken", len) == 0 || + strncmp(current_token, "Token", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->session_token != NULL) { + flb_error("[aws_credentials] Trying to double allocate session_token"); + goto error; + } + creds->session_token = flb_sds_create_len(current_token, len); + if (!creds->session_token) { + flb_errno(); + goto error; + } + continue; + } + /* Check for Expiration field (case insensitive) */ + if (strncmp(current_token, "expiration", len) == 0 || + strncmp(current_token, "Expiration", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + tmp = flb_sds_create_len(current_token, len); + if (!tmp) { + flb_errno(); + goto error; + } + *expiration = flb_aws_cred_expiration(tmp); + if (*expiration < 0) { + flb_warn("[aws_credentials] '%s' was invalid or could not be parsed. Disabling auto-refresh of credentials.", tmp); + } + flb_sds_destroy(tmp); + } + } + + i++; + } + + if (creds->access_key_id == NULL) { + flb_error("[aws_credentials] Missing AccessKeyId field in IoT credentials response"); + goto error; + } + + if (creds->secret_access_key == NULL) { + flb_error("[aws_credentials] Missing SecretAccessKey field in IoT credentials response"); + goto error; + } + + flb_debug("[aws_credentials] Successfully parsed IoT credentials - AccessKeyId: %s, Expiration: %ld", + creds->access_key_id, *expiration); + + flb_free(tokens); + return creds; + +error: + flb_aws_credentials_destroy(creds); + flb_free(tokens); + return NULL; +} diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index dd76c16faee..91fa07eb21a 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -234,6 +234,7 @@ if(FLB_IN_LIB) # These plugins work only on Linux if(NOT FLB_SYSTEM_WINDOWS) FLB_RT_TEST(FLB_OUT_FILE "out_file.c") + FLB_RT_TEST(FLB_OUT_LOGROTATE "out_logrotate.c") endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") diff --git a/tests/runtime/out_logrotate.c b/tests/runtime/out_logrotate.c new file mode 100644 index 00000000000..6b43a1f5860 --- /dev/null +++ b/tests/runtime/out_logrotate.c @@ -0,0 +1,1333 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include "flb_tests_runtime.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Test data */ +#include "data/common/json_invalid.h" /* JSON_INVALID */ +#include "data/common/json_long.h" /* JSON_LONG */ +#include "data/common/json_small.h" /* JSON_SMALL */ + +/* Test functions */ +void flb_test_logrotate_basic_rotation(void); +void flb_test_logrotate_gzip_compression(void); +void flb_test_logrotate_gzip_compression_exact_chunk(void); +void flb_test_logrotate_max_files_cleanup(void); +void flb_test_logrotate_max_files_validation(void); +void flb_test_logrotate_format_csv(void); +void flb_test_logrotate_format_ltsv(void); +void flb_test_logrotate_format_plain(void); +void flb_test_logrotate_format_msgpack(void); +void flb_test_logrotate_format_template(void); +void flb_test_logrotate_path(void); +void flb_test_logrotate_mkdir(void); +void flb_test_logrotate_delimiter(void); +void flb_test_logrotate_label_delimiter(void); +void flb_test_logrotate_csv_column_names(void); +void flb_test_logrotate_multithreaded(void); + +/* Test list */ +TEST_LIST = {{"basic_rotation", flb_test_logrotate_basic_rotation}, + {"gzip_compression", flb_test_logrotate_gzip_compression}, + {"gzip_compression_exact_chunk", + flb_test_logrotate_gzip_compression_exact_chunk}, + {"max_files_cleanup", flb_test_logrotate_max_files_cleanup}, + {"max_files_validation", flb_test_logrotate_max_files_validation}, + + {"format_csv", flb_test_logrotate_format_csv}, + {"format_ltsv", flb_test_logrotate_format_ltsv}, + {"format_plain", flb_test_logrotate_format_plain}, + {"format_msgpack", flb_test_logrotate_format_msgpack}, + {"format_template", flb_test_logrotate_format_template}, + {"path", flb_test_logrotate_path}, + {"mkdir", flb_test_logrotate_mkdir}, + {"delimiter", flb_test_logrotate_delimiter}, + {"label_delimiter", flb_test_logrotate_label_delimiter}, + {"csv_column_names", flb_test_logrotate_csv_column_names}, + {"multithreaded", flb_test_logrotate_multithreaded}, + {NULL, NULL}}; + +#define TEST_LOGFILE "flb_test_logrotate.log" +#define TEST_LOGPATH "out_logrotate" +#define TEST_TIMEOUT 10 + +/* Helper function to recursively delete directory and all its contents */ +static int recursive_delete_directory(const char *dir_path) { + DIR *dir; + struct dirent *entry; + struct stat statbuf; + char path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Check if directory exists */ + if (stat(dir_path, &statbuf) != 0) { + /* Directory doesn't exist, consider it success */ + return 0; + } + + /* Check if it's actually a directory */ + if (!S_ISDIR(statbuf.st_mode)) { + /* Not a directory, try to remove as file */ + return remove(dir_path); + } + + /* Open directory */ + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + /* Iterate through directory entries */ + while ((entry = readdir(dir)) != NULL) { + /* Skip . and .. */ + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(path, sizeof(path), "%s/%s", dir_path, entry->d_name); + + /* Get file status */ + if (stat(path, &statbuf) != 0) { + continue; + } + + /* Recursively delete subdirectories */ + if (S_ISDIR(statbuf.st_mode)) { + if (recursive_delete_directory(path) != 0) { + ret = -1; + } + } else { + /* Delete file */ + if (unlink(path) != 0) { + ret = -1; + } + } + } + + closedir(dir); + + /* Remove the directory itself */ + if (rmdir(dir_path) != 0) { + ret = -1; + } + + return ret; +} + +/* Helper function to count files in directory */ +static int count_files_in_directory(const char *dir_path, const char *prefix) { + DIR *dir; + struct dirent *entry; + int count = 0; + + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, strlen(prefix)) == 0) { + count++; + } + } + + closedir(dir); + return count; +} + +/* + * Helper function: Wait for a file matching the pattern "prefix*gz" to appear + * in dir_path + */ +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) { + int elapsed_time, found = 0; + DIR *dir; + struct dirent *entry; + size_t prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; elapsed_time++) { + dir = opendir(dir_path); + if (dir) { + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, prefix_len) == 0 && + strlen(entry->d_name) > prefix_len + suffix_len && + strcmp(entry->d_name + strlen(entry->d_name) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } + closedir(dir); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 0 : -1; +} + +/* Helper function to read file content into buffer */ +static char *read_file_content(const char *filename, size_t *out_size) { + FILE *fp; + char *buffer; + struct stat st; + size_t size; + + if (stat(filename, &st) != 0) { + return NULL; + } + + size = st.st_size; + fp = fopen(filename, "rb"); + if (!fp) { + return NULL; + } + + buffer = flb_malloc(size + 1); + if (!buffer) { + fclose(fp); + return NULL; + } + + if (fread(buffer, 1, size, fp) != size) { + flb_free(buffer); + fclose(fp); + return NULL; + } + + buffer[size] = '\0'; + fclose(fp); + *out_size = size; + return buffer; +} + +/* Format Tests */ +void flb_test_logrotate_format_csv(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_csv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV format - should contain commas as delimiters */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* CSV should contain commas */ + TEST_CHECK(strstr(content, ",") != NULL); + /* CSV should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_ltsv(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_ltsv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify LTSV format - should contain colons (label delimiter) and tabs */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* LTSV should contain colons for label:value pairs */ + TEST_CHECK(strstr(content, ":") != NULL); + /* Should contain "time" label */ + TEST_CHECK(strstr(content, "time") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_plain(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_plain.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "plain", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify plain format - should be JSON without tag/timestamp prefix */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Plain format should contain JSON */ + TEST_CHECK(strstr(content, "{") != NULL); + /* Should not contain tag prefix like "test: [" */ + TEST_CHECK(strstr(content, "test: [") == NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_msgpack(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + struct stat st; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_msgpack.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "msgpack", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify msgpack format - should be binary data */ + if (stat(logfile, &st) == 0) { + TEST_CHECK(st.st_size > 0); + /* Msgpack files should not be readable as text (no newlines in first bytes) + */ + fp = fopen(logfile, "rb"); + if (fp) { + unsigned char first_bytes[10]; + size_t read_bytes = fread(first_bytes, 1, 10, fp); + fclose(fp); + if (read_bytes > 0) { + /* + * Msgpack typically starts with array markers (0x91, 0x92, etc.) + * or map markers. Just verify it's not plain text JSON. + */ + TEST_CHECK(first_bytes[0] != '{' && first_bytes[0] != '['); + } + } + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_template(void) { + int i; + int ret; + int bytes; + /* Use JSON with specific fields for template testing */ + const char *json_template = + "[1448403340, {\"message\": \"test log entry\", \"level\": \"info\"}]"; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, + "test_template.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", + "{time} {message}", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = + flb_lib_push(ctx, in_ffd, (char *)json_template, strlen(json_template)); + TEST_CHECK(bytes == strlen(json_template)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify template format - should contain substituted values */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Template should contain the message value */ + TEST_CHECK(strstr(content, "test log entry") != NULL); + /* Should contain timestamp (as float) */ + TEST_CHECK(strstr(content, "1448403340") != NULL || + strstr(content, ".") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Configuration Option Tests */ +void flb_test_logrotate_path(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char test_path[PATH_MAX]; + + snprintf(test_path, sizeof(test_path), "%s/path_test", TEST_LOGPATH); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); +/* Construct logfile path - test_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s/path_test.log", test_path); +#pragma GCC diagnostic pop + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "path", test_path, + "file", "path_test.log", "mkdir", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify file was created in the specified path */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_mkdir(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char nested_path[PATH_MAX]; + struct stat st; + + snprintf(nested_path, sizeof(nested_path), "%s/nested/deep/path", + TEST_LOGPATH); +/* Construct logfile path - nested_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s/test_mkdir.log", nested_path); +#pragma GCC diagnostic pop + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "mkdir", "true", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify nested directory was created */ + TEST_CHECK(stat(nested_path, &st) == 0); + TEST_CHECK(S_ISDIR(st.st_mode)); + + /* Verify file was created */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_delimiter(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, + "test_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "delimiter", "tab", "max_size", + "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify tab delimiter is used (should contain tabs, not commas) */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tab characters */ + int has_tab = 0; + int j; + for (j = 0; j < content_size; j++) { + if (content[j] == '\t') { + has_tab = 1; + break; + } + } + TEST_CHECK(has_tab); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_label_delimiter(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, + "test_label_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "label_delimiter", "comma", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify custom label delimiter is used */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain "," as label delimiter (comma) */ + TEST_CHECK(strstr(content, ",") != NULL); + /* Should contain "time" label with comma delimiter */ + /* LTSV format prints "time" (with quotes) followed by delimiter */ + TEST_CHECK(strstr(content, "\"time\",") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_csv_column_names(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, + "test_csv_columns.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "csv_column_names", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV column names header exists */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* First line should contain "timestamp" */ + TEST_CHECK(strstr(content, "timestamp") != NULL); + /* Should contain key names from JSON */ + TEST_CHECK(strstr(content, "key_0") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Multithreaded Test */ +struct thread_data { + flb_ctx_t *ctx; + int in_ffd; + int thread_id; + int events_per_thread; + char *json_data; + size_t json_len; + int *success; + pthread_mutex_t *mutex; +}; + +static void *thread_worker(void *arg) { + struct thread_data *data = (struct thread_data *)arg; + int i; + int bytes; + + for (i = 0; i < data->events_per_thread; i++) { + bytes = + flb_lib_push(data->ctx, data->in_ffd, data->json_data, data->json_len); + if (bytes != (int)data->json_len) { + pthread_mutex_lock(data->mutex); + *data->success = 0; + pthread_mutex_unlock(data->mutex); + return NULL; + } + /* Small delay to allow interleaving */ + flb_time_msleep(10); + } + + return NULL; +} + +void flb_test_logrotate_multithreaded(void) { + int ret; + int i; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + pthread_t threads[8]; + struct thread_data thread_data[8]; + pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + int success = 1; + int num_threads = 4; + int events_per_thread = 10; + FILE *fp; + char *content; + size_t content_size; + int line_count = 0; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, + "test_multithreaded.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "0.5", "Grace", "2", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "1M", "max_files", "5", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare thread data */ + for (i = 0; i < num_threads; i++) { + thread_data[i].ctx = ctx; + thread_data[i].in_ffd = in_ffd; + thread_data[i].thread_id = i; + thread_data[i].events_per_thread = events_per_thread; + thread_data[i].json_data = p; + thread_data[i].json_len = strlen(p); + thread_data[i].success = &success; + thread_data[i].mutex = &mutex; + } + + /* Create and start threads */ + for (i = 0; i < num_threads; i++) { + ret = pthread_create(&threads[i], NULL, thread_worker, &thread_data[i]); + TEST_CHECK(ret == 0); + } + + /* Wait for all threads to complete */ + for (i = 0; i < num_threads; i++) { + pthread_join(threads[i], NULL); + } + + /* Wait for flush to complete - allow multiple flush cycles */ + flb_time_msleep(3000); + + /* Wait for file to exist and have content before stopping */ + ret = wait_for_file(logfile, 1000, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify all data was written correctly */ + TEST_CHECK(success == 1); + + /* Verify file exists and has content */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + char line[4096]; + while (fgets(line, sizeof(line), fp) != NULL) { + line_count++; + } + fclose(fp); + } + + /* Should have at least num_threads * events_per_thread records */ + /* (may be more due to JSON format adding tag prefix) */ + TEST_CHECK(line_count >= num_threads * events_per_thread); + + /* Verify file content is valid - read and check for expected data */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tag */ + TEST_CHECK(strstr(content, "test") != NULL); + /* Should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + /* Count occurrences of key_0 to verify records */ + int key_count = 0; + char *pos = content; + while ((pos = strstr(pos, "key_0")) != NULL) { + key_count++; + pos++; + } + TEST_CHECK(key_count >= num_threads * events_per_thread); + flb_free(content); + } + + pthread_mutex_destroy(&mutex); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_basic_rotation(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + time_t now = time(NULL); + struct tm *tm_info = localtime(&now); + char timestamp[32]; + + strftime(timestamp, sizeof(timestamp), "%Y%m%d_%H%M%S", tm_info); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to fill the file (JSON_SMALL is ~4KB, 4 events = ~16KB) + */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Wait a bit more to ensure flush completes and file size is updated */ + flb_time_msleep(1500); + + /* Write additional data to trigger rotation (4 more events = ~16KB more) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that the original file exists */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp != NULL) { + fclose(fp); + } + + /* Check that at least one rotated file exists: "flb_test_logrotate.log.*" */ + TEST_CHECK( + count_files_in_directory(TEST_LOGPATH, "flb_test_logrotate.log.") >= 1); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_gzip_compression(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", "true", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger rotation (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists: "flb_test_logrotate.log.*.gz" */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_logrotate.log.", ".gz", + TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_max_files_cleanup(void) { + int i, j; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int file_count; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger multiple rotations */ + for (i = 0; i < 5; i++) { /* Write ~5MB to trigger multiple rotations */ + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (j = 0; j < 4; j++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= 4); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that only Max_Files + 1 files exist (current + rotated) */ + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= + 4); /* Current file + 3 rotated files (max_files=3) */ + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_max_files_validation(void) { + flb_ctx_t *ctx; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + /* Test with max_files = 0 */ + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_files", "0", NULL) == 0); + + /* Start should fail */ + TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Test with max_files = -1 */ + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_files", "-1", NULL) == 0); + + /* Start should fail */ + TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Clean up directory */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_gzip_compression_exact_chunk(void) { + int ret; + int bytes; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *large_message; + char *json_payload; + size_t msg_size = 64 * 1024; /* 64KB exact chunk size */ + size_t json_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", "{message}", + "max_size", "64K", "max_files", "3", "gzip", + "true", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare 64KB message */ + large_message = flb_malloc(msg_size + 1); + TEST_CHECK(large_message != NULL); + memset(large_message, 'A', msg_size); + large_message[msg_size] = '\0'; + + /* Create JSON payload: [timestamp, {"message": "..."}] */ + /* Estimate size: msg_size + overhead */ + json_size = msg_size + 100; + json_payload = flb_malloc(json_size); + TEST_CHECK(json_payload != NULL); + + snprintf(json_payload, json_size, "[%lu, {\"message\": \"%s\"}]", time(NULL), + large_message); + + /* Write exactly 64KB of data (the message content) */ + bytes = flb_lib_push(ctx, in_ffd, json_payload, strlen(json_payload)); + TEST_CHECK(bytes == strlen(json_payload)); + + flb_free(large_message); + flb_free(json_payload); + + /* Wait for flush and file creation */ + flb_time_msleep(1500); + + /* Trigger rotation by writing one more small record */ + char *small_payload = "[1234567890, {\"message\": \"trigger\"}]"; + bytes = flb_lib_push(ctx, in_ffd, small_payload, strlen(small_payload)); + TEST_CHECK(bytes == strlen(small_payload)); + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_logrotate.log.", ".gz", + TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} \ No newline at end of file