Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/filter_kubernetes/kube_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
}
ctx->config = config;
ctx->ins = ins;
mk_list_init(&ctx->refresh_pods);

/* Set config_map properties in our local context */
ret = flb_filter_config_map_set(ins, (void *) ctx);
Expand Down Expand Up @@ -207,6 +208,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
return;
}

/* Stop background refresh timer and release tracked pods */
flb_kube_meta_refresh_destroy(ctx);

if (ctx->hash_table) {
flb_hash_table_destroy(ctx->hash_table);
}
Expand Down
14 changes: 14 additions & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct service_attributes {

};

struct flb_sched_timer;

/* Filter context */
struct flb_kube {
/* Configuration parameters */
Expand Down Expand Up @@ -196,6 +198,7 @@ struct flb_kube {

int kube_meta_cache_ttl;
int kube_meta_namespace_cache_ttl;
int kube_meta_cache_refresh_interval;

/* Configuration used for enabling pod to service name mapping*/
int aws_use_pod_association;
Expand Down Expand Up @@ -241,6 +244,17 @@ struct flb_kube {
struct flb_upstream *kubelet_upstream;
struct flb_upstream *kube_api_upstream;
struct flb_filter_instance *ins;

/*
* Optional background refresh of cached pod metadata. When
* kube_meta_cache_refresh_interval > 0, a periodic timer re-fetches
* metadata for the pods tracked in 'refresh_pods' and updates the cache
* entries in place, so in-place label/annotation changes on long-lived,
* stable-named pods (e.g. StatefulSet members) are eventually reflected.
* Disabled by default (interval == 0).
*/
struct mk_list refresh_pods;
struct flb_sched_timer *refresh_timer;
};

struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *i,
Expand Down
210 changes: 210 additions & 0 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_env.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/tls/flb_tls.h>

#include <sys/types.h>
Expand Down Expand Up @@ -2018,6 +2019,212 @@ static int get_and_merge_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *me
return ret;
}

/*
* Background metadata refresh (opt-in via kube_meta_cache_refresh_interval).
*
* When enabled, every pod that gets cached is tracked in ctx->refresh_pods with
* a private copy of its 'meta'. A periodic timer walks that list and, for each
* pod still present in the cache, re-fetches its metadata and replaces the cache
* entry in place. Because the cache only ever holds pods this instance is
* logging (node-local), the extra request load stays bounded to this node.
*/

struct kube_refresh_pod {
struct flb_kube_meta meta; /* private deep copy, owns its strings */
struct mk_list _head;
};

static int kube_dup_field(char **dst, int *dst_len,
const char *src, int src_len)
{
if (src == NULL) {
*dst = NULL;
*dst_len = 0;
return 0;
}

*dst = flb_malloc(src_len + 1);
if (*dst == NULL) {
flb_errno();
return -1;
}
memcpy(*dst, src, src_len);
(*dst)[src_len] = '\0';
*dst_len = src_len;
return 0;
}

static int kube_meta_copy(struct flb_kube_meta *dst,
const struct flb_kube_meta *src)
{
memset(dst, 0, sizeof(*dst));
dst->fields = src->fields;

if (kube_dup_field(&dst->cluster, &dst->cluster_len,
src->cluster, src->cluster_len) != 0 ||
kube_dup_field(&dst->namespace, &dst->namespace_len,
src->namespace, src->namespace_len) != 0 ||
kube_dup_field(&dst->podname, &dst->podname_len,
src->podname, src->podname_len) != 0 ||
kube_dup_field(&dst->container_name, &dst->container_name_len,
src->container_name, src->container_name_len) != 0 ||
kube_dup_field(&dst->container_image, &dst->container_image_len,
src->container_image, src->container_image_len) != 0 ||
kube_dup_field(&dst->docker_id, &dst->docker_id_len,
src->docker_id, src->docker_id_len) != 0 ||
kube_dup_field(&dst->container_hash, &dst->container_hash_len,
src->container_hash, src->container_hash_len) != 0 ||
kube_dup_field(&dst->workload, &dst->workload_len,
src->workload, src->workload_len) != 0 ||
kube_dup_field(&dst->cache_key, &dst->cache_key_len,
src->cache_key, src->cache_key_len) != 0) {
flb_kube_meta_release(dst);
return -1;
}

return 0;
}

/* Track a pod for background refresh (no-op if disabled or already tracked) */
static void kube_meta_refresh_track(struct flb_kube *ctx,
struct flb_kube_meta *meta)
{
struct mk_list *head;
struct kube_refresh_pod *rp;

if (ctx->kube_meta_cache_refresh_interval <= 0 || meta->cache_key == NULL) {
return;
}

/* Skip if this pod is already tracked */
mk_list_foreach(head, &ctx->refresh_pods) {
rp = mk_list_entry(head, struct kube_refresh_pod, _head);
if (rp->meta.cache_key_len == meta->cache_key_len &&
memcmp(rp->meta.cache_key, meta->cache_key,
meta->cache_key_len) == 0) {
return;
}
}

rp = flb_calloc(1, sizeof(struct kube_refresh_pod));
if (rp == NULL) {
flb_errno();
return;
}

if (kube_meta_copy(&rp->meta, meta) != 0) {
flb_free(rp);
return;
}

mk_list_add(&rp->_head, &ctx->refresh_pods);
}

/*
* Periodic timer callback: refresh cached pod metadata in place. Pods no longer
* present in the cache (evicted or stopped logging) are dropped so the tracking
* list stays bounded to what is actively cached.
*/
static void cb_kube_meta_refresh(struct flb_config *config, void *data)
{
int ret;
char *buf;
size_t size;
void *cached;
size_t cached_size;
int refreshed = 0;
struct mk_list *head;
struct mk_list *tmp;
struct kube_refresh_pod *rp;
struct flb_kube *ctx = data;

(void) config;

mk_list_foreach_safe(head, tmp, &ctx->refresh_pods) {
rp = mk_list_entry(head, struct kube_refresh_pod, _head);

/* Stop tracking pods that are no longer in the cache */
ret = flb_hash_table_get(ctx->hash_table,
rp->meta.cache_key, rp->meta.cache_key_len,
&cached, &cached_size);
if (ret == -1) {
mk_list_del(&rp->_head);
flb_kube_meta_release(&rp->meta);
flb_free(rp);
continue;
}

/* Re-fetch metadata and update the cache entry in place */
buf = NULL;
size = 0;
ret = get_and_merge_pod_meta(ctx, &rp->meta, &buf, &size);
if (ret != -1 && buf != NULL) {
flb_hash_table_add(ctx->hash_table,
rp->meta.cache_key, rp->meta.cache_key_len,
buf, size);
Comment on lines +2162 to +2164

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid evicting cache entries during refresh

When the pod metadata cache is full, this refresh path can evict unrelated entries even though it is only replacing an entry that was just confirmed to exist. flb_hash_table_add() checks capacity and evicts before it checks whether the key is a replacement (src/flb_hash_table.c:414-431), so with the default random-eviction cache a refresh cycle over 256 tracked pods can randomly drop other cached pods and shrink/churn the cache, causing avoidable API/kubelet misses for active pods. Use a replacement path that does not run capacity eviction for existing keys.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b74ecc2. Reordered flb_hash_table_add() so a replacement is handled before the capacity check, so refresh no longer evicts unrelated entries. The hashtable internal suite (including the eviction tests) and the full filter_kubernetes suite pass, and valgrind is clean.

flb_free(buf);
refreshed++;
}
}

if (refreshed > 0) {
flb_plg_debug(ctx->ins, "refreshed metadata for %d cached pod(s)",
refreshed);
}
}

/* Create the periodic refresh timer (runs on the event-loop thread) */
int flb_kube_meta_refresh_start(struct flb_kube *ctx)
{
int ret;
struct flb_sched *sched;

if (ctx->kube_meta_cache_refresh_interval <= 0 ||
ctx->refresh_timer != NULL) {
return 0;
}

sched = flb_sched_ctx_get();
if (sched == NULL) {
flb_plg_error(ctx->ins, "could not get scheduler for metadata refresh");
return -1;
}

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->kube_meta_cache_refresh_interval * 1000,
cb_kube_meta_refresh, ctx,
&ctx->refresh_timer);
if (ret < 0) {
flb_plg_error(ctx->ins, "could not create metadata refresh timer");
return -1;
}

flb_plg_info(ctx->ins,
"background pod metadata refresh enabled (interval=%is)",
ctx->kube_meta_cache_refresh_interval);
return 0;
}

