Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions lib/syskit/log/datastore/normalize.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,28 @@ def resolve_logical_time_field(stream_block)
nil
end

def resolve_native_type
type = @stream_block.type
native_registry =
Pocolog::DataStream
.update_container_types_to_native(type.registry)
native_registry.build(type.name)
end

def extract_logical_time(raw_payload)
return unless @logical_time_field

unless @extract_logical_time_sample
@native_type = resolve_native_type
@extract_logical_time_sample = @native_type.new
end

# Skip 21 bytes as they belong to the data stream declaration block
# information before the marshalled data.
# See rock-core/tools-pocolog/blob/master/spec/spec-v2.txt
@stream_block.type
.from_buffer(raw_payload[21..-1])
.raw_get(@logical_time_field)
@extract_logical_time_sample
.from_buffer(raw_payload[21..-1])
.raw_get(@logical_time_field)
rescue ArgumentError => e
raise unless e.message.match?(/parts.of.the.provided.buffer/)

Expand Down
101 changes: 70 additions & 31 deletions test/datastore/normalize_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ class Datastore

describe "logical_time" do
before do
registry = Typelib::CXXRegistry.new
registry.create_compound "/Time" do |b|
@registry = Typelib::CXXRegistry.new
@registry.create_compound "/Time" do |b|
b.microseconds = "uint64_t"
end
@test_t = registry.create_compound "/Test" do |b|
@test_t = @registry.create_compound "/Test" do |b|
b.time = "/Time"
b.other_type = "/int"
end
Expand All @@ -286,57 +286,96 @@ class Datastore
other_type: 42
)

create_logfile "file0.0.log" do
create_logfile_stream(
"stream0",
metadata: {
"rock_task_name" => "task0",
"rock_task_object_name" => "port"
},
type: @test_t
)
write_logfile_sample base_time, base_time + 5, value
end

logfile_pathname("normalized").mkdir
input_path = logfile_pathname("file0.0.log")
normalize.normalize([input_path])
stream = open_logfile_stream(
["normalized", "task0::port.0.log"], "task0.port"
)
logical_time_create_file(value)
logical_time_normalize
stream = logical_time_open_stream

assert_equal [[base_time, @timestamp, value]],
stream.samples.to_a
end

it "doesn't update the logical time field if rock_timestamp_field "\
"exists" do
@test_t.field_metadata["time"].set("role", "logical_time")
value = @test_t.new(
time: { microseconds: @timestamp_as_microseconds },
other_type: 42
)
logical_time_create_file(
value, metadata: { "rock_timestamp_field" => "time" }
)

logical_time_normalize
stream = logical_time_open_stream
assert_equal [[base_time, base_time + 5, value]],
stream.samples.to_a
end

it "resizes containers if the native type has a different size" do
# The idea of that test is to create a log file using a
# valid type (compound_t) that has the same marshalled
# representation than a type whose container has a "wrong"
# size
#
# Given that the codepath that will be affected by this is
# the one that extracts logical time from the sample, we have
# to have a Time type. It is placed after the vector so as to
# guarantee that we will corrupt the vector's internal state
# if there are no resizes
#
# Tested to actually crash
container_t = @registry.create_container "/std/vector", "/int32_t"
compound_t = @registry.create_compound "/ResizingTest" do |c|
c.a = container_t
c.b = "/Time"
end

wrong_registry = Typelib::CXXRegistry.new
wrong_registry.create_compound "/Time" do |b|
b.microseconds = "uint64_t"
end
wrong_container_t =
wrong_registry.create_container("/std/vector", "/int32_t", 1)
wrong_compound_t =
wrong_registry.create_compound "/WrongSizeResizingTest" do |c|
c.a = wrong_container_t
c.b = "/Time"
end
wrong_compound_t.field_metadata["b"].set("role", "logical_time")

value = compound_t.new(a: [1, 2, 3], b: { microseconds: 10 })
logical_time_create_file(value, type: wrong_compound_t)

logfile_pathname("normalized").mkdir
input_path = logfile_pathname("file0.0.log")
# This crashes if the resize did not happen
normalize.normalize([input_path])
end

def logical_time_create_file(value, type: value.class, metadata: {})
create_logfile "file0.0.log" do
create_logfile_stream(
"stream0",
metadata: {
"rock_task_name" => "task0",
"rock_task_object_name" => "port",
"rock_timestamp_field" => "time"
},
type: @test_t
"rock_task_object_name" => "port"
}.merge(metadata),
type: type
)
@__current_stream.write_raw(
base_time, base_time + 5, value.to_byte_array
)
write_logfile_sample base_time, base_time + 5, value
end
end

def logical_time_normalize
logfile_pathname("normalized").mkdir
input_path = logfile_pathname("file0.0.log")
normalize.normalize([input_path])
stream = open_logfile_stream(
["normalized", "task0::port.0.log"], "task0.port"
)
assert_equal [[base_time, base_time + 5, value]],
stream.samples.to_a
end

def logical_time_open_stream
open_logfile_stream(["normalized", "task0::port.0.log"], "task0.port")
end
end

Expand Down