diff --git a/plugins/filter_kubernetes/kube_conf.c b/plugins/filter_kubernetes/kube_conf.c index 73e73495e92..60277e786c1 100644 --- a/plugins/filter_kubernetes/kube_conf.c +++ b/plugins/filter_kubernetes/kube_conf.c @@ -53,6 +53,7 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, } ctx->config = config; ctx->ins = ins; + mk_list_init(&ctx->refresh_pods); /* Set config_map properties in our local context */ ret = flb_filter_config_map_set(ins, (void *) ctx); @@ -207,6 +208,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) return; } + /* Stop background refresh timer and release tracked pods */ + flb_kube_meta_refresh_destroy(ctx); + if (ctx->hash_table) { flb_hash_table_destroy(ctx->hash_table); } diff --git a/plugins/filter_kubernetes/kube_conf.h b/plugins/filter_kubernetes/kube_conf.h index b59ed009f28..107eebdb7ec 100644 --- a/plugins/filter_kubernetes/kube_conf.h +++ b/plugins/filter_kubernetes/kube_conf.h @@ -98,6 +98,8 @@ struct service_attributes { }; +struct flb_sched_timer; + /* Filter context */ struct flb_kube { /* Configuration parameters */ @@ -196,6 +198,7 @@ struct flb_kube { int kube_meta_cache_ttl; int kube_meta_namespace_cache_ttl; + int kube_meta_cache_refresh_interval; /* Configuration used for enabling pod to service name mapping*/ int aws_use_pod_association; @@ -241,6 +244,17 @@ struct flb_kube { struct flb_upstream *kubelet_upstream; struct flb_upstream *kube_api_upstream; struct flb_filter_instance *ins; + + /* + * Optional background refresh of cached pod metadata. When + * kube_meta_cache_refresh_interval > 0, a periodic timer re-fetches + * metadata for the pods tracked in 'refresh_pods' and updates the cache + * entries in place, so in-place label/annotation changes on long-lived, + * stable-named pods (e.g. StatefulSet members) are eventually reflected. + * Disabled by default (interval == 0). + */ + struct mk_list refresh_pods; + struct flb_sched_timer *refresh_timer; }; struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *i, diff --git a/plugins/filter_kubernetes/kube_meta.c b/plugins/filter_kubernetes/kube_meta.c index df21a9270d0..16317d0d59f 100644 --- a/plugins/filter_kubernetes/kube_meta.c +++ b/plugins/filter_kubernetes/kube_meta.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -2018,6 +2019,212 @@ static int get_and_merge_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *me return ret; } +/* + * Background metadata refresh (opt-in via kube_meta_cache_refresh_interval). + * + * When enabled, every pod that gets cached is tracked in ctx->refresh_pods with + * a private copy of its 'meta'. A periodic timer walks that list and, for each + * pod still present in the cache, re-fetches its metadata and replaces the cache + * entry in place. Because the cache only ever holds pods this instance is + * logging (node-local), the extra request load stays bounded to this node. + */ + +struct kube_refresh_pod { + struct flb_kube_meta meta; /* private deep copy, owns its strings */ + struct mk_list _head; +}; + +static int kube_dup_field(char **dst, int *dst_len, + const char *src, int src_len) +{ + if (src == NULL) { + *dst = NULL; + *dst_len = 0; + return 0; + } + + *dst = flb_malloc(src_len + 1); + if (*dst == NULL) { + flb_errno(); + return -1; + } + memcpy(*dst, src, src_len); + (*dst)[src_len] = '\0'; + *dst_len = src_len; + return 0; +} + +static int kube_meta_copy(struct flb_kube_meta *dst, + const struct flb_kube_meta *src) +{ + memset(dst, 0, sizeof(*dst)); + dst->fields = src->fields; + + if (kube_dup_field(&dst->cluster, &dst->cluster_len, + src->cluster, src->cluster_len) != 0 || + kube_dup_field(&dst->namespace, &dst->namespace_len, + src->namespace, src->namespace_len) != 0 || + kube_dup_field(&dst->podname, &dst->podname_len, + src->podname, src->podname_len) != 0 || + kube_dup_field(&dst->container_name, &dst->container_name_len, + src->container_name, src->container_name_len) != 0 || + kube_dup_field(&dst->container_image, &dst->container_image_len, + src->container_image, src->container_image_len) != 0 || + kube_dup_field(&dst->docker_id, &dst->docker_id_len, + src->docker_id, src->docker_id_len) != 0 || + kube_dup_field(&dst->container_hash, &dst->container_hash_len, + src->container_hash, src->container_hash_len) != 0 || + kube_dup_field(&dst->workload, &dst->workload_len, + src->workload, src->workload_len) != 0 || + kube_dup_field(&dst->cache_key, &dst->cache_key_len, + src->cache_key, src->cache_key_len) != 0) { + flb_kube_meta_release(dst); + return -1; + } + + return 0; +} + +/* Track a pod for background refresh (no-op if disabled or already tracked) */ +static void kube_meta_refresh_track(struct flb_kube *ctx, + struct flb_kube_meta *meta) +{ + struct mk_list *head; + struct kube_refresh_pod *rp; + + if (ctx->kube_meta_cache_refresh_interval <= 0 || meta->cache_key == NULL) { + return; + } + + /* Skip if this pod is already tracked */ + mk_list_foreach(head, &ctx->refresh_pods) { + rp = mk_list_entry(head, struct kube_refresh_pod, _head); + if (rp->meta.cache_key_len == meta->cache_key_len && + memcmp(rp->meta.cache_key, meta->cache_key, + meta->cache_key_len) == 0) { + return; + } + } + + rp = flb_calloc(1, sizeof(struct kube_refresh_pod)); + if (rp == NULL) { + flb_errno(); + return; + } + + if (kube_meta_copy(&rp->meta, meta) != 0) { + flb_free(rp); + return; + } + + mk_list_add(&rp->_head, &ctx->refresh_pods); +} + +/* + * Periodic timer callback: refresh cached pod metadata in place. Pods no longer + * present in the cache (evicted or stopped logging) are dropped so the tracking + * list stays bounded to what is actively cached. + */ +static void cb_kube_meta_refresh(struct flb_config *config, void *data) +{ + int ret; + char *buf; + size_t size; + void *cached; + size_t cached_size; + int refreshed = 0; + struct mk_list *head; + struct mk_list *tmp; + struct kube_refresh_pod *rp; + struct flb_kube *ctx = data; + + (void) config; + + mk_list_foreach_safe(head, tmp, &ctx->refresh_pods) { + rp = mk_list_entry(head, struct kube_refresh_pod, _head); + + /* Stop tracking pods that are no longer in the cache */ + ret = flb_hash_table_get(ctx->hash_table, + rp->meta.cache_key, rp->meta.cache_key_len, + &cached, &cached_size); + if (ret == -1) { + mk_list_del(&rp->_head); + flb_kube_meta_release(&rp->meta); + flb_free(rp); + continue; + } + + /* Re-fetch metadata and update the cache entry in place */ + buf = NULL; + size = 0; + ret = get_and_merge_pod_meta(ctx, &rp->meta, &buf, &size); + if (ret != -1 && buf != NULL) { + flb_hash_table_add(ctx->hash_table, + rp->meta.cache_key, rp->meta.cache_key_len, + buf, size); + flb_free(buf); + refreshed++; + } + } + + if (refreshed > 0) { + flb_plg_debug(ctx->ins, "refreshed metadata for %d cached pod(s)", + refreshed); + } +} + +/* Create the periodic refresh timer (runs on the event-loop thread) */ +int flb_kube_meta_refresh_start(struct flb_kube *ctx) +{ + int ret; + struct flb_sched *sched; + + if (ctx->kube_meta_cache_refresh_interval <= 0 || + ctx->refresh_timer != NULL) { + return 0; + } + + sched = flb_sched_ctx_get(); + if (sched == NULL) { + flb_plg_error(ctx->ins, "could not get scheduler for metadata refresh"); + return -1; + } + + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->kube_meta_cache_refresh_interval * 1000, + cb_kube_meta_refresh, ctx, + &ctx->refresh_timer); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not create metadata refresh timer"); + return -1; + } + + flb_plg_info(ctx->ins, + "background pod metadata refresh enabled (interval=%is)", + ctx->kube_meta_cache_refresh_interval); + return 0; +} + +/* Destroy the refresh timer and release all tracked pods */ +void flb_kube_meta_refresh_destroy(struct flb_kube *ctx) +{ + struct mk_list *head; + struct mk_list *tmp; + struct kube_refresh_pod *rp; + + if (ctx->refresh_timer != NULL) { + flb_sched_timer_cb_destroy(ctx->refresh_timer); + ctx->refresh_timer = NULL; + } + + mk_list_foreach_safe(head, tmp, &ctx->refresh_pods) { + rp = mk_list_entry(head, struct kube_refresh_pod, _head); + mk_list_del(&rp->_head); + flb_kube_meta_release(&rp->meta); + flb_free(rp); + } +} + /* * Work around kubernetes/kubernetes/issues/78479 by waiting * for DNS to start up. @@ -2362,6 +2569,9 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx, flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key, &hash_meta_buf, &hash_meta_size); } + + /* Track this pod for background metadata refresh (if enabled) */ + kube_meta_refresh_track(ctx, meta); } /* diff --git a/plugins/filter_kubernetes/kube_meta.h b/plugins/filter_kubernetes/kube_meta.h index c5517ce919e..7f7c95aa727 100644 --- a/plugins/filter_kubernetes/kube_meta.h +++ b/plugins/filter_kubernetes/kube_meta.h @@ -78,6 +78,8 @@ int flb_kube_meta_get(struct flb_kube *ctx, struct flb_kube_props *props, struct flb_kube_meta *namespace_meta); int flb_kube_meta_release(struct flb_kube_meta *meta); +int flb_kube_meta_refresh_start(struct flb_kube *ctx); +void flb_kube_meta_refresh_destroy(struct flb_kube *ctx); int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config); int get_api_server_configmap(struct flb_kube *ctx, const char *namespace, const char *configmap, char **out_buf, size_t *out_size); diff --git a/plugins/filter_kubernetes/kubernetes.c b/plugins/filter_kubernetes/kubernetes.c index 83190a6b9f5..453ef86e48e 100644 --- a/plugins/filter_kubernetes/kubernetes.c +++ b/plugins/filter_kubernetes/kubernetes.c @@ -619,6 +619,14 @@ static int cb_kube_filter(const void *data, size_t bytes, (void) i_ins; (void) config; + /* + * Lazily create the background metadata refresh timer. This runs from the + * filter callback so the event-loop scheduler is available. + */ + if (ctx->kube_meta_cache_refresh_interval > 0 && ctx->refresh_timer == NULL) { + flb_kube_meta_refresh_start(ctx); + } + if (ctx->use_journal == FLB_FALSE || ctx->dummy_meta == FLB_TRUE) { if (ctx->dummy_meta == FLB_TRUE) { ret = flb_kube_dummy_meta_get(&dummy_cache_buf, &cache_size); @@ -1129,6 +1137,18 @@ static struct flb_config_map config_map[] = { "Setting this to 0 will disable the cache TTL and " "will evict entries once the cache reaches capacity." }, + { + FLB_CONFIG_MAP_TIME, "kube_meta_cache_refresh_interval", "0", + 0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_cache_refresh_interval), + "interval to proactively refresh cached pod metadata in the background. " + "By default it is set to 0 which disables the feature and preserves the " + "existing behavior. When set to a time interval, for example 60 or 60s, a " + "periodic timer re-fetches metadata for the pods currently in the cache and " + "updates the entries in place, so in-place label and annotation changes on " + "long-lived, stable-named pods (such as StatefulSet members) are eventually " + "reflected without needing a restart. The refresh only touches pods already " + "cached on this node, so the added request load stays bounded." + }, /* * Enable pod to service name association logics diff --git a/src/flb_hash_table.c b/src/flb_hash_table.c index ee0078057dd..1aeeefa91d2 100644 --- a/src/flb_hash_table.c +++ b/src/flb_hash_table.c @@ -411,7 +411,27 @@ int flb_hash_table_add(struct flb_hash_table *ht, const char *key, int key_len, return -1; } - /* Check capacity */ + /* + * Check if this is a replacement first. Replacing the value of an existing + * key does not change the number of entries, so it must be handled before + * the capacity check below; otherwise a replacement on a full table would + * needlessly evict an unrelated entry. + */ + entry = hash_get_entry(ht, key, key_len, &id); + if (entry) { + /* + * The key already exists, just perform a value replacement, check if the + * value refers to our own previous allocation. + */ + ret = entry_set_value(entry, val, val_size); + if (ret == -1) { + return -1; + } + + return id; + } + + /* Check capacity (only required when inserting a new entry) */ if (ht->max_entries > 0 && ht->total_count >= ht->max_entries) { if (ht->evict_mode == FLB_HASH_TABLE_EVICT_NONE) { /* Do nothing */ @@ -427,21 +447,6 @@ int flb_hash_table_add(struct flb_hash_table *ht, const char *key, int key_len, } } - /* Check if this is a replacement */ - entry = hash_get_entry(ht, key, key_len, &id); - if (entry) { - /* - * The key already exists, just perform a value replacement, check if the - * value refers to our own previous allocation. - */ - ret = entry_set_value(entry, val, val_size); - if (ret == -1) { - return -1; - } - - return id; - } - /* * Below is just code to handle the creation of a new entry in the table */ diff --git a/tests/runtime/filter_kubernetes.c b/tests/runtime/filter_kubernetes.c index a0a97b4a7b3..10ae5ee5217 100644 --- a/tests/runtime/filter_kubernetes.c +++ b/tests/runtime/filter_kubernetes.c @@ -1029,7 +1029,106 @@ static void flb_test_systemd_logs() } #endif +/* + * Exercise kube_meta_cache_refresh_interval end to end without any network: + * with use_tag_for_meta the metadata is derived from the record tag, so a pod + * is cached (and tracked for refresh) on the first record, the background timer + * re-derives and updates the cache entry in place, and the second record is + * still enriched. This validates the track -> timer -> refresh -> cleanup path. + */ +static int refresh_enriched = 0; + +static int cb_refresh_enriched(void *record, size_t size, void *data) +{ + (void) data; + if (record != NULL && strstr(record, "refreshpod") != NULL) { + refresh_enriched++; + } + if (size > 0) { + flb_free(record); + } + return 0; +} + +static void flb_test_kube_meta_cache_refresh(void) +{ + int i; + int ret; + int in_ffd; + int filter_ffd; + int out_ffd; + flb_ctx_t *flb; + struct flb_lib_out_cb cb_data; + const char *rec = "[0, {\"log\":\"hello\"}]"; + + refresh_enriched = 0; + + flb = flb_create(); + TEST_CHECK(flb != NULL); + if (!flb) { + return; + } + + ret = flb_service_set(flb, + "Flush", "1", + "Grace", "1", + "Log_Level", "error", + "Parsers_File", DPATH "/parsers.conf", + NULL); + TEST_CHECK(ret == 0); + + in_ffd = flb_input(flb, "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(flb, in_ffd, "Tag", "kube.default.refreshpod.con", NULL); + + filter_ffd = flb_filter(flb, "kubernetes", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(flb, filter_ffd, + "Match", "kube.*", + "Use_Tag_For_Meta", "On", + "Regex_Parser", "kubernetes-tag", + "Kube_Tag_Prefix", "kube.", + "kube_meta_cache_refresh_interval", "1", + NULL); + TEST_CHECK(ret == 0); + + cb_data.cb = cb_refresh_enriched; + cb_data.data = NULL; + out_ffd = flb_output(flb, "lib", (void *) &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(flb, out_ffd, "Match", "kube.*", "format", "json", NULL); + + ret = flb_start(flb); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_destroy(flb); + return; + } + + /* First record: cache miss -> pod tracked for refresh -> enriched */ + flb_lib_push(flb, in_ffd, rec, strlen(rec)); + for (i = 0; i < 30 && refresh_enriched < 1; i++) { + flb_time_msleep(100); + } + TEST_CHECK_(refresh_enriched >= 1, "first record should be enriched"); + + /* Let the background refresh timer fire a couple of times (interval = 1s) */ + flb_time_msleep(2500); + + /* Second record: cache hit (entry refreshed in place) -> still enriched */ + flb_lib_push(flb, in_ffd, rec, strlen(rec)); + for (i = 0; i < 30 && refresh_enriched < 2; i++) { + flb_time_msleep(100); + } + TEST_CHECK_(refresh_enriched >= 2, + "second record should be enriched after refresh"); + + flb_stop(flb); + flb_destroy(flb); +} + TEST_LIST = { + {"kube_meta_cache_refresh", flb_test_kube_meta_cache_refresh}, {"kube_core_base", flb_test_core_base}, {"kube_core_no_meta", flb_test_core_no_meta}, {"kube_core_unescaping_text", flb_test_core_unescaping_text},