synthphonia/forward/data/
prefix.rs1
2
3use std::{cell::UnsafeCell, collections::{hash_map, HashSet}, iter, ops::Range};
4
5use derive_more::{Deref, DerefMut};
6use futures::{SinkExt, StreamExt};
7use iset::IntervalMap;
8use itertools::{Either, Itertools};
9use radix_trie::Trie;
10use simple_rc_async::sync::broadcast;
11use tokio::{runtime::Handle, sync::mpsc};
12
13use crate::{closure, debg2, expr::Expr, forward::executor::Executor, utils::{nested::RadixTrieN, UnsafeCellExt}, value::{self, Value}};
14
15use super::size::EV;
16pub type Indices = Vec<usize>;
17
18use ahash::AHashMap as HashMap;
19
20pub struct Data {
22 expected: &'static [&'static str],
23 found: RadixTrieN,
24 event: RadixTrieN,
25 senders: HashMap<Value, broadcast::Sender<Value>>,
26 size_limit: usize,
27}
28
29impl Data {
30 pub fn new(expected: Value, size_limit: usize) -> Option<UnsafeCell<Self>> {
31 if let Value::Str(e) = expected {
32 Some(Self {
33 expected: e,
34 found: RadixTrieN::new(e.len()),
35 event: RadixTrieN::new(e.len()),
36 senders: HashMap::new(),
37 size_limit,
38 }.into())
39 } else { None }
40 }
41
42 pub fn expected_contains(&self, value: Value) -> bool {
69 if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
70 v.iter().cloned().zip(self.expected.iter().cloned()).all(|(a, b)| b.contains(a))
71 } else { false }
72 }
73 pub fn update(&mut self, value: Value, exec: &'static Executor) {
74 if exec.size() > self.size_limit { return; }
75
76 if self.expected_contains(value) {
77 self.found.insert(value.to_str());
78 let mut senders = Vec::new();
79 for v in self.event.superfixes(value.to_str()) {
80 if let Some(sd) = self.senders.get(&v.into()) {
81 senders.push(sd.clone());
82 }
83 }
84 for sd in senders {
85 sd.send(value);
86 }
87 }
88 }
89
90 pub fn lookup_existing(&self, value: Value) -> impl Iterator<Item=Value> + '_ {
91 self.found.prefixes(value.to_str()).map(|x| x.into())
92 }
93
94 pub fn listen(&mut self, value: Value) -> broadcast::Reciever<Value> {
95 match self.senders.entry(value) {
96 hash_map::Entry::Occupied(o) => {o.get().reciever()}
97 hash_map::Entry::Vacant(v) => {
98 let sd = v.insert(broadcast::channel());
99 self.event.insert(value.to_str());
100 sd.reciever()
101 }
102 }
103 }
104
105 #[inline(always)]
106 pub async fn listen_for_each<T>(&mut self, value: Value, mut f: impl FnMut(Value) -> Option<T>) -> T {
107 for v in self.lookup_existing(value) {
108 if let Some(t) = f(v) { return t; }
109 }
110 let mut rv = self.listen(value);
111 loop {
112 if let Some(t) = f(rv.next().await.unwrap()) { return t; }
113 }
114 }
115}
116
117