Skip to content

Commit 3488f02

Browse files
ml: Add unit tests
Signed-off-by: Mirko Lazarevic <[email protected]>
1 parent 23a2e18 commit 3488f02

File tree

1 file changed

+270
-0
lines changed

1 file changed

+270
-0
lines changed

tests/internal/multiline.c

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,6 +1643,275 @@ static void test_buffer_limit_disabled()
16431643
flb_config_exit(config);
16441644
}
16451645

1646+
/*
 * Bookkeeping shared between the metadata test and its flush callback.
 * A pointer to one instance is handed to the stream as callback data.
 */
struct metadata_result {
    /* Progress / expectations */
    int current_record;               /* number of records seen by the callback so far */
    int total_expected;               /* number of records the test expects to receive */
    char *key;                        /* name of the content key (e.g. "log") */

    /* Outcome counters */
    int records_with_full_metadata;   /* records carrying the complete set of fields */
    int records_with_log_only;        /* records that lost their metadata fields */
};
1656+
1657+
/*
1658+
* Callback that verifies metadata preservation
1659+
*
1660+
* Before the fix: continuation lines would have only 1 field (log)
1661+
* After the fix: all lines should have multiple fields (time, stream, log, file, etc.)
1662+
*/
1663+
static int flush_callback_metadata_check(struct flb_ml_parser *parser,
1664+
struct flb_ml_stream *mst,
1665+
void *data, char *buf_data, size_t buf_size)
1666+
{
1667+
int ret;
1668+
int field_count;
1669+
size_t off = 0;
1670+
msgpack_unpacked result;
1671+
msgpack_object *map;
1672+
msgpack_object key;
1673+
struct flb_time tm;
1674+
struct metadata_result *res = data;
1675+
1676+
/* Unpack the record */
1677+
msgpack_unpacked_init(&result);
1678+
ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
1679+
if (ret != MSGPACK_UNPACK_SUCCESS) {
1680+
msgpack_unpacked_destroy(&result);
1681+
return -1;
1682+
}
1683+
1684+
/* Extract timestamp and map */
1685+
flb_time_pop_from_msgpack(&tm, &result, &map);
1686+
1687+
/* Count fields in the map */
1688+
field_count = map->via.map.size;
1689+
fprintf(stdout, "[Record %d] Fields: %d, Timestamp: %lu.%lu\n",
1690+
res->current_record, field_count,
1691+
(unsigned long)tm.tm.tv_sec, (unsigned long)tm.tm.tv_nsec);
1692+
1693+
1694+
/* Print field names */
1695+
fprintf(stdout, " Field names: ");
1696+
for (int i = 0; i < field_count; i++) {
1697+
key = map->via.map.ptr[i].key;
1698+
if (key.type == MSGPACK_OBJECT_STR) {
1699+
fprintf(stdout, "%.*s", (int)key.via.str.size, key.via.str.ptr);
1700+
if (i < field_count - 1) {
1701+
fprintf(stdout, ", ");
1702+
}
1703+
}
1704+
}
1705+
fprintf(stdout, "\n");
1706+
1707+
/* Track metadata presence */
1708+
/* 4 fields: time, stream, log, file*/
1709+
if (field_count == 4) {
1710+
res->records_with_full_metadata++;
1711+
} else {
1712+
res->records_with_log_only++;
1713+
fprintf(stdout, " WARNING: Record has only 'log' field (missing metadata)\n");
1714+
}
1715+
1716+
res->current_record++;
1717+
msgpack_unpacked_destroy(&result);
1718+
1719+
return 0;
1720+
}
1721+
1722+
static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_id,
1723+
struct flb_time *tm, const char *log_content,
1724+
const char *stream_name, const char *file_path)
1725+
{
1726+
int ret;
1727+
int log_len;
1728+
size_t off = 0;
1729+
msgpack_sbuffer mp_sbuf;
1730+
msgpack_packer mp_pck;
1731+
msgpack_unpacked result;
1732+
msgpack_object root;
1733+
msgpack_object *map;
1734+
1735+
log_len = strlen(log_content);
1736+
1737+
/* initialize buffers */
1738+
msgpack_sbuffer_init(&mp_sbuf);
1739+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1740+
1741+
/* Array: [timestamp, map] */
1742+
msgpack_pack_array(&mp_pck, 2);
1743+
flb_time_append_to_msgpack(tm, &mp_pck, 0);
1744+
1745+
/* Map with 4 fields: time (string), stream, log, file */
1746+
msgpack_pack_map(&mp_pck, 4);
1747+
1748+
/* time field */
1749+
msgpack_pack_str(&mp_pck, 4);
1750+
msgpack_pack_str_body(&mp_pck, "time", 4);
1751+
msgpack_pack_str(&mp_pck, 24);
1752+
msgpack_pack_str_body(&mp_pck, "2025-12-01T17:33:44+00:00", 24);
1753+
1754+
/* stream field */
1755+
msgpack_pack_str(&mp_pck, 6);
1756+
msgpack_pack_str_body(&mp_pck, "stream", 6);
1757+
msgpack_pack_str(&mp_pck, strlen(stream_name));
1758+
msgpack_pack_str_body(&mp_pck, stream_name, strlen(stream_name));
1759+
1760+
/* log field */
1761+
msgpack_pack_str(&mp_pck, 3);
1762+
msgpack_pack_str_body(&mp_pck, "log", 3);
1763+
msgpack_pack_str(&mp_pck, log_len);
1764+
msgpack_pack_str_body(&mp_pck, log_content, log_len);
1765+
1766+
/* file field */
1767+
msgpack_pack_str(&mp_pck, 4);
1768+
msgpack_pack_str_body(&mp_pck, "file", 4);
1769+
msgpack_pack_str(&mp_pck, strlen(file_path));
1770+
msgpack_pack_str_body(&mp_pck, file_path, strlen(file_path));
1771+
1772+
/* Unpack and lookup the content map */
1773+
msgpack_unpacked_init(&result);
1774+
ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
1775+
if (ret != MSGPACK_UNPACK_SUCCESS) {
1776+
msgpack_unpacked_destroy(&result);
1777+
msgpack_sbuffer_destroy(&mp_sbuf);
1778+
return -1;
1779+
}
1780+
1781+
root = result.data;
1782+
map = &root.via.array.ptr[1];
1783+
1784+
/* Send to multiline processor */
1785+
ret = flb_ml_append_object(ml, stream_id, tm, NULL, map);
1786+
1787+
msgpack_unpacked_destroy(&result);
1788+
msgpack_sbuffer_destroy(&mp_sbuf);
1789+
1790+
return ret;
1791+
}
1792+
1793+
1794+
/*
1795+
* Test issue 10576: Metadata preservation when lines are flushed
1796+
* ---------------------
1797+
* - https://github.com/fluent/fluent-bit/issues/10576
1798+
*
1799+
* Input pattern just as an example:
1800+
* - Line 1: "non-iso timestamp Likely to fail" (matches continuation)
1801+
* - Line 2: "Likely to fail" (matches continuation)
1802+
* - Line 3: "[iso timestamp] should be ok" (matches start_state)
1803+
* - Line 4: "non-iso timestamp Likely to fail" (matches continuation)
1804+
* - Line 5: "non-iso timestamp Likely to fail" (matches continuation)
1805+
*
1806+
* Before fix: Lines 1, 2, 4, 5 would have only {"log": "..."} (missing metadata)
1807+
* After fix: All lines should have {"time": "...", "stream": "...", "log": "...", "file": "..."}
1808+
*/
1809+
static void test_issue_10567_metadata_preservation()
1810+
{
1811+
int ret;
1812+
int i;
1813+
uint64_t stream_id;
1814+
struct flb_config *config;
1815+
struct flb_time tm;
1816+
struct flb_ml *ml;
1817+
struct flb_ml_parser *mlp;
1818+
struct flb_ml_parser_ins *mlp_i;
1819+
struct metadata_result res = {0};
1820+
1821+
/* Test input - mix of start_state and continuation lines */
1822+
const char *test_lines[] = {
1823+
"Mon Dec 1 17:33:44 UTC 2025 Likely to fail", /* continuation (no [timestamp]) */
1824+
"Mon Dec 1 17:33:49 UTC 2025 Likely to fail", /* continuation */
1825+
"[2025-12-01T17:33:54.551Z] should be ok", /* start_state */
1826+
"Mon Dec 1 17:33:59 UTC 2025 Likely to fail", /* continuation */
1827+
"Mon Dec 1 17:34:04 UTC 2025 Likely to fail", /* continuation */
1828+
"[2025-12-01T17:34:09.555Z] should be ok", /* start_state */
1829+
};
1830+
1831+
int num_lines = sizeof(test_lines) / sizeof(test_lines[0]);
1832+
1833+
/* Initialize */
1834+
config = flb_config_init();
1835+
TEST_CHECK(config != NULL);
1836+
1837+
/* Create custom multiline parser */
1838+
mlp = flb_ml_parser_create(config,
1839+
"parser_10576", /* name */
1840+
FLB_ML_REGEX, /* type */
1841+
NULL, /* match_str */
1842+
FLB_FALSE, /* negate */
1843+
1000, /* flush_ms */
1844+
"log", /* key_content */
1845+
NULL, /* key_group */
1846+
NULL, /* key_pattern */
1847+
NULL, /* parser */
1848+
NULL); /* parser_name */
1849+
TEST_CHECK(mlp != NULL);
1850+
1851+
/* start_state - matches [YYYY-MM-DDTHH:MM:SS.sssZ] format */
1852+
ret = flb_ml_rule_create(mlp, "start_state",
1853+
"/^\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\]/",
1854+
"cont", NULL);
1855+
TEST_CHECK(ret == 0);
1856+
1857+
/* cont - matches lines NOT starting with [timestamp] */
1858+
ret = flb_ml_rule_create(mlp, "cont",
1859+
"/^(?!\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\])/",
1860+
"cont", NULL);
1861+
TEST_CHECK(ret == 0);
1862+
1863+
ret = flb_ml_parser_init(mlp);
1864+
TEST_CHECK(ret == 0);
1865+
1866+
/* Create ML context */
1867+
ml = flb_ml_create(config, "test-metadata");
1868+
TEST_CHECK(ml != NULL);
1869+
1870+
mlp_i = flb_ml_parser_instance_create(ml, "parser_10576");
1871+
TEST_CHECK(mlp_i != NULL);
1872+
1873+
flb_ml_parser_instance_set(mlp_i, "key_content", "log");
1874+
1875+
/* Initialize result tracking */
1876+
res.key = "log";
1877+
res.total_expected = num_lines;
1878+
1879+
/* Create stream */
1880+
ret = flb_ml_stream_create(ml, "test-stream", -1,
1881+
flush_callback_metadata_check,
1882+
(void *)&res, &stream_id);
1883+
TEST_CHECK(ret == 0);
1884+
1885+
/* Send each line and flush immediately after */
1886+
for (i = 0; i < num_lines; i++) {
1887+
flb_time_get(&tm);
1888+
1889+
fprintf(stdout, "Input[%d]: %s\n", i, test_lines[i]);
1890+
1891+
ret = append_log_to_multiline_processor(ml, stream_id, &tm, test_lines[i],
1892+
"stdout", "/var/log/test.log");
1893+
TEST_CHECK(ret == 0);
1894+
1895+
/* Note: Flush after each line to simulate slow log arrival */
1896+
flb_ml_flush_pending_now(ml);
1897+
}
1898+
1899+
/* Final flush to ensure nothing is left */
1900+
flb_ml_flush_pending_now(ml);
1901+
1902+
/*
1903+
* After the fix, ALL records should have full metadata.
1904+
*/
1905+
TEST_CHECK(res.records_with_log_only == 0);
1906+
TEST_CHECK(res.records_with_full_metadata == res.current_record);
1907+
TEST_CHECK(res.current_record == num_lines);
1908+
1909+
/* Cleanup */
1910+
flb_ml_destroy(ml);
1911+
flb_config_exit(config);
1912+
}
1913+
1914+
16461915
TEST_LIST = {
16471916
/* Normal features tests */
16481917
{ "parser_docker", test_parser_docker},
@@ -1663,5 +1932,6 @@ TEST_LIST = {
16631932
{ "issue_4034" , test_issue_4034},
16641933
{ "issue_4949" , test_issue_4949},
16651934
{ "issue_5504" , test_issue_5504},
1935+
{ "issue_10576" , test_issue_10567_metadata_preservation },
16661936
{ 0 }
16671937
};

0 commit comments

Comments
 (0)