diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 92eb7bd..79ac2d1 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -167,15 +167,28 @@ def resolve_logical_time_field(stream_block) nil end + def resolve_native_type + type = @stream_block.type + native_registry = + Pocolog::DataStream + .update_container_types_to_native(type.registry) + native_registry.build(type.name) + end + def extract_logical_time(raw_payload) return unless @logical_time_field + unless @extract_logical_time_sample + @native_type = resolve_native_type + @extract_logical_time_sample = @native_type.new + end + # Skip 21 bytes as they belong to the data stream declaration block # information before the marshalled data. # See rock-core/tools-pocolog/blob/master/spec/spec-v2.txt - @stream_block.type - .from_buffer(raw_payload[21..-1]) - .raw_get(@logical_time_field) + @extract_logical_time_sample + .from_buffer(raw_payload[21..-1]) + .raw_get(@logical_time_field) rescue ArgumentError => e raise unless e.message.match?(/parts.of.the.provided.buffer/) diff --git a/test/datastore/normalize_test.rb b/test/datastore/normalize_test.rb index d0d4348..6fa7fc4 100644 --- a/test/datastore/normalize_test.rb +++ b/test/datastore/normalize_test.rb @@ -267,11 +267,11 @@ class Datastore describe "logical_time" do before do - registry = Typelib::CXXRegistry.new - registry.create_compound "/Time" do |b| + @registry = Typelib::CXXRegistry.new + @registry.create_compound "/Time" do |b| b.microseconds = "uint64_t" end - @test_t = registry.create_compound "/Test" do |b| + @test_t = @registry.create_compound "/Test" do |b| b.time = "/Time" b.other_type = "/int" end @@ -286,24 +286,9 @@ class Datastore other_type: 42 ) - create_logfile "file0.0.log" do - create_logfile_stream( - "stream0", - metadata: { - "rock_task_name" => "task0", - "rock_task_object_name" => "port" - }, - type: @test_t - ) - write_logfile_sample base_time, base_time + 5, value - end - - logfile_pathname("normalized").mkdir - input_path = logfile_pathname("file0.0.log") - normalize.normalize([input_path]) - stream = open_logfile_stream( - ["normalized", "task0::port.0.log"], "task0.port" - ) + logical_time_create_file(value) + logical_time_normalize + stream = logical_time_open_stream assert_equal [[base_time, @timestamp, value]], stream.samples.to_a @@ -311,32 +296,86 @@ class Datastore it "doesn't update the logical time field if rock_timestamp_field "\ "exists" do + @test_t.field_metadata["time"].set("role", "logical_time") value = @test_t.new( time: { microseconds: @timestamp_as_microseconds }, other_type: 42 ) + logical_time_create_file( + value, metadata: { "rock_timestamp_field" => "time" } + ) + logical_time_normalize + stream = logical_time_open_stream + assert_equal [[base_time, base_time + 5, value]], + stream.samples.to_a + end + + it "resizes containers if the native type has a different size" do + # The idea of that test is to create a log file using a + # valid type (compound_t) that has the same marshalled + # representation than a type whose container has a "wrong" + # size + # + # Given that the codepath that will be affected by this is + # the one that extracts logical time from the sample, we have + # to have a Time type. It is placed after the vector so as to + # guarantee that we will corrupt the vector's internal state + # if there are no resizes + # + # Tested to actually crash + container_t = @registry.create_container "/std/vector", "/int32_t" + compound_t = @registry.create_compound "/ResizingTest" do |c| + c.a = container_t + c.b = "/Time" + end + + wrong_registry = Typelib::CXXRegistry.new + wrong_registry.create_compound "/Time" do |b| + b.microseconds = "uint64_t" + end + wrong_container_t = + wrong_registry.create_container("/std/vector", "/int32_t", 1) + wrong_compound_t = + wrong_registry.create_compound "/WrongSizeResizingTest" do |c| + c.a = wrong_container_t + c.b = "/Time" + end + wrong_compound_t.field_metadata["b"].set("role", "logical_time") + + value = compound_t.new(a: [1, 2, 3], b: { microseconds: 10 }) + logical_time_create_file(value, type: wrong_compound_t) + + logfile_pathname("normalized").mkdir + input_path = logfile_pathname("file0.0.log") + # This crashes if the resize did not happen + normalize.normalize([input_path]) + end + + def logical_time_create_file(value, type: value.class, metadata: {}) create_logfile "file0.0.log" do create_logfile_stream( "stream0", metadata: { "rock_task_name" => "task0", - "rock_task_object_name" => "port", - "rock_timestamp_field" => "time" - }, - type: @test_t + "rock_task_object_name" => "port" + }.merge(metadata), + type: type + ) + @__current_stream.write_raw( + base_time, base_time + 5, value.to_byte_array ) - write_logfile_sample base_time, base_time + 5, value end + end + def logical_time_normalize logfile_pathname("normalized").mkdir input_path = logfile_pathname("file0.0.log") normalize.normalize([input_path]) - stream = open_logfile_stream( - ["normalized", "task0::port.0.log"], "task0.port" - ) - assert_equal [[base_time, base_time + 5, value]], - stream.samples.to_a + end + + def logical_time_open_stream + open_logfile_stream(["normalized", "task0::port.0.log"], "task0.port") end end