@@ -85,6 +85,7 @@ if_std! {
8585 mod catch_unwind;
8686 mod chunks;
8787 mod collect;
88+ mod for_each_concurrent;
8889 mod select_all;
8990 mod split;
9091 mod futures_unordered;
@@ -96,6 +97,7 @@ if_std! {
9697 pub use self :: collect:: Collect ;
9798 pub use self :: select_all:: { select_all, SelectAll } ;
9899 pub use self :: split:: { SplitStream , SplitSink , ReuniteError } ;
100+ pub use self :: for_each_concurrent:: ForEachConcurrent ;
99101 pub use self :: futures_unordered:: { futures_unordered, FuturesUnordered } ;
100102 pub use self :: futures_ordered:: { futures_ordered, FuturesOrdered } ;
101103}
@@ -584,10 +586,10 @@ pub trait StreamExt: Stream {
584586 /// to successfully, producing a future. That future will then be executed
585587 /// to completion before moving on to the next item.
586588 ///
587- /// The returned value is a `Future` where the `Item` type is `()` and
588- /// errors are otherwise threaded through. Any error on the stream or in the
589- /// closure will cause iteration to be halted immediately and the future
590- /// will resolve to that error.
589+ /// The returned value is a `Future` where the `Item` type is the completed
590+ /// stream, and errors are otherwise threaded through. Any error on the
591+ /// stream or in the provided future will cause iteration to be halted
592+ /// immediately and the future will resolve to that error.
591593 ///
592594 /// To process each item in the stream and produce another stream instead
593595 /// of a single future, use `and_then` instead.
@@ -599,6 +601,30 @@ pub trait StreamExt: Stream {
599601 for_each:: new ( self , f)
600602 }
601603
604+ /// Runs this stream to completion, executing the provided closure for each
605+ /// element on the stream. This is similar to `for_each` but may begin
606+ /// processing an element while previous elements are still being processed.
607+ ///
608+ /// When this stream successfully resolves to an item, the closure will be
609+ /// called to produce a future. That future will then be added to
610+ /// the set of futures to resolve.
611+ ///
612+ /// The returned value is a `Future` where the `Item` type is the completed
613+ /// stream, and errors are otherwise threaded through. Any error on the
614+ /// stream or in the provided future will cause iteration to be halted
615+ /// immediately and the future will resolve to that error.
616+ ///
617+ /// To process each item in the stream and produce another stream instead
618+ /// of a single future, use `and_then` instead.
619+ #[ cfg( feature = "std" ) ]
620+ fn for_each_concurrent < U , F > ( self , f : F ) -> ForEachConcurrent < Self , U , F >
621+ where F : FnMut ( Self :: Item ) -> U ,
622+ U : IntoFuture < Item =( ) , Error = Self :: Error > ,
623+ Self : Sized
624+ {
625+ for_each_concurrent:: new ( self , f)
626+ }
627+
602628 /// Map this stream's error to a different type using the `Into` trait.
603629 ///
604630 /// This function does for streams what `try!` does for `Result`,
0 commit comments