// synthphonia/forward/data/substr.rs
2
3use std::{cell::UnsafeCell, collections::{hash_map, HashSet}, iter, ops::Range};
4
5use derive_more::{Deref, DerefMut};
6use futures::{SinkExt, StreamExt};
7use iset::IntervalMap;
8use itertools::{Either, Itertools};
9use simple_rc_async::sync::broadcast;
10
11use crate::{closure, expr::Expr, forward::executor::Executor, never, utils::{nested::{IntervalTreeN, NestedIntervalTree}, UnsafeCellExt}, value::Value};
12
13use super::size::EV;
14use ahash::AHashMap as HashMap;
15
16pub struct Data {
18 expected: &'static [&'static str],
19 found: IntervalTreeN,
20 event: IntervalTreeN,
21 senders: HashMap<Value, broadcast::Sender<Value>>,
22 size_limit: usize,
23 exceeded_size_limit: bool,
24}
25
26impl Data {
27 pub fn new(expected: Value, size_limit: usize) -> Option<UnsafeCell<Self>> {
28 if let Value::Str(e) = expected {
29 Some(Self {
30 expected: e,
31 found: IntervalTreeN::new(e),
32 event: IntervalTreeN::new(e),
33 senders: HashMap::new(),
34 size_limit,
35 exceeded_size_limit: false,
36 }.into())
37 } else { None }
38 }
39 pub fn expected_contains(&self, value: Value) -> bool {
40 if let Ok(v) = TryInto::<&[&str]>::try_into(value) {
41 v.iter().cloned().zip(self.expected.iter().cloned()).all(|(a, b)| b.contains(a) && !a.is_empty())
42 } else { false }
43 }
44
45 pub fn update(&mut self, value: Value, exec: &'static Executor) {
46 if exec.size() > self.size_limit { return; }
47 if self.expected_contains(value) {
48 self.found.insert(value.to_str());
49
50 let mut senders = Vec::new();
51 for v in self.event.superstrings(value.to_str()) {
52 if let Some(sd) = self.senders.get(&v.into()) {
53 senders.push(sd.clone());
54 }
55 }
56 for sd in senders {
57 sd.send(value);
58 }
59 }
60
61 }
62
63 pub fn lookup_existing(&self, value: Value) -> impl Iterator<Item=Value> + '_ {
64 self.found.substrings(value.to_str()).map(|x| x.into())
65 }
66
67 pub fn listen(&mut self, value: Value) -> Option<broadcast::Reciever<Value>> {
68 if !self.expected_contains(value) { return None }
69 match self.senders.entry(value) {
70 hash_map::Entry::Occupied(o) => {Some(o.get().reciever())}
71 hash_map::Entry::Vacant(v) => {
72 let sd = v.insert(broadcast::channel());
73 self.event.insert_first_occur(value.to_str());
74 Some(sd.reciever())
75 }
76 }
77
78 }
79
80 #[inline(always)]
81 pub async fn listen_for_each<T>(&mut self, value: Value, mut f: impl FnMut(Value) -> Option<T>) -> T {
82 if let Some(mut rv) = self.listen(value) {
83 for v in self.lookup_existing(value) {
84 if let Some(t) = f(v) { return t; }
85 }
86 loop {
87 if let Some(t) = f(rv.next().await.unwrap()) { return t; }
88 }
89 } else { never!() }
90 }
91}
92
93