synthphonia/forward/bridge.rs
1
2use std::{cell::UnsafeCell, task::{Poll, Waker}};
3
4use futures::FutureExt;
5use itertools::Itertools;
6use simple_rc_async::sync::oneshot;
7use tokio::task::JoinHandle;
8
9use crate::{expr::{Expr, Expression}, info, utils::UnsafeCellExt};
10
11
12
13/// a bridge for interthread communication.
14pub struct Bridge(UnsafeCell<Vec<(JoinHandle<Expression>, oneshot::Sender<Expression>)>>);
15
16impl Default for Bridge {
17 /// A default constructor for the type.
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl Bridge {
24 /// Creates a new instance of Bridge by initializing an empty vector within an `UnsafeCell`.
25 pub fn new() -> Self {
26 Self(Vec::new().into())
27 }
28 /// Provides a mutable reference to the inner vector of tuples, each containing a `JoinHandle` and a `oneshot::Sender`, which are used for asynchronous computation and message passing, respectively.
29 fn inner(&self) -> &mut Vec<(JoinHandle<Expression>, oneshot::Sender<Expression>)> {
30 unsafe { self.0.as_mut() }
31 }
32 /// Waits for the completion of a synthesis task and returns a receiver for results.
33 ///
34 /// This method takes a `JoinHandle`, which represents a spawned asynchronous task that will output an `Expression`.
35 /// It creates a oneshot channel, which is used for sending an expression once the task completes.
36 /// The sender part of the channel is paired with the `JoinHandle` and added to the vector inside the `Bridge`.
37 /// The method returns the receiver part of the channel, allowing the caller to wait for and retrieve the result of the task once it's completed.
38 ///
39 pub fn wait(&self, handle: JoinHandle<Expression>) -> oneshot::Reciever<Expression> {
40 let rv = oneshot::channel();
41 self.inner().push((handle, rv.sender()));
42 rv
43 }
44 /// Aborts all ongoing synthesis tasks managed by this instance.
45 pub fn abort_all(&self) {
46 for (h, p) in self.inner() {
47 h.abort();
48 }
49 *self.inner() = Vec::new();
50 }
51 /// Checks and handles the status of ongoing tasks and their results.
52 pub fn check(&self) {
53 let vec = std::mem::take(self.inner());
54 let mut v = vec.into_iter().flat_map(|(mut h, s)| {
55 let mut cx = std::task::Context::from_waker(Waker::noop());
56 if let Poll::Ready(r) = h.poll_unpin(&mut cx) {
57 info!("Thread {} ended", h.id());
58 if let Ok(r) = r { let _ = s.send(r); }
59 None
60 } else { Some((h, s)) }
61 }).collect_vec();
62 self.inner().append(&mut v);
63 }
64}
65