simple_rc_async/sync/
oneshot.rs1use std::{cell::RefCell, pin::Pin, rc::{Rc, Weak}, task::{Context, LocalWaker, Poll}};
2
3pub(crate) struct Channel<T> {
4 result: Option<Poll<T>>,
5 waker: LocalWaker,
6}
7
8impl<T> Channel<T> {
9 pub(crate) fn new() -> Self {
10 Self{ result: Some(Poll::Pending), waker: LocalWaker::noop().clone() }
11 }
12 pub(crate) fn send(&mut self, v: T) -> LocalWaker {
13 if self.is_completed() { panic!("rc_async::sync::oneshot sent after completed."); }
14 self.result = Some(Poll::Ready(v));
15 self.detach()
16 }
17 pub(crate) fn is_ready(&self) -> bool {
18 self.result.as_ref().map(|x| x.is_ready()).unwrap_or(false)
19 }
20 pub(crate) fn is_completed(&self) -> bool {
21 self.result.is_none()
22 }
23 fn detach(&mut self) -> LocalWaker {
24 std::mem::replace(&mut self.waker, LocalWaker::noop().clone())
25 }
26 pub(crate) fn poll_ref_nocx(&mut self) -> Poll<T> {
30 match self.result {
31 Some(Poll::Ready(_)) => {
32 std::mem::replace(&mut self.result, None).unwrap()
33 }
34 Some(Poll::Pending) => Poll::Pending,
35 None => panic!("rc_async::sync::oneshot polled after completed."),
36 }
37 }
38 pub(crate) fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<T> {
39 let result = self.poll_ref_nocx();
40 if result.is_pending() {
41 self.waker = cx.local_waker().clone();
42 }
43 result
44 }
45}
46
47pub struct Reciever<T>(Rc<RefCell<Channel<T>>>);
48
49impl<T> std::future::Future for Reciever<T> {
50 type Output=T;
51
52 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
53 (*self.0).borrow_mut().poll_ref(cx)
54 }
55}
56
57impl<T> Reciever<T> {
58 pub fn new() -> Self {
59 Self(Rc::new(RefCell::new(Channel::new())))
60 }
61 pub fn sender(&self) -> Sender<T> {
62 Sender(Rc::downgrade(&self.0))
63 }
64 pub fn is_completed(&self) -> bool {
65 self.0.borrow().is_completed()
66 }
67}
68
69#[derive(Clone)]
70pub struct Sender<T>(Weak<RefCell<Channel<T>>>);
71
72impl<T> Sender<T> {
73 pub fn send(self, v: T) -> Result<(), ()> {
74 let rc = self.0.upgrade().ok_or(())?;
75 let waker = (*rc).borrow_mut().send(v);
76 waker.wake();
77 Ok(())
78 }
79}
80
81pub fn channel<T>() -> Reciever<T> {
82 Reciever::new()
83}
84
85
#[cfg(test)]
mod test {
    use std::task::Poll;

    use crate::task;

    use super::channel;

    /// Checks that a spawned task awaiting the receiver reports pending before
    /// the send and yields the sent value afterwards.
    #[test]
    fn test() {
        let receiver = channel::<usize>();
        let sender = receiver.sender();
        let handle = task::spawn(async { receiver.await });

        // Nothing has been sent yet.
        assert!(!handle.is_ready());
        assert!(handle.poll_rc_nocx() == Poll::Pending);

        let _ = sender.send(1);

        // The send must have driven the task to completion.
        assert!(handle.is_ready());
        assert!(handle.poll_rc_nocx() == Poll::Ready(1));
    }

    /// Same scenario, but observed through a second task that awaits the
    /// handle of the first one.
    #[test]
    fn test2() {
        let receiver = channel::<usize>();
        let sender = receiver.sender();
        let inner = task::spawn(async { receiver.await });
        let outer = task::spawn(async { inner.await });

        // Nothing has been sent yet.
        assert!(!outer.is_ready());
        assert!(outer.poll_rc_nocx() == Poll::Pending);

        let _ = sender.send(1);

        // The wake-up has to propagate through both tasks.
        assert!(outer.is_ready());
        assert!(outer.poll_rc_nocx() == Poll::Ready(1));
    }
}
122