simple_rc_async/sync/
broadcast.rs1use std::{borrow::BorrowMut, cell::{Cell, RefCell}, hash::Hash, ops::Deref, os::unix::thread, pin::Pin, rc::{Rc, Weak}, task::{Context, LocalWaker, Poll}};
2
3use futures_core::Future;
4
5pub(crate) struct Channel<T: Clone> {
6 result: Poll<T>,
7 waker: Vec<LocalWaker>,
8}
9
10#[derive(Clone)]
11pub struct Reciever<T: Clone>(Weak<RefCell<Channel<T>>>);
12
13impl<T: Clone + std::fmt::Debug> futures_core::stream::Stream for Reciever<T> {
14 type Item = T;
15
16 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
17 if let Some(this) = self.0.upgrade() {
18 Sender(this).poll_ref(cx).map(|x| Some(x))
19 } else {
20 Poll::Ready(None)
21 }
22 }
23}
24
25#[derive(Clone)]
26pub struct Sender<T: Clone>(Rc<RefCell<Channel<T>>>);
27
28impl<T: Clone + std::fmt::Debug> std::future::Future for Sender<T> {
29 type Output = T;
30
31 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32 self.poll_ref(cx)
33 }
34}
35
36
37impl<T: Clone + std::fmt::Debug> Sender<T> {
38 pub fn poll_ref(&self, cx: &mut Context<'_>) -> Poll<T> {
39 assert!(self.0.as_ptr() as usize >= 0x200);
40 let mut this = self.0.deref().borrow_mut();
41 let result = std::mem::replace(&mut this.result, Poll::Pending);
42 if result.is_pending() { this.waker.push(cx.local_waker().clone()); }
43 result
44 }
45 pub fn new() -> Self {
46 Self(Rc::new(RefCell::new(Channel {
47 result: Poll::Pending,
48 waker: Vec::new(),
49 })))
50 }
51 fn retrieve_wakers(&self) -> Vec<LocalWaker> {
52 let mut lock = (*self.0).borrow_mut();
53 std::mem::replace(&mut lock.waker, Vec::new())
54 }
55 pub fn send(&self, v: T) {
56 let wakers = self.retrieve_wakers();
57 let pointer = self.0.clone();
58 for waker in wakers {
59 {
60 assert!(Rc::ptr_eq(&self.0, &pointer));
61 let a = &*pointer;
62 let mut b = a.borrow_mut();
63 b.result = Poll::Ready(v.clone());
64 }
65 waker.wake();
66 }
67 }
68
69 pub fn reciever(&self) -> Reciever<T> {
70 Reciever(Rc::downgrade(&self.0))
71 }
72}
73
74pub fn channel<T: Clone + std::fmt::Debug>() -> Sender<T> {
75 let a = Sender::new();
76 assert!(a.0.as_ptr() as usize >= 0x200);
77 a
78}
79
80impl<T: Clone> PartialEq for Sender<T> {
81 fn eq(&self, other: &Self) -> bool {
82 Rc::ptr_eq(&self.0, &other.0)
83 }
84}
85
86impl<T: Clone> Eq for Sender<T> {}
87
88impl<T: Clone> std::hash::Hash for Sender<T> {
89 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
90 (self.0.as_ptr() as usize).hash(state);
92 }
93}
94
95#[derive(Clone)]
96pub enum MaybeReady<T: Clone> {
97 Ready(T),
98 Pending(Sender<T>),
99}
100
101impl<T: Clone + std::fmt::Debug> MaybeReady<T> {
102 pub fn pending() -> Self {
103 Self::Pending(channel())
104 }
105 pub fn ready(t: T) -> Self {
106 Self::Ready(t)
107 }
108 pub fn is_ready(&self) -> bool {
109 matches!(self, Self::Ready(_))
110 }
111 pub fn set(&mut self, t: T) {
112 if let Self::Pending(ref sd) = std::mem::replace(self, Self::Ready(t.clone())) {
113 sd.send(t.clone());
114 }
115 }
116 pub fn sender(&mut self, t: T) -> Option<Sender<T>> {
117 let res = if let Self::Pending(ref sd) = self {
118 Some(sd.clone())
119 } else { None };
120 *self = Self::Ready(t);
121 res
122 }
123 pub fn poll(&self) -> Poll<T> {
124 match self {
125 MaybeReady::Ready(a) => Poll::Ready(a.clone()),
126 MaybeReady::Pending(_) => Poll::Pending,
127 }
128 }
129 pub fn poll_opt(&self) -> Option<T> {
130 match self {
131 MaybeReady::Ready(a) => Some(a.clone()),
132 MaybeReady::Pending(_) => None,
133 }
134 }
135 pub async fn get(&self) -> T {
136 match self {
137 MaybeReady::Ready(a) => a.clone(),
138 MaybeReady::Pending(sender) => sender.clone().await,
139 }
140 }
141}
142
143#[cfg(test)]
144mod test {
145 use std::task::Poll;
146
147 use futures::StreamExt;
148
149 use crate::task;
150
151 use super::channel;
152
153 #[test]
154 fn test() {
155 let sd = channel::<usize>();
156 eprintln!("Creating handle2");
157 let mut rv2 = sd.reciever();
158 let handle2 = task::spawn(async move {
159 eprintln!(" handle2 0");
160 let a = rv2.next().await.unwrap();
161 eprintln!(" handle2 1");
162 let b = rv2.next().await.unwrap();
163 eprintln!(" handle2 2");
164 let c = rv2.next().await.unwrap();
165 eprintln!(" handle2 3");
166 a + b + c
167 });
168 eprintln!("Creating handle");
169 let mut rv = sd.reciever();
170 let handle = task::spawn(async move {
171 eprintln!(" handle 0");
172 let a = rv.next().await.unwrap();
173 eprintln!(" handle 1");
174 let b = rv.next().await.unwrap();
175 eprintln!(" handle 2");
176 a + b + handle2.await
177 });
178 assert!(!handle.is_ready());
179 eprintln!("Sending 1");
180 let _ = sd.send(1);
181 assert!(!handle.is_ready());
182 eprintln!("Sending 2");
183 let _ = sd.send(2);
184 assert!(!handle.is_ready());
185 eprintln!("Sending 3");
186 let _ = sd.send(3);
187 assert!(handle.is_ready());
188 assert!(handle.poll_rc_nocx() == Poll::Ready(9));
189 }
190
191 }
204