simple_rc_async/sync/
broadcastque.rs

1use std::{borrow::BorrowMut, cell::{Cell, RefCell}, collections::VecDeque, ops::Deref, os::unix::thread, pin::Pin, rc::{Rc, Weak}, task::{Context, LocalWaker, Poll}};
2
3use futures_core::Future;
4
5pub(crate) struct Channel<T: Clone> {
6    result: Poll<T>,
7    waker: VecDeque<LocalWaker>,
8}
9
10#[derive(Clone)]
11pub struct Reciever<T: Clone>(Weak<RefCell<Channel<T>>>);
12
13impl<T: Clone + std::fmt::Debug> futures_core::stream::Stream for Reciever<T> {
14    type Item = T;
15
16    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
17        if let Some(this) = self.0.upgrade() {
18            Sender(this).poll_ref(cx).map(|x| Some(x))
19        } else {
20            Poll::Ready(None)
21        }
22    }
23}
24
25#[derive(Clone)]
26pub struct Sender<T: Clone>(Rc<RefCell<Channel<T>>>);
27
28impl<T: Clone + std::fmt::Debug> std::future::Future for Sender<T> {
29    type Output = T;
30
31    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32        self.poll_ref(cx)
33    }
34}
35
36
37impl<T: Clone + std::fmt::Debug> Sender<T> {
38    pub fn poll_ref(&self, cx: &mut Context<'_>) -> Poll<T> {
39        assert!(self.0.as_ptr() as usize >= 0x200);
40        let mut this = self.0.deref().borrow_mut();
41        let result = std::mem::replace(&mut this.result, Poll::Pending);
42        if result.is_pending() { this.waker.push_back(cx.local_waker().clone()); }
43        result
44    }
45    pub fn new() -> Self {
46        Self(Rc::new(RefCell::new(Channel {
47            result: Poll::Pending,
48            waker: VecDeque::new(),
49        })))
50    }
51    pub fn send(&self, v: T, count: usize) {
52        let pointer = self.0.clone();
53        let count = std::cmp::min((*self.0).borrow_mut().waker.len(), count);
54        
55        for _ in 0..count {
56            let r = (*self.0).borrow_mut().waker.pop_front();
57            if let Some(waker) = r {
58                {
59                    assert!(Rc::ptr_eq(&self.0, &pointer));
60                    let a = &*pointer;
61                    let mut b = a.borrow_mut();
62                    b.result = Poll::Ready(v.clone());
63                }
64                waker.wake();
65            } else { break; } 
66        }
67    }
68
69    pub fn reciever(&self) -> Reciever<T> {
70        Reciever(Rc::downgrade(&self.0))
71    }
72}
73
74pub fn channel<T: Clone + std::fmt::Debug>() -> Sender<T> {
75    let a = Sender::new();
76    assert!(a.0.as_ptr() as usize >= 0x200);
77    a
78}
79
80
#[cfg(test)]
mod test {
    use std::task::Poll;

    use futures::StreamExt;

    use crate::task;

    use super::channel;

    /// Two spawned tasks read from receivers of the same channel.
    ///
    /// `handle2` reads three values and sums them; `handle` reads two values
    /// and adds the result of `handle2`. After sending 1, 2 and 3 the test
    /// expects `handle` to be ready with 1 + 2 + (1 + 2 + 3) = 9, and not to
    /// be ready any earlier.
    #[test]
    fn test() {
        let sd = channel::<usize>();
        eprintln!("Creating handle2");
        let mut rv2 = sd.reciever();
        let handle2 = task::spawn(async move {
            eprintln!("  handle2 0");
            let a = rv2.next().await.unwrap();
            eprintln!("  handle2 1");
            let b = rv2.next().await.unwrap();
            eprintln!("  handle2 2");
            let c = rv2.next().await.unwrap();
            eprintln!("  handle2 3");
            a + b + c
        });
        eprintln!("Creating handle");
        let mut rv = sd.reciever();
        let handle = task::spawn(async move {
            eprintln!("  handle 0");
            let a = rv.next().await.unwrap();
            eprintln!("  handle 1");
            let b = rv.next().await.unwrap();
            eprintln!("  handle 2");
            a + b + handle2.await
        });
        assert!(!handle.is_ready());
        eprintln!("Sending 1");
        // The second argument only caps how many waiters are woken; it is
        // larger than the number of tasks here.
        sd.send(1, 10);
        assert!(!handle.is_ready());
        eprintln!("Sending 2");
        sd.send(2, 20);
        assert!(!handle.is_ready());
        eprintln!("Sending 3");
        sd.send(3, 30);
        assert!(handle.is_ready());
        assert!(handle.poll_rc_nocx() == Poll::Ready(9));
    }
}
142