synthphonia/forward/
bridge.rs

1
2use std::{cell::UnsafeCell, task::{Poll, Waker}};
3
4use futures::FutureExt;
5use itertools::Itertools;
6use simple_rc_async::sync::oneshot;
7use tokio::task::JoinHandle;
8
9use crate::{expr::{Expr, Expression}, info, utils::UnsafeCellExt};
10
11
12
13/// a bridge for interthread communication.
14pub struct Bridge(UnsafeCell<Vec<(JoinHandle<Expression>, oneshot::Sender<Expression>)>>);
15
16impl Default for Bridge {
17    /// A default constructor for the type. 
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl Bridge {
24    /// Creates a new instance of Bridge by initializing an empty vector within an `UnsafeCell`. 
25    pub fn new() -> Self {
26        Self(Vec::new().into())
27    }
28    /// Provides a mutable reference to the inner vector of tuples, each containing a `JoinHandle` and a `oneshot::Sender`, which are used for asynchronous computation and message passing, respectively. 
29    fn inner(&self) -> &mut Vec<(JoinHandle<Expression>, oneshot::Sender<Expression>)> {
30        unsafe { self.0.as_mut() }
31    }
32    /// Waits for the completion of a synthesis task and returns a receiver for results. 
33    /// 
34    /// This method takes a `JoinHandle`, which represents a spawned asynchronous task that will output an `Expression`. 
35    /// It creates a oneshot channel, which is used for sending an expression once the task completes. 
36    /// The sender part of the channel is paired with the `JoinHandle` and added to the vector inside the `Bridge`. 
37    /// The method returns the receiver part of the channel, allowing the caller to wait for and retrieve the result of the task once it's completed.
38    /// 
39    pub fn wait(&self, handle: JoinHandle<Expression>) -> oneshot::Reciever<Expression> {
40        let rv = oneshot::channel();
41        self.inner().push((handle, rv.sender()));
42        rv
43    }
44    /// Aborts all ongoing synthesis tasks managed by this instance. 
45    pub fn abort_all(&self) {
46        for (h, p) in self.inner() {
47            h.abort();
48        }
49        *self.inner() = Vec::new();
50    }
51    /// Checks and handles the status of ongoing tasks and their results. 
52    pub fn check(&self) {
53        let vec = std::mem::take(self.inner());
54        let mut v = vec.into_iter().flat_map(|(mut h, s)| {
55            let mut cx = std::task::Context::from_waker(Waker::noop());
56            if let Poll::Ready(r) = h.poll_unpin(&mut cx) { 
57                info!("Thread {} ended", h.id());
58                if let Ok(r) = r { let _ = s.send(r); }
59                None
60            } else { Some((h, s)) }
61        }).collect_vec();
62        self.inner().append(&mut v);
63    }
64}
65