// synthphonia/forward/data/prefix.rs

3use std::{cell::UnsafeCell, collections::{hash_map, HashSet}, iter, ops::Range};
4
5use derive_more::{Deref, DerefMut};
6use futures::{SinkExt, StreamExt};
7use iset::IntervalMap;
8use itertools::{Either, Itertools};
9use radix_trie::Trie;
10use simple_rc_async::sync::broadcast;
11use tokio::{runtime::Handle, sync::mpsc};
12
13use crate::{closure, debg2, expr::Expr, forward::executor::Executor, utils::{nested::RadixTrieN, UnsafeCellExt}, value::{self, Value}};
14
15use super::size::EV;
/// Convenience alias for a growable list of `usize` indices.
// NOTE(review): not referenced in the visible part of this file — confirm what the
// indices point into before relying on this description.
pub type Indices = Vec<usize>;
17
18use ahash::AHashMap as HashMap;
19
20/// Prefix Term Dispatcher using Radix Trees
21pub struct Data {
22    expected: &'static [&'static str],
23    found: RadixTrieN,
24    event: RadixTrieN,
25    senders: HashMap<Value, broadcast::Sender<Value>>,
26    size_limit: usize,
27}
28
29impl Data {
30    pub fn new(expected: Value, size_limit: usize) -> Option<UnsafeCell<Self>> {
31        if let Value::Str(e) = expected {
32            Some(Self {
33                expected: e,
34                found: RadixTrieN::new(e.len()),
35                event: RadixTrieN::new(e.len()),
36                senders: HashMap::new(),
37                size_limit,
38            }.into())
39        } else { None }
40    }
41    
42    // fn to_ranges(&self, value: Value) -> Option<Vec<Vec<Range<usize>>>> {
43    //     if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
44    //         assert!(v.len() == self.expected.len());
45    //         let mut result = Vec::with_capacity(v.len());
46    //         for (&e, &x) in self.expected.iter().zip(v.iter()) {
47    //             if x.is_empty() {  result.push(vec![0..0]); continue; }
48    //             let r = e.match_indices(x).map(|(i, _)| i..(i+x.len())).collect_vec();
49    //             if r.is_empty() { return None; }
50    //             result.push(r)
51    //         };
52    //         Some(result)
53    //     } else { None }
54    // }
55    // fn to_range(&self, value: Value) -> Option<Vec<Range<usize>>> {
56    //     if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
57    //         assert!(v.len() == self.expected.len());
58    //         let mut result = Vec::with_capacity(v.len());
59    //         for (&e, &x) in self.expected.iter().zip(v.iter()) {
60    //             if x.is_empty() { result.push(0..0); continue; }
61    //             if let Some((i, _)) = e.match_indices(x).next() {
62    //                 result.push(i..(i+x.len()))
63    //             } else { return None; }
64    //         };
65    //         Some(result)
66    //     } else { None }
67    // }
68    pub fn expected_contains(&self, value: Value) -> bool {
69        if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
70            v.iter().cloned().zip(self.expected.iter().cloned()).all(|(a, b)| b.contains(a))
71        } else { false }
72    }
73    pub fn update(&mut self, value: Value, exec: &'static Executor) {
74        if exec.size() > self.size_limit { return; }
75        
76        if self.expected_contains(value) {
77            self.found.insert(value.to_str());
78            let mut senders = Vec::new();
79            for v in self.event.superfixes(value.to_str()) {
80                if let Some(sd) = self.senders.get(&v.into()) {
81                    senders.push(sd.clone());
82                }
83            }
84            for sd in senders {
85                sd.send(value);
86            }
87        }
88    }
89
90    pub fn lookup_existing(&self, value: Value) -> impl Iterator<Item=Value> + '_ {
91        self.found.prefixes(value.to_str()).map(|x| x.into())
92    }
93    
94    pub fn listen(&mut self, value: Value) -> broadcast::Reciever<Value> {
95        match self.senders.entry(value) {
96            hash_map::Entry::Occupied(o) => {o.get().reciever()}
97            hash_map::Entry::Vacant(v) => {
98                let sd = v.insert(broadcast::channel());
99                self.event.insert(value.to_str());
100                sd.reciever()
101            }
102        }
103    }
104    
105    #[inline(always)]
106    pub async fn listen_for_each<T>(&mut self, value: Value, mut f: impl FnMut(Value) -> Option<T>) -> T {
107        for v in self.lookup_existing(value) {
108            if let Some(t) = f(v) { return t; }
109        }
110        let mut rv = self.listen(value);
111        loop {
112            if let Some(t) = f(rv.next().await.unwrap()) { return t; }
113        }
114    }
115}
116
117