/* Destroy the refresh timer and release all tracked pods */
void flb_kube_meta_refresh_destroy(struct flb_kube *ctx)
{
struct mk_list *head;
struct mk_list *tmp;
struct kube_refresh_pod *rp;

if (ctx->refresh_timer != NULL) {
flb_sched_timer_cb_destroy(ctx->refresh_timer);
ctx->refresh_timer = NULL;
}

mk_list_foreach_safe(head, tmp, &ctx->refresh_pods) {
rp = mk_list_entry(head, struct kube_refresh_pod, _head);
mk_list_del(&rp->_head);
flb_kube_meta_release(&rp->meta);
flb_free(rp);
}
}

/*
* Work around kubernetes/kubernetes/issues/78479 by waiting
* for DNS to start up.
Expand Down Expand Up @@ -2362,6 +2569,9 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key,
&hash_meta_buf, &hash_meta_size);
}

/* Track this pod for background metadata refresh (if enabled) */
kube_meta_refresh_track(ctx, meta);
}

/*
Expand Down
2 changes: 2 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ int flb_kube_meta_get(struct flb_kube *ctx,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta);
int flb_kube_meta_release(struct flb_kube_meta *meta);
int flb_kube_meta_refresh_start(struct flb_kube *ctx);
void flb_kube_meta_refresh_destroy(struct flb_kube *ctx);
int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config);
int get_api_server_configmap(struct flb_kube *ctx, const char *namespace, const char *configmap, char **out_buf, size_t *out_size);

Expand Down
20 changes: 20 additions & 0 deletions plugins/filter_kubernetes/kubernetes.c
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,14 @@ static int cb_kube_filter(const void *data, size_t bytes,
(void) i_ins;
(void) config;

/*
* Lazily create the background metadata refresh timer. This runs from the
* filter callback so the event-loop scheduler is available.
*/
if (ctx->kube_meta_cache_refresh_interval > 0 && ctx->refresh_timer == NULL) {
flb_kube_meta_refresh_start(ctx);
}

