1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use futures::{prelude::*, stream::FusedStream};
use std::{
pin::Pin,
task::{Context, Poll, Waker},
};
#[derive(Debug)]
pub(crate) struct Pausable<S> {
paused: bool,
stream: S,
waker: Option<Waker>,
}
impl<S: Stream + Unpin> Pausable<S> {
pub(crate) fn new(stream: S) -> Self {
Pausable {
paused: false,
stream,
waker: None,
}
}
pub(crate) fn is_paused(&mut self) -> bool {
self.paused
}
pub(crate) fn pause(&mut self) {
self.paused = true
}
pub(crate) fn unpause(&mut self) {
self.paused = false;
if let Some(w) = self.waker.take() {
w.wake()
}
}
pub(crate) fn stream(&mut self) -> &mut S {
&mut self.stream
}
}
impl<S: Stream + Unpin> Stream for Pausable<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.paused {
return self.stream.poll_next_unpin(cx);
}
self.waker = Some(cx.waker().clone());
Poll::Pending
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<S: FusedStream + Unpin> FusedStream for Pausable<S> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
#[cfg(test)]
mod tests {
use super::Pausable;
use futures::prelude::*;
#[test]
fn pause_unpause() {
let mut stream = Pausable::new(futures::stream::iter(&[1, 2, 3, 4]));
assert_eq!(Some(Some(&1)), stream.next().now_or_never());
assert_eq!(Some(Some(&2)), stream.next().now_or_never());
stream.pause();
assert_eq!(None, stream.next().now_or_never());
stream.unpause();
assert_eq!(Some(Some(&3)), stream.next().now_or_never());
assert_eq!(Some(Some(&4)), stream.next().now_or_never());
assert_eq!(Some(None), stream.next().now_or_never()) }
}