synthphonia/forward/data/
substr.rs

1
2
3use std::{cell::UnsafeCell, collections::{hash_map, HashSet}, iter, ops::Range};
4
5use derive_more::{Deref, DerefMut};
6use futures::{SinkExt, StreamExt};
7use iset::IntervalMap;
8use itertools::{Either, Itertools};
9use simple_rc_async::sync::broadcast;
10
11use crate::{closure, expr::Expr, forward::executor::Executor, never, utils::{nested::{IntervalTreeN, NestedIntervalTree}, UnsafeCellExt}, value::Value};
12
13use super::size::EV;
14use ahash::AHashMap as HashMap;
15
16/// Term Dispatcher for substrings
17pub struct Data {
18    expected: &'static [&'static str],
19    found: IntervalTreeN,
20    event: IntervalTreeN,
21    senders: HashMap<Value, broadcast::Sender<Value>>,
22    size_limit: usize,
23    exceeded_size_limit: bool,
24}
25
26impl Data {
27    pub fn new(expected: Value, size_limit: usize) -> Option<UnsafeCell<Self>> {
28        if let Value::Str(e) = expected {
29            Some(Self {
30                expected: e,
31                found: IntervalTreeN::new(e),
32                event: IntervalTreeN::new(e),
33                senders: HashMap::new(),
34                size_limit,
35                exceeded_size_limit: false,
36            }.into())
37        } else { None }
38    }
39    pub fn expected_contains(&self, value: Value) -> bool {
40        if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
41            v.iter().cloned().zip(self.expected.iter().cloned()).all(|(a, b)| b.contains(a) && !a.is_empty())
42        } else { false }
43    }
44    
45    pub fn update(&mut self, value: Value, exec: &'static Executor) {
46        if exec.size() > self.size_limit  { return; }
47        if self.expected_contains(value) {
48            self.found.insert(value.to_str());
49
50            let mut senders = Vec::new();
51            for v in self.event.superstrings(value.to_str()) {
52                if let Some(sd) = self.senders.get(&v.into()) {
53                    senders.push(sd.clone());
54                }
55            }
56            for sd in senders {
57                sd.send(value);
58            }
59        }
60        
61    }
62
63    pub fn lookup_existing(&self, value: Value) -> impl Iterator<Item=Value> + '_ {
64        self.found.substrings(value.to_str()).map(|x| x.into())
65    }
66    
67    pub fn listen(&mut self, value: Value) -> Option<broadcast::Reciever<Value>> {
68        if !self.expected_contains(value) { return None }
69        match self.senders.entry(value) {
70            hash_map::Entry::Occupied(o) => {Some(o.get().reciever())}
71            hash_map::Entry::Vacant(v) => {
72                let sd = v.insert(broadcast::channel());
73                self.event.insert_first_occur(value.to_str());
74                Some(sd.reciever())
75            }
76        }
77
78    }
79
80    #[inline(always)]
81    pub async fn listen_for_each<T>(&mut self, value: Value, mut f: impl FnMut(Value) -> Option<T>) -> T {
82        if let Some(mut rv) = self.listen(value) {
83            for v in self.lookup_existing(value) {
84                if let Some(t) = f(v) { return t; }
85            }
86            loop {
87                if let Some(t) = f(rv.next().await.unwrap()) { return t; }
88            }
89        } else { never!() }
90    }
91}
92
93