if (ctx->use_journal == FLB_FALSE || ctx->dummy_meta == FLB_TRUE) {
if (ctx->dummy_meta == FLB_TRUE) {
ret = flb_kube_dummy_meta_get(&dummy_cache_buf, &cache_size);
Expand Down Expand Up @@ -1129,6 +1137,18 @@ static struct flb_config_map config_map[] = {
"Setting this to 0 will disable the cache TTL and "
"will evict entries once the cache reaches capacity."
},
{
FLB_CONFIG_MAP_TIME, "kube_meta_cache_refresh_interval", "0",
0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_cache_refresh_interval),
"interval to proactively refresh cached pod metadata in the background. "
"By default it is set to 0 which disables the feature and preserves the "
"existing behavior. When set to a time interval, for example 60 or 60s, a "
"periodic timer re-fetches metadata for the pods currently in the cache and "
"updates the entries in place, so in-place label and annotation changes on "
"long-lived, stable-named pods (such as StatefulSet members) are eventually "
"reflected without needing a restart. The refresh only touches pods already "
"cached on this node, so the added request load stays bounded."
},

/*
* Enable pod to service name association logics
Expand Down
37 changes: 21 additions & 16 deletions src/flb_hash_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,27 @@ int flb_hash_table_add(struct flb_hash_table *ht, const char *key, int key_len,
return -1;
}

/* Check capacity */
/*
* Check if this is a replacement first. Replacing the value of an existing
* key does not change the number of entries, so it must be handled before
* the capacity check below; otherwise a replacement on a full table would
* needlessly evict an unrelated entry.
*/
entry = hash_get_entry(ht, key, key_len, &id);
if (entry) {
/*
* The key already exists, just perform a value replacement, check if the
* value refers to our own previous allocation.
*/
ret = entry_set_value(entry, val, val_size);
if (ret == -1) {
return -1;
}

return id;
}

/* Check capacity (only required when inserting a new entry) */
if (ht->max_entries > 0 && ht->total_count >= ht->max_entries) {
if (ht->evict_mode == FLB_HASH_TABLE_EVICT_NONE) {
/* Do nothing */
Expand All @@ -427,21 +447,6 @@ int flb_hash_table_add(struct flb_hash_table *ht, const char *key, int key_len,
}
}

/* Check if this is a replacement */
entry = hash_get_entry(ht, key, key_len, &id);
if (entry) {
/*
* The key already exists, just perform a value replacement, check if the
* value refers to our own previous allocation.
*/
ret = entry_set_value(entry, val, val_size);
if (ret == -1) {
return -1;
}

return id;
}

/*
* Below is just code to handle the creation of a new entry in the table
*/
Expand Down
Loading
Loading