Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ jobs:
- name: Build (Android)
if: ${{ contains(matrix.target, 'android') }}
run: |
sudo apt install -y gcc-multilib

ln -sf $ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/linux-x86_64/lib/${{ matrix.ndk_arch }}/{libunwind.so,libc++abi.a} $ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/linux-x86_64/lib/
cargo install cargo-ndk
cargo ndk --target ${{ matrix.target }} build --release -p livekit --workspace -vv
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion livekit-ffi/generate_proto_win.bat
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ protoc.exe ^
%PROTOCOL%/audio_frame.proto ^
%PROTOCOL%/e2ee.proto ^
%PROTOCOL%/stats.proto ^
%PROTOCOL%/rpc.proto
%PROTOCOL%/rpc.proto ^
%PROTOCOL%/data_stream.proto
19 changes: 17 additions & 2 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ message FfiRequest {
TextStreamWriterWriteRequest text_stream_write = 65;
TextStreamWriterCloseRequest text_stream_close = 66;

// NEXT_ID: 67
PublishMetricsRequest publish_metrics = 67;

// NEXT_ID: 68
}
}

Expand Down Expand Up @@ -243,7 +245,9 @@ message FfiResponse {
TextStreamWriterWriteResponse text_stream_write = 64;
TextStreamWriterCloseResponse text_stream_close = 65;

// NEXT_ID: 66
PublishMetricsResponse publish_metrics = 66;

// NEXT_ID: 67
}
}

Expand Down Expand Up @@ -301,6 +305,17 @@ message FfiEvent {
}
}

message PublishMetricsRequest {
required uint64 local_participant_handle = 1;
required uint64 data_ptr = 2;
required uint64 data_len = 3;
optional uint64 async_id = 4;
}

message PublishMetricsResponse {
required uint64 async_id = 1;
}

// Stop all rooms synchronously (Do we need async here?).
// e.g: This is used for the Unity Editor after each assemblies reload.
// TODO(theomonnom): Implement a debug mode where we can find all leaked handles?
Expand Down
26 changes: 24 additions & 2 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4804,7 +4804,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -4952,13 +4952,15 @@ pub mod ffi_request {
TextStreamWrite(super::TextStreamWriterWriteRequest),
#[prost(message, tag="66")]
TextStreamClose(super::TextStreamWriterCloseRequest),
#[prost(message, tag="67")]
PublishMetrics(super::PublishMetricsRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -5104,6 +5106,8 @@ pub mod ffi_response {
TextStreamWrite(super::TextStreamWriterWriteResponse),
#[prost(message, tag="65")]
TextStreamClose(super::TextStreamWriterCloseResponse),
#[prost(message, tag="66")]
PublishMetrics(super::PublishMetricsResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down Expand Up @@ -5202,6 +5206,24 @@ pub mod ffi_event {
SendText(super::StreamSendTextCallback),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishMetricsRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(uint64, required, tag="2")]
pub data_ptr: u64,
#[prost(uint64, required, tag="3")]
pub data_len: u64,
#[prost(uint64, optional, tag="4")]
pub async_id: ::core::option::Option<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishMetricsResponse {
#[prost(uint64, required, tag="1")]
pub async_id: u64,
}
/// Stop all rooms synchronously (Do we need async here?).
/// e.g: This is used for the Unity Editor after each assemblies reload.
/// TODO(theomonnom): Implement a debug mode where we can find all leaked handles?
Expand Down
16 changes: 16 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,16 @@ fn on_text_stream_close(
writer.close(server, request)
}

fn on_publish_metrics(
server: &'static FfiServer,
publish: proto::PublishMetricsRequest,
) -> FfiResult<proto::PublishMetricsResponse> {
let ffi_participant =
server.retrieve_handle::<FfiParticipant>(publish.local_participant_handle)?;

ffi_participant.room.publish_metrics(server, publish)
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -1414,6 +1424,12 @@ pub fn handle_request(
on_set_track_subscription_permissions(server, request)?,
)
}
proto::ffi_request::Message::PublishMetrics(publish_metrics) => {
proto::ffi_response::Message::PublishMetrics(on_publish_metrics(
server,
publish_metrics,
)?)
}
});

Ok(res)
Expand Down
29 changes: 29 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,35 @@ impl RoomInner {
pub fn url(&self) -> String {
self.url.clone()
}

pub fn publish_metrics(
self: &Arc<Self>,
server: &'static FfiServer,
publish: proto::PublishMetricsRequest,
) -> FfiResult<proto::PublishMetricsResponse> {
let async_id = publish.async_id.unwrap_or_else(|| server.next_id());

let data = unsafe {
slice::from_raw_parts(publish.data_ptr as *const u8, publish.data_len as usize)
}
.to_vec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it copy the data here ?


let self_clone = self.clone();
let handle = server.async_runtime.spawn(async move {
if let Err(err) = self_clone.data_tx.send(FfiDataPacket {
payload: DataPacket {
payload: data.to_vec(), // Avoid copy?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assuming this PR is still active, can we address the comment to avoid the copy here ?

..Default::default()
},
async_id,
}) {
log::error!("Failed to publish metrics: {:?}", err);
}
});
server.watch_panic(handle);

Ok(proto::PublishMetricsResponse { async_id })
}
}

// Task used to publish data without blocking the client thread
Expand Down
4 changes: 2 additions & 2 deletions livekit/src/room/data_stream/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> StreamWriter<'a> for ByteStreamWriter {
&self.info
}

async fn write(&self, bytes: &[u8]) -> StreamResult<()> {
async fn write(&self, bytes: &'a [u8]) -> StreamResult<()> {
let mut stream = self.stream.lock().await;
for chunk in bytes.chunks(CHUNK_SIZE) {
stream.write_chunk(chunk).await?;
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<'a> StreamWriter<'a> for TextStreamWriter {
&self.info
}

async fn write(&self, text: &str) -> StreamResult<()> {
async fn write(&self, text: &'a str) -> StreamResult<()> {
let mut stream = self.stream.lock().await;
for chunk in text.as_bytes().utf8_aware_chunks(CHUNK_SIZE) {
stream.write_chunk(chunk).await?;
Expand Down
Loading
Loading