Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 9 additions & 21 deletions serio/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_channel::mpsc;
use futures_core::Stream as _;
use futures_sink::Sink as _;

use crate::{Deserialize, Serialize, Sink, Stream};
use crate::{Message, Sink, Stream};

type Item = Box<dyn Any + Send + Sync + 'static>;

Expand All @@ -30,10 +30,7 @@ impl Sink for MemorySink {
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
fn start_send<Item: Message>(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
Pin::new(&mut self.0)
.start_send(Box::new(item))
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
Expand All @@ -60,7 +57,7 @@ pub struct MemoryStream(mpsc::Receiver<Item>);
impl Stream for MemoryStream {
type Error = Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand Down Expand Up @@ -94,10 +91,7 @@ impl Sink for UnboundedMemorySink {
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
fn start_send<Item: Message>(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
Pin::new(&mut self.0)
.start_send(Box::new(item))
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
Expand All @@ -124,7 +118,7 @@ pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver<Item>);
impl Stream for UnboundedMemoryStream {
type Error = Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand Down Expand Up @@ -175,10 +169,7 @@ impl Sink for MemoryDuplex {
Pin::new(&mut self.sink).poll_ready(cx)
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
fn start_send<Item: Message>(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}

Expand All @@ -194,7 +185,7 @@ impl Sink for MemoryDuplex {
impl Stream for MemoryDuplex {
type Error = Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand Down Expand Up @@ -244,10 +235,7 @@ impl Sink for UnboundedMemoryDuplex {
Pin::new(&mut self.sink).poll_ready(cx)
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
fn start_send<Item: Message>(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}

Expand All @@ -263,7 +251,7 @@ impl Sink for UnboundedMemoryDuplex {
impl Stream for UnboundedMemoryDuplex {
type Error = Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand Down
4 changes: 2 additions & 2 deletions serio/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bytes::{Bytes, BytesMut};
use futures_core::stream::TryStream;
use futures_io::{AsyncRead, AsyncWrite};

use crate::{Deserialize, IoDuplex, Serialize, Sink, Stream};
use crate::{Deserialize, Message, IoDuplex, Serialize, Sink, Stream};

/// A codec.
pub trait Codec<Io> {
Expand Down Expand Up @@ -167,7 +167,7 @@ where
{
type Error = Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Error>>> {
Expand Down
6 changes: 3 additions & 3 deletions serio/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
task::{Context, Poll},
};

use crate::{Deserialize, Stream};
use crate::Stream;

use super::*;

Expand Down Expand Up @@ -45,7 +45,7 @@ impl<T, Item> FuturesCompat<T, Item> {
impl<T, Item> futures_sink::Sink<Item> for FuturesCompat<T, Item>
where
T: Sink,
Item: Serialize,
Item: Message,
{
type Error = T::Error;

Expand All @@ -69,7 +69,7 @@ where
impl<T, Item> futures_core::Stream for FuturesCompat<T, Item>
where
T: Stream,
Item: Deserialize,
Item: Message,
{
type Item = Result<Item, T::Error>;

Expand Down
5 changes: 5 additions & 0 deletions serio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub trait Deserialize: serde::de::DeserializeOwned + Send + Sync + 'static {}

impl<T> Deserialize for T where T: serde::de::DeserializeOwned + Send + Sync + 'static {}

/// A type that can be both serialized and deserialized.
pub trait Message: Serialize + Deserialize {}

impl<T> Message for T where T: Serialize + Deserialize {}

/// A duplex.
pub trait Duplex: Sink + Stream {}

Expand Down
21 changes: 9 additions & 12 deletions serio/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

#[cfg(feature = "compat")]
use crate::FuturesCompat;
use crate::{Serialize, future::assert_future};
use crate::{Message, future::assert_future};

/// A sink with an error type of `std::io::Error`.
pub trait IoSink: Sink<Error = std::io::Error> {}
Expand Down Expand Up @@ -58,7 +58,7 @@ pub trait Sink {
///
/// In most cases, if the sink encounters an error, the sink will
/// permanently be unable to receive items.
fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
fn start_send<Item: Message>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

/// Flush any remaining output from this sink.
///
Expand Down Expand Up @@ -95,10 +95,7 @@ impl<S: ?Sized + Sink + Unpin> Sink for &mut S {
Pin::new(&mut **self).poll_ready(cx)
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
fn start_send<Item: Message>(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
Pin::new(&mut **self).start_send(item)
}

Expand All @@ -122,7 +119,7 @@ where
self.get_mut().as_mut().poll_ready(cx)
}

fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
fn start_send<Item: Message>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
self.get_mut().as_mut().start_send(item)
}

Expand Down Expand Up @@ -152,7 +149,7 @@ pub trait SinkExt: Sink {
/// Note that, **because of the flushing requirement, it is usually better
/// to batch together items to send via `feed` or `send_all`,
/// rather than flushing between each item.**
fn send<Item: Serialize>(&mut self, item: Item) -> Send<'_, Self, Item>
fn send<Item: Message>(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
Expand All @@ -165,7 +162,7 @@ pub trait SinkExt: Sink {
/// Unlike `send`, the returned future does not flush the sink.
/// It is the caller's responsibility to ensure all pending items
/// are processed, which can be done via `flush` or `close`.
fn feed<Item: Serialize>(&mut self, item: Item) -> Feed<'_, Self, Item>
fn feed<Item: Message>(&mut self, item: Item) -> Feed<'_, Self, Item>
where
Self: Unpin,
{
Expand All @@ -183,7 +180,7 @@ pub trait SinkExt: Sink {
/// Wraps the sink in a compatibility layer that allows it to be used as a
/// futures 0.3 sink.
#[cfg(feature = "compat")]
fn compat_sink<Item: Serialize>(self) -> FuturesCompat<Self, Item>
fn compat_sink<Item: Message>(self) -> FuturesCompat<Self, Item>
where
Self: Sized,
{
Expand Down Expand Up @@ -237,7 +234,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> {
}
}

impl<Si: Sink + Unpin + ?Sized, Item: Serialize> Future for Send<'_, Si, Item> {
impl<Si: Sink + Unpin + ?Sized, Item: Message> Future for Send<'_, Si, Item> {
type Output = Result<(), Si::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -284,7 +281,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
}
}

impl<Si: Sink + Unpin + ?Sized, Item: Serialize> Future for Feed<'_, Si, Item> {
impl<Si: Sink + Unpin + ?Sized, Item: Message> Future for Feed<'_, Si, Item> {
type Output = Result<(), Si::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
22 changes: 11 additions & 11 deletions serio/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_core::FusedFuture;

#[cfg(feature = "compat")]
use crate::FuturesCompat;
use crate::{Deserialize, future::assert_future};
use crate::{Message, future::assert_future};

/// A stream with an error type of `std::io::Error`.
pub trait IoStream: Stream<Error = std::io::Error> {}
Expand Down Expand Up @@ -48,7 +48,7 @@ pub trait Stream {
///
/// - `Poll::Ready(None)` means that the stream has terminated, and
/// `poll_next` should not be invoked again.
fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub trait Stream {
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Error = S::Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand All @@ -108,7 +108,7 @@ where
{
type Error = <P::Target as Stream>::Error;

fn poll_next<Item: Deserialize>(
fn poll_next<Item: Message>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Expand Down Expand Up @@ -175,7 +175,7 @@ pub trait StreamExt: Stream {
/// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn next<Item: Deserialize>(&mut self) -> Next<'_, Self, Item>
fn next<Item: Message>(&mut self) -> Next<'_, Self, Item>
where
Self: Unpin,
{
Expand All @@ -185,7 +185,7 @@ pub trait StreamExt: Stream {
/// Wraps the stream in a compatibility layer that allows it to be used as a
/// futures 0.3 stream.
#[cfg(feature = "compat")]
fn compat_stream<Item: Deserialize>(self) -> FuturesCompat<Self, Item>
fn compat_stream<Item: Message>(self) -> FuturesCompat<Self, Item>
where
Self: Sized,
{
Expand All @@ -194,7 +194,7 @@ pub trait StreamExt: Stream {

/// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
/// stream types.
fn poll_next_unpin<Item: Deserialize>(
fn poll_next_unpin<Item: Message>(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>
Expand Down Expand Up @@ -226,13 +226,13 @@ impl<'a, St: ?Sized + Stream + Unpin, Item> Next<'a, St, Item> {
}
}

impl<St: ?Sized + FusedStream + Unpin, Item: Deserialize> FusedFuture for Next<'_, St, Item> {
impl<St: ?Sized + FusedStream + Unpin, Item: Message> FusedFuture for Next<'_, St, Item> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: ?Sized + Stream + Unpin, Item: Deserialize> Future for Next<'_, St, Item> {
impl<St: ?Sized + Stream + Unpin, Item: Message> Future for Next<'_, St, Item> {
type Output = Option<Result<Item, St::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -245,7 +245,7 @@ impl<St: ?Sized + Stream + Unpin, Item: Deserialize> Future for Next<'_, St, Ite
pub trait IoStreamExt: IoStream {
/// Creates a future that resolves to the next item in the stream, returning
/// an error if the stream is exhausted.
fn expect_next<Item: Deserialize>(&mut self) -> ExpectNext<'_, Self, Item>
fn expect_next<Item: Message>(&mut self) -> ExpectNext<'_, Self, Item>
where
Self: Unpin,
{
Expand All @@ -268,7 +268,7 @@ pub struct ExpectNext<'a, St: ?Sized, Item> {

impl<St: ?Sized + Unpin, Item> Unpin for ExpectNext<'_, St, Item> {}

impl<St: ?Sized + IoStream + Unpin, Item: Deserialize> Future for ExpectNext<'_, St, Item> {
impl<St: ?Sized + IoStream + Unpin, Item: Message> Future for ExpectNext<'_, St, Item> {
type Output = Result<Item, St::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
Loading