hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::marker::PhantomData;
6use std::panic::RefUnwindSafe;
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11use bytes::Bytes;
12use colored::Colorize;
13use dfir_rs::scheduled::graph::Dfir;
14use futures::{FutureExt, Stream, StreamExt};
15use libloading::Library;
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use tempfile::TempPath;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22use super::runtime::SimHook;
23use crate::compile::deploy::ConnectableAsync;
24use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
25use crate::location::dynamic::LocationId;
26use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
27
28/// A handle to a compiled Hydro simulation, which can be instantiated and run.
29pub struct CompiledSim {
30    pub(super) _path: TempPath,
31    pub(super) lib: Library,
32    pub(super) external_ports: Vec<usize>,
33    pub(super) external_registered: HashMap<usize, usize>,
34}
35
36#[sealed::sealed]
37/// A trait implemented by closures that can instantiate a compiled simulation.
38///
39/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
40pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
41#[sealed::sealed]
42impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
43
/// Output handler that silently discards whatever it is given; used when logging is disabled.
fn null_handler(_args: fmt::Arguments<'_>) {
    // Intentionally empty.
}
45
/// Output handler that forwards the formatted message to standard output, followed by a newline.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
49
/// Output handler that forwards the formatted message to standard error, followed by a newline.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
53
54type SimLoaded<'a> = libloading::Symbol<
55    'a,
56    unsafe extern "Rust" fn(
57        bool,
58        HashMap<usize, UnboundedSender<Bytes>>,
59        HashMap<usize, UnboundedReceiverStream<Bytes>>,
60        fn(fmt::Arguments<'_>),
61        fn(fmt::Arguments<'_>),
62    ) -> (
63        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
64        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
65        HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
66    ),
67>;
68
69impl CompiledSim {
70    /// Executes the given closure with a single instance of the compiled simulation.
71    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
72        self.with_instantiator(|instantiator| thunk(instantiator()), true)
73    }
74
75    /// Executes the given closure with an [`Instantiator`], which can be called to create
76    /// independent instances of the simulation. This is useful for fuzzing, where we need to
77    /// re-execute the simulation several times with different decisions.
78    ///
79    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
80    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
81    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
82    pub fn with_instantiator<T>(
83        &self,
84        thunk: impl FnOnce(&dyn Instantiator) -> T,
85        always_log: bool,
86    ) -> T {
87        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
88        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
89        thunk(
90            &(|| CompiledSimInstance {
91                func: func.clone(),
92                remaining_ports: self.external_ports.iter().cloned().collect(),
93                external_registered: self.external_registered.clone(),
94                input_ports: HashMap::new(),
95                output_ports: HashMap::new(),
96                log,
97            }),
98        )
99    }
100
101    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
102    /// closure will be repeatedly executed with instances of the Hydro program where the
103    /// batching boundaries, order of messages, and retries are varied.
104    ///
105    /// During development, you should run the test that invokes this function with the `cargo sim`
106    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
107    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
108    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
109    /// be executed, and if no reproducer is found a small number of random executions will be
110    /// performed.
111    pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
112        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
113            .elements()
114            .into_iter()
115            .find(|e| {
116                !e.fn_name.starts_with("hydro_lang::sim::compiled")
117                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
118                    && !e.fn_name.starts_with("fuzz<")
119            })
120            .unwrap();
121
122        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
123        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
124
125        let caller_fuzz_repro_path = repro_folder
126            .join(caller_fn.fn_name.replace("::", "__"))
127            .with_extension("bin");
128
129        if std::env::var("BOLERO_FUZZER").is_ok() {
130            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
131            std::fs::create_dir_all(&corpus_dir).unwrap();
132            let libfuzzer_args = format!(
133                "{} {} -artifact_prefix={}/ -handle_abrt=0",
134                corpus_dir.to_str().unwrap(),
135                corpus_dir.to_str().unwrap(),
136                corpus_dir.to_str().unwrap(),
137            );
138
139            std::fs::create_dir_all(&repro_folder).unwrap();
140
141            unsafe {
142                std::env::set_var(
143                    "BOLERO_FAILURE_OUTPUT",
144                    caller_fuzz_repro_path.to_str().unwrap(),
145                );
146
147                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
148            }
149
150            self.with_instantiator(
151                |instantiator| {
152                    bolero::test(bolero::TargetLocation {
153                        package_name: "",
154                        manifest_dir: "",
155                        module_path: "",
156                        file: "",
157                        line: 0,
158                        item_path: "<unknown>::__bolero_item_path__",
159                        test_name: None,
160                    })
161                    .run_with_replay(move |is_replay| {
162                        let mut instance = instantiator();
163
164                        if instance.log {
165                            eprintln!(
166                                "{}",
167                                "\n==== New Simulation Instance ===="
168                                    .color(colored::Color::Cyan)
169                                    .bold()
170                            );
171                        }
172
173                        if is_replay {
174                            instance.log = true;
175                        }
176
177                        tokio::runtime::Builder::new_current_thread()
178                            .build()
179                            .unwrap()
180                            .block_on(async {
181                                let local_set = tokio::task::LocalSet::new();
182                                local_set.run_until(thunk(instance)).await
183                            })
184                    })
185                },
186                false,
187            );
188        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
189            self.fuzz_repro(existing_bytes, thunk);
190        } else {
191            eprintln!(
192                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
193                caller_fuzz_repro_path.display()
194            );
195            self.with_instantiator(
196                |instantiator| {
197                    bolero::test(bolero::TargetLocation {
198                        package_name: "",
199                        manifest_dir: "",
200                        module_path: "",
201                        file: ".",
202                        line: 0,
203                        item_path: "<unknown>::__bolero_item_path__",
204                        test_name: None,
205                    })
206                    .with_iterations(8192)
207                    .run(move || {
208                        let instance = instantiator();
209                        tokio::runtime::Builder::new_current_thread()
210                            .build()
211                            .unwrap()
212                            .block_on(async {
213                                let local_set = tokio::task::LocalSet::new();
214                                local_set.run_until(thunk(instance)).await
215                            })
216                    })
217                },
218                false,
219            );
220        }
221    }
222
223    /// Executes the given closure with a single instance of the compiled simulation, using the
224    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
225    /// failure found during fuzzing.
226    pub fn fuzz_repro<'a>(
227        &'a self,
228        bytes: Vec<u8>,
229        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
230    ) {
231        self.with_instance(|instance| {
232            bolero::bolero_engine::any::scope::with(
233                Box::new(bolero::bolero_engine::driver::object::Object(
234                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
235                )),
236                || {
237                    tokio::runtime::Builder::new_current_thread()
238                        .build()
239                        .unwrap()
240                        .block_on(async {
241                            let local_set = tokio::task::LocalSet::new();
242                            local_set.run_until(thunk(instance)).await
243                        })
244                },
245            )
246        });
247    }
248
249    /// Exhaustively searches all possible executions of the simulation. The provided
250    /// closure will be repeatedly executed with instances of the Hydro program where the
251    /// batching boundaries, order of messages, and retries are varied.
252    ///
253    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
254    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
255    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
256    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
257    ///
258    /// Returns the number of distinct executions explored.
259    pub fn exhaustive<'a>(
260        &'a self,
261        mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
262    ) -> usize {
263        if std::env::var("BOLERO_FUZZER").is_ok() {
264            eprintln!(
265                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
266            );
267            std::process::abort();
268        }
269
270        let mut count = 0;
271        let count_mut = &mut count;
272
273        self.with_instantiator(
274            |instantiator| {
275                bolero::test(bolero::TargetLocation {
276                    package_name: "",
277                    manifest_dir: "",
278                    module_path: "",
279                    file: "",
280                    line: 0,
281                    item_path: "<unknown>::__bolero_item_path__",
282                    test_name: None,
283                })
284                .exhaustive()
285                .run_with_replay(move |is_replay| {
286                    *count_mut += 1;
287
288                    let mut instance = instantiator();
289                    if instance.log {
290                        eprintln!(
291                            "{}",
292                            "\n==== New Simulation Instance ===="
293                                .color(colored::Color::Cyan)
294                                .bold()
295                        );
296                    }
297
298                    if is_replay {
299                        instance.log = true;
300                    }
301
302                    tokio::runtime::Builder::new_current_thread()
303                        .build()
304                        .unwrap()
305                        .block_on(async {
306                            let local_set = tokio::task::LocalSet::new();
307                            local_set.run_until(thunk(instance)).await;
308                        })
309                })
310            },
311            false,
312        );
313
314        count
315    }
316}
317
318/// A single instance of a compiled Hydro simulation, which provides methods to interactively
319/// execute the simulation, feed inputs, and receive outputs.
320pub struct CompiledSimInstance<'a> {
321    func: SimLoaded<'a>,
322    remaining_ports: HashSet<usize>,
323    external_registered: HashMap<usize, usize>,
324    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
325    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
326    log: bool,
327}
328
329impl<'a> CompiledSimInstance<'a> {
330    #[deprecated(note = "Use `connect` instead")]
331    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
332    /// given input port, and returns a closure that can be used to send messages to it.
333    pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
334        &mut self,
335        port: &ExternalBincodeSink<T, M, O, R>,
336    ) -> SimSender<T, O, R> {
337        self.connect(port)
338    }
339
340    #[deprecated(note = "Use `connect` instead")]
341    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
342    /// given output port, and returns a stream that can be used to receive messages from it.
343    pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
344        &mut self,
345        port: &ExternalBincodeStream<T, O, R>,
346    ) -> SimReceiver<'a, T, O, R> {
347        self.connect(port)
348    }
349
350    /// Establishes a connection to the given input or output port, returning either a
351    /// [`SimSender`] (for input ports) or a stream (for output ports). This should be invoked
352    /// before calling [`Self::launch`], and should only be invoked once per port.
353    pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
354        &'b mut self,
355        port: P,
356    ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
357        let mut pinned = std::pin::pin!(port.connect(self));
358        if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
359            v
360        } else {
361            panic!("Connect impl should not have used any async operations");
362        }
363    }
364
365    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
366    /// be invoked after connecting all inputs and outputs, but before receiving any messages.
367    pub fn launch(self) {
368        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
369    }
370
371    /// Returns a future that schedules simulation with the given logger for reporting the
372    /// simulation trace.
373    ///
374    /// See [`Self::launch`] for more details.
375    pub fn schedule_with_logger<W: std::io::Write>(
376        self,
377        log_writer: W,
378    ) -> impl use<W> + Future<Output = ()> {
379        self.schedule_with_maybe_logger(Some(log_writer))
380    }
381
382    fn schedule_with_maybe_logger<W: std::io::Write>(
383        self,
384        log_override: Option<W>,
385    ) -> impl use<W> + Future<Output = ()> {
386        if !self.remaining_ports.is_empty() {
387            panic!(
388                "Cannot launch DFIR because some of the inputs / outputs have not been connected."
389            )
390        }
391
392        let (async_dfirs, tick_dfirs, hooks) = unsafe {
393            (self.func)(
394                colored::control::SHOULD_COLORIZE.should_colorize(),
395                self.output_ports,
396                self.input_ports,
397                if self.log {
398                    println_handler
399                } else {
400                    null_handler
401                },
402                if self.log {
403                    eprintln_handler
404                } else {
405                    null_handler
406                },
407            )
408        };
409        let mut launched = LaunchedSim {
410            async_dfirs: async_dfirs
411                .into_iter()
412                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
413                .collect(),
414            possibly_ready_ticks: vec![],
415            not_ready_ticks: tick_dfirs
416                .into_iter()
417                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
418                .collect(),
419            hooks: hooks
420                .into_iter()
421                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
422                .collect(),
423            log: if self.log {
424                if let Some(w) = log_override {
425                    LogKind::Custom(w)
426                } else {
427                    LogKind::Stderr
428                }
429            } else {
430                LogKind::Null
431            },
432        };
433
434        async move { launched.scheduler().await }
435    }
436}
437
438/// A receiver for an external bincode stream in a simulation.
439pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
440    Pin<Box<dyn Stream<Item = T> + 'a>>,
441    PhantomData<(O, R)>,
442);
443
444impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
445    /// Asserts that the stream has ended and no more messages can possibly arrive.
446    pub async fn assert_no_more(mut self)
447    where
448        T: std::fmt::Debug,
449    {
450        if let Some(next) = self.0.next().await {
451            panic!("Stream yielded unexpected message: {:?}", next);
452        }
453    }
454}
455
456impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
457    /// Receives the next message from the external bincode stream. This will wait until a message
458    /// is available, or return `None` if no more messages can possibly arrive.
459    pub async fn next(&mut self) -> Option<T> {
460        self.0.next().await
461    }
462
463    /// Collects all remaining messages from the external bincode stream into a collection. This
464    /// will wait until no more messages can possibly arrive.
465    pub async fn collect<C: Default + Extend<T>>(self) -> C {
466        self.0.collect().await
467    }
468
469    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
470    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
471    pub async fn assert_yields(&mut self, expected: impl IntoIterator<Item = T>)
472    where
473        T: std::fmt::Debug + PartialEq,
474    {
475        let mut expected: VecDeque<T> = expected.into_iter().collect();
476
477        while !expected.is_empty() {
478            if let Some(next) = self.next().await {
479                assert_eq!(next, expected.pop_front().unwrap());
480            } else {
481                panic!("Stream ended early, still expected: {:?}", expected);
482            }
483        }
484    }
485
486    /// Asserts that the stream yields only the expected sequence of messages, in order,
487    /// and then ends.
488    pub async fn assert_yields_only(mut self, expected: impl IntoIterator<Item = T>)
489    where
490        T: std::fmt::Debug + PartialEq,
491    {
492        self.assert_yields(expected).await;
493        self.assert_no_more().await;
494    }
495}
496
497impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
498    /// Collects all remaining messages from the external bincode stream into a collection,
499    /// sorting them. This will wait until no more messages can possibly arrive.
500    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
501    where
502        T: Ord,
503    {
504        let mut collected: C = self.0.collect().await;
505        collected.as_mut().sort();
506        collected
507    }
508
509    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
510    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
511    pub async fn assert_yields_unordered(&mut self, expected: impl IntoIterator<Item = T>)
512    where
513        T: std::fmt::Debug + PartialEq,
514    {
515        let mut expected: Vec<T> = expected.into_iter().collect();
516
517        while !expected.is_empty() {
518            if let Some(next) = self.0.next().await {
519                let idx = expected.iter().enumerate().find(|(_, e)| *e == &next);
520                if let Some((i, _)) = idx {
521                    expected.swap_remove(i);
522                } else {
523                    panic!("Stream yielded unexpected message: {:?}", next);
524                }
525            } else {
526                panic!("Stream ended early, still expected: {:?}", expected);
527            }
528        }
529    }
530
531    /// Asserts that the stream yields only the expected sequence of messages, in some order,
532    /// and then ends.
533    pub async fn assert_yields_only_unordered(mut self, expected: impl IntoIterator<Item = T>)
534    where
535        T: std::fmt::Debug + PartialEq,
536    {
537        self.assert_yields_unordered(expected).await;
538        self.assert_no_more().await;
539    }
540}
541
542impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
543    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
544{
545    type Output = SimReceiver<'a, T, O, R>;
546
547    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
548        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
549
550        assert!(ctx.remaining_ports.remove(looked_up));
551        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
552        ctx.output_ports.insert(*looked_up, sink);
553
554        SimReceiver(
555            Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
556            PhantomData,
557        )
558    }
559}
560
561/// A sender to an external bincode sink in a simulation.
562pub struct SimSender<T, O: Ordering, R: Retries>(
563    Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
564    PhantomData<(O, R)>,
565);
566impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
567    /// Sends a message to the external bincode sink. The message will be asynchronously processed
568    /// as part of the simulation.
569    pub fn send(&self, t: T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
570        (self.0)(t)
571    }
572
573    /// Sends several messages to the external bincode sink. The messages will be asynchronously
574    /// processed as part of the simulation.
575    pub fn send_many<I: IntoIterator<Item = T>>(
576        &self,
577        iter: I,
578    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
579        for t in iter {
580            (self.0)(t)?;
581        }
582        Ok(())
583    }
584}
585
586impl<T> SimSender<T, NoOrder, ExactlyOnce> {
587    /// Sends several messages to the external bincode sink. The messages will be asynchronously
588    /// processed as part of the simulation, in non-determinstic order.
589    pub fn send_many_unordered<I: IntoIterator<Item = T>>(
590        &self,
591        iter: I,
592    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
593        for t in iter {
594            (self.0)(t)?;
595        }
596        Ok(())
597    }
598}
599
600impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
601    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
602{
603    type Output = SimSender<T, O, R>;
604
605    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
606        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
607
608        assert!(ctx.remaining_ports.remove(looked_up));
609        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
610        ctx.input_ports.insert(*looked_up, source);
611        SimSender(
612            Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
613            PhantomData,
614        )
615    }
616}
617
/// Destination for the scheduler's trace output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to standard error.
    Stderr,
    /// Forward to a caller-supplied writer.
    Custom(W),
}

// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
/// Lets a [`LogKind`] be used wherever a `fmt::Write` sink is expected, regardless of which
/// destination it wraps.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        match self {
            // I/O failures are collapsed into the payload-free `fmt::Error`.
            Self::Custom(writer) => writer
                .write_all(s.as_bytes())
                .map_err(|_io_err| std::fmt::Error),
            Self::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
637
638type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;
639
640/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
641/// about scheduling ticks and choices for non-deterministic operators like batch.
642struct LaunchedSim<W: std::io::Write> {
643    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
644    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
645    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
646    hooks: Hooks,
647    log: LogKind<W>,
648}
649
650impl<W: std::io::Write> LaunchedSim<W> {
651    async fn scheduler(&mut self) {
652        loop {
653            tokio::task::yield_now().await;
654            let mut any_made_progress = false;
655            for (loc, c_id, dfir) in &mut self.async_dfirs {
656                if dfir.run_tick().await {
657                    any_made_progress = true;
658                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
659                        .not_ready_ticks
660                        .drain(..)
661                        .partition(|(tick_loc, tick_c_id, _)| {
662                            let LocationId::Tick(_, outer) = tick_loc else {
663                                unreachable!()
664                            };
665                            outer.as_ref() == loc && tick_c_id == c_id
666                        });
667
668                    self.possibly_ready_ticks.extend(now_ready);
669                    self.not_ready_ticks.extend(still_not_ready);
670                }
671            }
672
673            if any_made_progress {
674                continue;
675            } else {
676                use bolero::generator::*;
677
678                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
679                    .possibly_ready_ticks
680                    .drain(..)
681                    .partition(|(name, cid, _)| {
682                        self.hooks
683                            .get(&(name.clone(), *cid))
684                            .unwrap()
685                            .iter()
686                            .any(|hook| {
687                                hook.current_decision().unwrap_or(false)
688                                    || hook.can_make_nontrivial_decision()
689                            })
690                    });
691
692                self.possibly_ready_ticks = ready;
693                self.not_ready_ticks.append(&mut not_ready);
694
695                if self.possibly_ready_ticks.is_empty() {
696                    break;
697                } else {
698                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
699                    let mut removed = self.possibly_ready_ticks.remove(next_tick);
700
701                    match &mut self.log {
702                        LogKind::Null => {}
703                        LogKind::Stderr => {
704                            if let Some(cid) = &removed.1 {
705                                eprintln!(
706                                    "\n{}",
707                                    format!("Running Tick (Cluster Member {})", cid)
708                                        .color(colored::Color::Magenta)
709                                        .bold()
710                                )
711                            } else {
712                                eprintln!(
713                                    "\n{}",
714                                    "Running Tick".color(colored::Color::Magenta).bold()
715                                )
716                            }
717                        }
718                        LogKind::Custom(writer) => {
719                            writeln!(
720                                writer,
721                                "\n{}",
722                                "Running Tick".color(colored::Color::Magenta).bold()
723                            )
724                            .unwrap();
725                        }
726                    }
727
728                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
729                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
730                        write.write_str(" ")
731                    };
732
733                    let mut tick_decision_writer =
734                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
735                            inserter: &mut asterisk_indenter,
736                        });
737
738                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
739                    let mut remaining_decision_count = hooks.len();
740                    let mut made_nontrivial_decision = false;
741
742                    bolero_generator::any::scope::borrow_with(|driver| {
743                        // first, scan manual decisions
744                        hooks.iter_mut().for_each(|hook| {
745                            if let Some(is_nontrivial) = hook.current_decision() {
746                                made_nontrivial_decision |= is_nontrivial;
747                                remaining_decision_count -= 1;
748                            } else if !hook.can_make_nontrivial_decision() {
749                                // if no nontrivial decision is possible, make a trivial one
750                                // (we need to do this in the first pass to force nontrivial decisions
751                                // on the remaining hooks)
752                                hook.autonomous_decision(driver, false);
753                                remaining_decision_count -= 1;
754                            }
755                        });
756
757                        hooks.iter_mut().for_each(|hook| {
758                            if hook.current_decision().is_none() {
759                                made_nontrivial_decision |= hook.autonomous_decision(
760                                    driver,
761                                    !made_nontrivial_decision && remaining_decision_count == 1,
762                                );
763                                remaining_decision_count -= 1;
764                            }
765
766                            hook.release_decision(&mut tick_decision_writer);
767                        });
768                    });
769
770                    assert!(removed.2.run_tick().await);
771                    self.possibly_ready_ticks.push(removed);
772                }
773            }
774        }
775    }
776}