simple_rc_async/sync/
oneshot.rs

1use std::{cell::RefCell, pin::Pin, rc::{Rc, Weak}, task::{Context, LocalWaker, Poll}};
2
3pub(crate) struct Channel<T> {
4    result: Option<Poll<T>>,
5    waker: LocalWaker,
6}
7
8impl<T> Channel<T> {
9    pub(crate) fn new() -> Self {
10        Self{ result: Some(Poll::Pending), waker: LocalWaker::noop().clone() }
11    }
12    pub(crate) fn send(&mut self, v: T) -> LocalWaker {
13        if self.is_completed() { panic!("rc_async::sync::oneshot sent after completed."); }
14        self.result = Some(Poll::Ready(v));
15        self.detach()
16    }
17    pub(crate) fn is_ready(&self) -> bool {
18        self.result.as_ref().map(|x| x.is_ready()).unwrap_or(false)
19    }
20    pub(crate) fn is_completed(&self) -> bool {
21        self.result.is_none()
22    }
23    fn detach(&mut self) -> LocalWaker {
24        std::mem::replace(&mut self.waker, LocalWaker::noop().clone())
25    }
26    // pub(crate) fn close(&mut self) {
27    //     self.result = None;
28    // }
29    pub(crate) fn poll_ref_nocx(&mut self) -> Poll<T> {
30        match self.result {
31            Some(Poll::Ready(_)) => {
32                std::mem::replace(&mut self.result, None).unwrap()
33            }
34            Some(Poll::Pending) => Poll::Pending,
35            None => panic!("rc_async::sync::oneshot polled after completed."),
36        }
37    }
38    pub(crate) fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<T> {
39        let result = self.poll_ref_nocx();
40        if result.is_pending() {
41            self.waker = cx.local_waker().clone();
42        }
43        result
44    }
45}
46
47pub struct Reciever<T>(Rc<RefCell<Channel<T>>>);
48
49impl<T> std::future::Future for Reciever<T> {
50    type Output=T;
51
52    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
53        (*self.0).borrow_mut().poll_ref(cx)
54    }
55}
56
57impl<T> Reciever<T> {
58    pub fn new() -> Self {
59        Self(Rc::new(RefCell::new(Channel::new())))
60    }
61    pub fn sender(&self) -> Sender<T> {
62        Sender(Rc::downgrade(&self.0))
63    }
64    pub fn is_completed(&self) -> bool {
65        self.0.borrow().is_completed()
66    }
67}
68
69#[derive(Clone)]
70pub struct Sender<T>(Weak<RefCell<Channel<T>>>);
71
72impl<T> Sender<T> {
73    pub fn send(self, v: T) -> Result<(), ()> {
74        let rc = self.0.upgrade().ok_or(())?;
75        let waker = (*rc).borrow_mut().send(v);
76        waker.wake();
77        Ok(())
78    }
79}
80
81pub fn channel<T>() -> Reciever<T> {
82    Reciever::new()
83}
84
85
#[cfg(test)]
mod test {
    use std::task::Poll;

    use super::channel;
    use crate::task;

    /// A task awaiting the receiver is pending before the send and ready
    /// with the sent value right after it.
    #[test]
    fn test() {
        let receiver = channel::<usize>();
        let sender = receiver.sender();
        let handle = task::spawn(async { receiver.await });

        // Nothing has been sent yet.
        assert!(!handle.is_ready());
        assert!(handle.poll_rc_nocx() == Poll::Pending);

        let _ = sender.send(1);

        // The value is observable through the task handle after the send.
        assert!(handle.is_ready());
        assert!(handle.poll_rc_nocx() == Poll::Ready(1));
    }

    /// Same scenario as `test`, but observed through a second task that
    /// awaits the first task's handle.
    #[test]
    fn test2() {
        let receiver = channel::<usize>();
        let sender = receiver.sender();
        let inner = task::spawn(async { receiver.await });
        let outer = task::spawn(async { inner.await });

        // Nothing has been sent yet.
        assert!(!outer.is_ready());
        assert!(outer.poll_rc_nocx() == Poll::Pending);

        let _ = sender.send(1);

        // The value is observable through the outer handle after the send.
        assert!(outer.is_ready());
        assert!(outer.poll_rc_nocx() == Poll::Ready(1));
    }
}
122