diff --git a/serio/src/channel.rs b/serio/src/channel.rs index 9d56cbff..53d63bcd 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -12,7 +12,7 @@ use futures_channel::mpsc; use futures_core::Stream as _; use futures_sink::Sink as _; -use crate::{Deserialize, Serialize, Sink, Stream}; +use crate::{Message, Sink, Stream}; type Item = Box; @@ -30,10 +30,7 @@ impl Sink for MemorySink { .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut self.0) .start_send(Box::new(item)) .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) @@ -60,7 +57,7 @@ pub struct MemoryStream(mpsc::Receiver); impl Stream for MemoryStream { type Error = Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { @@ -94,10 +91,7 @@ impl Sink for UnboundedMemorySink { .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut self.0) .start_send(Box::new(item)) .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) @@ -124,7 +118,7 @@ pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver); impl Stream for UnboundedMemoryStream { type Error = Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { @@ -175,10 +169,7 @@ impl Sink for MemoryDuplex { Pin::new(&mut self.sink).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut self.sink).start_send(item) } @@ -194,7 +185,7 @@ impl Sink for MemoryDuplex { impl Stream for MemoryDuplex { type Error = Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { @@ -244,10 +235,7 @@ impl Sink for UnboundedMemoryDuplex { Pin::new(&mut self.sink).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut self.sink).start_send(item) } @@ -263,7 +251,7 @@ impl Sink for UnboundedMemoryDuplex { impl Stream for UnboundedMemoryDuplex { type Error = Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { diff --git a/serio/src/codec.rs b/serio/src/codec.rs index bc07f847..f45231f0 100644 --- a/serio/src/codec.rs +++ b/serio/src/codec.rs @@ -11,7 +11,7 @@ use bytes::{Bytes, BytesMut}; use futures_core::stream::TryStream; use futures_io::{AsyncRead, AsyncWrite}; -use crate::{Deserialize, IoDuplex, Serialize, Sink, Stream}; +use crate::{Deserialize, Message, IoDuplex, Serialize, Sink, Stream}; /// A codec. pub trait Codec { @@ -167,7 +167,7 @@ where { type Error = Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { diff --git a/serio/src/compat.rs b/serio/src/compat.rs index 2ee6d5b2..3718ca58 100644 --- a/serio/src/compat.rs +++ b/serio/src/compat.rs @@ -4,7 +4,7 @@ use std::{ task::{Context, Poll}, }; -use crate::{Deserialize, Stream}; +use crate::Stream; use super::*; @@ -45,7 +45,7 @@ impl FuturesCompat { impl futures_sink::Sink for FuturesCompat where T: Sink, - Item: Serialize, + Item: Message, { type Error = T::Error; @@ -69,7 +69,7 @@ where impl futures_core::Stream for FuturesCompat where T: Stream, - Item: Deserialize, + Item: Message, { type Item = Result; diff --git a/serio/src/lib.rs b/serio/src/lib.rs index 32cb8c01..b4d01840 100644 --- a/serio/src/lib.rs +++ b/serio/src/lib.rs @@ -29,6 +29,11 @@ pub trait Deserialize: serde::de::DeserializeOwned + Send + Sync + 'static {} impl Deserialize for T where T: serde::de::DeserializeOwned + Send + Sync + 'static {} +/// A type that can be both serialized and deserialized. +pub trait Message: Serialize + Deserialize {} + +impl Message for T where T: Serialize + Deserialize {} + /// A duplex. pub trait Duplex: Sink + Stream {} diff --git a/serio/src/sink.rs b/serio/src/sink.rs index c66520d5..0d27cc27 100644 --- a/serio/src/sink.rs +++ b/serio/src/sink.rs @@ -9,7 +9,7 @@ use std::{ #[cfg(feature = "compat")] use crate::FuturesCompat; -use crate::{Serialize, future::assert_future}; +use crate::{Message, future::assert_future}; /// A sink with an error type of `std::io::Error`. pub trait IoSink: Sink {} @@ -58,7 +58,7 @@ pub trait Sink { /// /// In most cases, if the sink encounters an error, the sink will /// permanently be unable to receive items. - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; /// Flush any remaining output from this sink. /// @@ -95,10 +95,7 @@ impl Sink for &mut S { Pin::new(&mut **self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut **self).start_send(item) } @@ -122,7 +119,7 @@ where self.get_mut().as_mut().poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.get_mut().as_mut().start_send(item) } @@ -152,7 +149,7 @@ pub trait SinkExt: Sink { /// Note that, **because of the flushing requirement, it is usually better /// to batch together items to send via `feed` or `send_all`, /// rather than flushing between each item.** - fn send(&mut self, item: Item) -> Send<'_, Self, Item> + fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin, { @@ -165,7 +162,7 @@ pub trait SinkExt: Sink { /// Unlike `send`, the returned future does not flush the sink. /// It is the caller's responsibility to ensure all pending items /// are processed, which can be done via `flush` or `close`. - fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> + fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin, { @@ -183,7 +180,7 @@ pub trait SinkExt: Sink { /// Wraps the sink in a compatibility layer that allows it to be used as a /// futures 0.3 sink. #[cfg(feature = "compat")] - fn compat_sink(self) -> FuturesCompat + fn compat_sink(self) -> FuturesCompat where Self: Sized, { @@ -237,7 +234,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> { } } -impl Future for Send<'_, Si, Item> { +impl Future for Send<'_, Si, Item> { type Output = Result<(), Si::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -284,7 +281,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Feed<'a, Si, Item> { } } -impl Future for Feed<'_, Si, Item> { +impl Future for Feed<'_, Si, Item> { type Output = Result<(), Si::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/serio/src/stream.rs b/serio/src/stream.rs index 206c7f7b..7781093c 100644 --- a/serio/src/stream.rs +++ b/serio/src/stream.rs @@ -12,7 +12,7 @@ use futures_core::FusedFuture; #[cfg(feature = "compat")] use crate::FuturesCompat; -use crate::{Deserialize, future::assert_future}; +use crate::{Message, future::assert_future}; /// A stream with an error type of `std::io::Error`. pub trait IoStream: Stream {} @@ -48,7 +48,7 @@ pub trait Stream { /// /// - `Poll::Ready(None)` means that the stream has terminated, and /// `poll_next` should not be invoked again. - fn poll_next( + fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>>; @@ -89,7 +89,7 @@ pub trait Stream { impl Stream for &mut S { type Error = S::Error; - fn poll_next( + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { @@ -108,7 +108,7 @@ where { type Error = ::Error; - fn poll_next( + fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { @@ -175,7 +175,7 @@ pub trait StreamExt: Stream { /// assert_eq!(stream.next().await, None); /// # }); /// ``` - fn next(&mut self) -> Next<'_, Self, Item> + fn next(&mut self) -> Next<'_, Self, Item> where Self: Unpin, { @@ -185,7 +185,7 @@ pub trait StreamExt: Stream { /// Wraps the stream in a compatibility layer that allows it to be used as a /// futures 0.3 stream. #[cfg(feature = "compat")] - fn compat_stream(self) -> FuturesCompat + fn compat_stream(self) -> FuturesCompat where Self: Sized, { @@ -194,7 +194,7 @@ pub trait StreamExt: Stream { /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] /// stream types. - fn poll_next_unpin( + fn poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll>> @@ -226,13 +226,13 @@ impl<'a, St: ?Sized + Stream + Unpin, Item> Next<'a, St, Item> { } } -impl FusedFuture for Next<'_, St, Item> { +impl FusedFuture for Next<'_, St, Item> { fn is_terminated(&self) -> bool { self.stream.is_terminated() } } -impl Future for Next<'_, St, Item> { +impl Future for Next<'_, St, Item> { type Output = Option>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -245,7 +245,7 @@ impl Future for Next<'_, St, Ite pub trait IoStreamExt: IoStream { /// Creates a future that resolves to the next item in the stream, returning /// an error if the stream is exhausted. - fn expect_next(&mut self) -> ExpectNext<'_, Self, Item> + fn expect_next(&mut self) -> ExpectNext<'_, Self, Item> where Self: Unpin, { @@ -268,7 +268,7 @@ pub struct ExpectNext<'a, St: ?Sized, Item> { impl Unpin for ExpectNext<'_, St, Item> {} -impl Future for ExpectNext<'_, St, Item> { +impl Future for ExpectNext<'_, St, Item> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {