1use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::marker::PhantomData;
6use std::panic::RefUnwindSafe;
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11use bytes::Bytes;
12use colored::Colorize;
13use dfir_rs::scheduled::graph::Dfir;
14use futures::{FutureExt, Stream, StreamExt};
15use libloading::Library;
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use tempfile::TempPath;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22use super::runtime::SimHook;
23use crate::compile::deploy::ConnectableAsync;
24use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
25use crate::location::dynamic::LocationId;
26use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
27
28pub struct CompiledSim {
30 pub(super) _path: TempPath,
31 pub(super) lib: Library,
32 pub(super) external_ports: Vec<usize>,
33 pub(super) external_registered: HashMap<usize, usize>,
34}
35
36#[sealed::sealed]
37pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
41#[sealed::sealed]
42impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
43
44fn null_handler(_args: fmt::Arguments) {}
45
46fn println_handler(args: fmt::Arguments) {
47 println!("{}", args);
48}
49
50fn eprintln_handler(args: fmt::Arguments) {
51 eprintln!("{}", args);
52}
53
54type SimLoaded<'a> = libloading::Symbol<
55 'a,
56 unsafe extern "Rust" fn(
57 bool,
58 HashMap<usize, UnboundedSender<Bytes>>,
59 HashMap<usize, UnboundedReceiverStream<Bytes>>,
60 fn(fmt::Arguments<'_>),
61 fn(fmt::Arguments<'_>),
62 ) -> (
63 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
64 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
65 HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
66 ),
67>;
68
69impl CompiledSim {
70 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
72 self.with_instantiator(|instantiator| thunk(instantiator()), true)
73 }
74
75 pub fn with_instantiator<T>(
83 &self,
84 thunk: impl FnOnce(&dyn Instantiator) -> T,
85 always_log: bool,
86 ) -> T {
87 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
88 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
89 thunk(
90 &(|| CompiledSimInstance {
91 func: func.clone(),
92 remaining_ports: self.external_ports.iter().cloned().collect(),
93 external_registered: self.external_registered.clone(),
94 input_ports: HashMap::new(),
95 output_ports: HashMap::new(),
96 log,
97 }),
98 )
99 }
100
101 pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
112 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
113 .elements()
114 .into_iter()
115 .find(|e| {
116 !e.fn_name.starts_with("hydro_lang::sim::compiled")
117 && !e.fn_name.starts_with("hydro_lang::sim::flow")
118 && !e.fn_name.starts_with("fuzz<")
119 })
120 .unwrap();
121
122 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
123 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
124
125 let caller_fuzz_repro_path = repro_folder
126 .join(caller_fn.fn_name.replace("::", "__"))
127 .with_extension("bin");
128
129 if std::env::var("BOLERO_FUZZER").is_ok() {
130 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
131 std::fs::create_dir_all(&corpus_dir).unwrap();
132 let libfuzzer_args = format!(
133 "{} {} -artifact_prefix={}/ -handle_abrt=0",
134 corpus_dir.to_str().unwrap(),
135 corpus_dir.to_str().unwrap(),
136 corpus_dir.to_str().unwrap(),
137 );
138
139 std::fs::create_dir_all(&repro_folder).unwrap();
140
141 unsafe {
142 std::env::set_var(
143 "BOLERO_FAILURE_OUTPUT",
144 caller_fuzz_repro_path.to_str().unwrap(),
145 );
146
147 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
148 }
149
150 self.with_instantiator(
151 |instantiator| {
152 bolero::test(bolero::TargetLocation {
153 package_name: "",
154 manifest_dir: "",
155 module_path: "",
156 file: "",
157 line: 0,
158 item_path: "<unknown>::__bolero_item_path__",
159 test_name: None,
160 })
161 .run_with_replay(move |is_replay| {
162 let mut instance = instantiator();
163
164 if instance.log {
165 eprintln!(
166 "{}",
167 "\n==== New Simulation Instance ===="
168 .color(colored::Color::Cyan)
169 .bold()
170 );
171 }
172
173 if is_replay {
174 instance.log = true;
175 }
176
177 tokio::runtime::Builder::new_current_thread()
178 .build()
179 .unwrap()
180 .block_on(async {
181 let local_set = tokio::task::LocalSet::new();
182 local_set.run_until(thunk(instance)).await
183 })
184 })
185 },
186 false,
187 );
188 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
189 self.fuzz_repro(existing_bytes, thunk);
190 } else {
191 eprintln!(
192 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
193 caller_fuzz_repro_path.display()
194 );
195 self.with_instantiator(
196 |instantiator| {
197 bolero::test(bolero::TargetLocation {
198 package_name: "",
199 manifest_dir: "",
200 module_path: "",
201 file: ".",
202 line: 0,
203 item_path: "<unknown>::__bolero_item_path__",
204 test_name: None,
205 })
206 .with_iterations(8192)
207 .run(move || {
208 let instance = instantiator();
209 tokio::runtime::Builder::new_current_thread()
210 .build()
211 .unwrap()
212 .block_on(async {
213 let local_set = tokio::task::LocalSet::new();
214 local_set.run_until(thunk(instance)).await
215 })
216 })
217 },
218 false,
219 );
220 }
221 }
222
223 pub fn fuzz_repro<'a>(
227 &'a self,
228 bytes: Vec<u8>,
229 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
230 ) {
231 self.with_instance(|instance| {
232 bolero::bolero_engine::any::scope::with(
233 Box::new(bolero::bolero_engine::driver::object::Object(
234 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
235 )),
236 || {
237 tokio::runtime::Builder::new_current_thread()
238 .build()
239 .unwrap()
240 .block_on(async {
241 let local_set = tokio::task::LocalSet::new();
242 local_set.run_until(thunk(instance)).await
243 })
244 },
245 )
246 });
247 }
248
249 pub fn exhaustive<'a>(
260 &'a self,
261 mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
262 ) -> usize {
263 if std::env::var("BOLERO_FUZZER").is_ok() {
264 eprintln!(
265 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
266 );
267 std::process::abort();
268 }
269
270 let mut count = 0;
271 let count_mut = &mut count;
272
273 self.with_instantiator(
274 |instantiator| {
275 bolero::test(bolero::TargetLocation {
276 package_name: "",
277 manifest_dir: "",
278 module_path: "",
279 file: "",
280 line: 0,
281 item_path: "<unknown>::__bolero_item_path__",
282 test_name: None,
283 })
284 .exhaustive()
285 .run_with_replay(move |is_replay| {
286 *count_mut += 1;
287
288 let mut instance = instantiator();
289 if instance.log {
290 eprintln!(
291 "{}",
292 "\n==== New Simulation Instance ===="
293 .color(colored::Color::Cyan)
294 .bold()
295 );
296 }
297
298 if is_replay {
299 instance.log = true;
300 }
301
302 tokio::runtime::Builder::new_current_thread()
303 .build()
304 .unwrap()
305 .block_on(async {
306 let local_set = tokio::task::LocalSet::new();
307 local_set.run_until(thunk(instance)).await;
308 })
309 })
310 },
311 false,
312 );
313
314 count
315 }
316}
317
318pub struct CompiledSimInstance<'a> {
321 func: SimLoaded<'a>,
322 remaining_ports: HashSet<usize>,
323 external_registered: HashMap<usize, usize>,
324 output_ports: HashMap<usize, UnboundedSender<Bytes>>,
325 input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
326 log: bool,
327}
328
329impl<'a> CompiledSimInstance<'a> {
330 #[deprecated(note = "Use `connect` instead")]
331 pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
334 &mut self,
335 port: &ExternalBincodeSink<T, M, O, R>,
336 ) -> SimSender<T, O, R> {
337 self.connect(port)
338 }
339
340 #[deprecated(note = "Use `connect` instead")]
341 pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
344 &mut self,
345 port: &ExternalBincodeStream<T, O, R>,
346 ) -> SimReceiver<'a, T, O, R> {
347 self.connect(port)
348 }
349
350 pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
354 &'b mut self,
355 port: P,
356 ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
357 let mut pinned = std::pin::pin!(port.connect(self));
358 if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
359 v
360 } else {
361 panic!("Connect impl should not have used any async operations");
362 }
363 }
364
365 pub fn launch(self) {
368 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
369 }
370
371 pub fn schedule_with_logger<W: std::io::Write>(
376 self,
377 log_writer: W,
378 ) -> impl use<W> + Future<Output = ()> {
379 self.schedule_with_maybe_logger(Some(log_writer))
380 }
381
382 fn schedule_with_maybe_logger<W: std::io::Write>(
383 self,
384 log_override: Option<W>,
385 ) -> impl use<W> + Future<Output = ()> {
386 if !self.remaining_ports.is_empty() {
387 panic!(
388 "Cannot launch DFIR because some of the inputs / outputs have not been connected."
389 )
390 }
391
392 let (async_dfirs, tick_dfirs, hooks) = unsafe {
393 (self.func)(
394 colored::control::SHOULD_COLORIZE.should_colorize(),
395 self.output_ports,
396 self.input_ports,
397 if self.log {
398 println_handler
399 } else {
400 null_handler
401 },
402 if self.log {
403 eprintln_handler
404 } else {
405 null_handler
406 },
407 )
408 };
409 let mut launched = LaunchedSim {
410 async_dfirs: async_dfirs
411 .into_iter()
412 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
413 .collect(),
414 possibly_ready_ticks: vec![],
415 not_ready_ticks: tick_dfirs
416 .into_iter()
417 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
418 .collect(),
419 hooks: hooks
420 .into_iter()
421 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
422 .collect(),
423 log: if self.log {
424 if let Some(w) = log_override {
425 LogKind::Custom(w)
426 } else {
427 LogKind::Stderr
428 }
429 } else {
430 LogKind::Null
431 },
432 };
433
434 async move { launched.scheduler().await }
435 }
436}
437
438pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
440 Pin<Box<dyn Stream<Item = T> + 'a>>,
441 PhantomData<(O, R)>,
442);
443
444impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
445 pub async fn assert_no_more(mut self)
447 where
448 T: std::fmt::Debug,
449 {
450 if let Some(next) = self.0.next().await {
451 panic!("Stream yielded unexpected message: {:?}", next);
452 }
453 }
454}
455
456impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
457 pub async fn next(&mut self) -> Option<T> {
460 self.0.next().await
461 }
462
463 pub async fn collect<C: Default + Extend<T>>(self) -> C {
466 self.0.collect().await
467 }
468
469 pub async fn assert_yields(&mut self, expected: impl IntoIterator<Item = T>)
472 where
473 T: std::fmt::Debug + PartialEq,
474 {
475 let mut expected: VecDeque<T> = expected.into_iter().collect();
476
477 while !expected.is_empty() {
478 if let Some(next) = self.next().await {
479 assert_eq!(next, expected.pop_front().unwrap());
480 } else {
481 panic!("Stream ended early, still expected: {:?}", expected);
482 }
483 }
484 }
485
486 pub async fn assert_yields_only(mut self, expected: impl IntoIterator<Item = T>)
489 where
490 T: std::fmt::Debug + PartialEq,
491 {
492 self.assert_yields(expected).await;
493 self.assert_no_more().await;
494 }
495}
496
497impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
498 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
501 where
502 T: Ord,
503 {
504 let mut collected: C = self.0.collect().await;
505 collected.as_mut().sort();
506 collected
507 }
508
509 pub async fn assert_yields_unordered(&mut self, expected: impl IntoIterator<Item = T>)
512 where
513 T: std::fmt::Debug + PartialEq,
514 {
515 let mut expected: Vec<T> = expected.into_iter().collect();
516
517 while !expected.is_empty() {
518 if let Some(next) = self.0.next().await {
519 let idx = expected.iter().enumerate().find(|(_, e)| *e == &next);
520 if let Some((i, _)) = idx {
521 expected.swap_remove(i);
522 } else {
523 panic!("Stream yielded unexpected message: {:?}", next);
524 }
525 } else {
526 panic!("Stream ended early, still expected: {:?}", expected);
527 }
528 }
529 }
530
531 pub async fn assert_yields_only_unordered(mut self, expected: impl IntoIterator<Item = T>)
534 where
535 T: std::fmt::Debug + PartialEq,
536 {
537 self.assert_yields_unordered(expected).await;
538 self.assert_no_more().await;
539 }
540}
541
542impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
543 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
544{
545 type Output = SimReceiver<'a, T, O, R>;
546
547 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
548 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
549
550 assert!(ctx.remaining_ports.remove(looked_up));
551 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
552 ctx.output_ports.insert(*looked_up, sink);
553
554 SimReceiver(
555 Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
556 PhantomData,
557 )
558 }
559}
560
561pub struct SimSender<T, O: Ordering, R: Retries>(
563 Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
564 PhantomData<(O, R)>,
565);
566impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
567 pub fn send(&self, t: T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
570 (self.0)(t)
571 }
572
573 pub fn send_many<I: IntoIterator<Item = T>>(
576 &self,
577 iter: I,
578 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
579 for t in iter {
580 (self.0)(t)?;
581 }
582 Ok(())
583 }
584}
585
586impl<T> SimSender<T, NoOrder, ExactlyOnce> {
587 pub fn send_many_unordered<I: IntoIterator<Item = T>>(
590 &self,
591 iter: I,
592 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
593 for t in iter {
594 (self.0)(t)?;
595 }
596 Ok(())
597 }
598}
599
600impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
601 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
602{
603 type Output = SimSender<T, O, R>;
604
605 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
606 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
607
608 assert!(ctx.remaining_ports.remove(looked_up));
609 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
610 ctx.input_ports.insert(*looked_up, source);
611 SimSender(
612 Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
613 PhantomData,
614 )
615 }
616}
617
618enum LogKind<W: std::io::Write> {
619 Null,
620 Stderr,
621 Custom(W),
622}
623
624impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
626 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
627 match self {
628 LogKind::Null => Ok(()),
629 LogKind::Stderr => {
630 eprint!("{}", s);
631 Ok(())
632 }
633 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
634 }
635 }
636}
637
638type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;
639
640struct LaunchedSim<W: std::io::Write> {
643 async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
644 possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
645 not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
646 hooks: Hooks,
647 log: LogKind<W>,
648}
649
650impl<W: std::io::Write> LaunchedSim<W> {
651 async fn scheduler(&mut self) {
652 loop {
653 tokio::task::yield_now().await;
654 let mut any_made_progress = false;
655 for (loc, c_id, dfir) in &mut self.async_dfirs {
656 if dfir.run_tick().await {
657 any_made_progress = true;
658 let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
659 .not_ready_ticks
660 .drain(..)
661 .partition(|(tick_loc, tick_c_id, _)| {
662 let LocationId::Tick(_, outer) = tick_loc else {
663 unreachable!()
664 };
665 outer.as_ref() == loc && tick_c_id == c_id
666 });
667
668 self.possibly_ready_ticks.extend(now_ready);
669 self.not_ready_ticks.extend(still_not_ready);
670 }
671 }
672
673 if any_made_progress {
674 continue;
675 } else {
676 use bolero::generator::*;
677
678 let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
679 .possibly_ready_ticks
680 .drain(..)
681 .partition(|(name, cid, _)| {
682 self.hooks
683 .get(&(name.clone(), *cid))
684 .unwrap()
685 .iter()
686 .any(|hook| {
687 hook.current_decision().unwrap_or(false)
688 || hook.can_make_nontrivial_decision()
689 })
690 });
691
692 self.possibly_ready_ticks = ready;
693 self.not_ready_ticks.append(&mut not_ready);
694
695 if self.possibly_ready_ticks.is_empty() {
696 break;
697 } else {
698 let next_tick = (0..self.possibly_ready_ticks.len()).any();
699 let mut removed = self.possibly_ready_ticks.remove(next_tick);
700
701 match &mut self.log {
702 LogKind::Null => {}
703 LogKind::Stderr => {
704 if let Some(cid) = &removed.1 {
705 eprintln!(
706 "\n{}",
707 format!("Running Tick (Cluster Member {})", cid)
708 .color(colored::Color::Magenta)
709 .bold()
710 )
711 } else {
712 eprintln!(
713 "\n{}",
714 "Running Tick".color(colored::Color::Magenta).bold()
715 )
716 }
717 }
718 LogKind::Custom(writer) => {
719 writeln!(
720 writer,
721 "\n{}",
722 "Running Tick".color(colored::Color::Magenta).bold()
723 )
724 .unwrap();
725 }
726 }
727
728 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
729 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
730 write.write_str(" ")
731 };
732
733 let mut tick_decision_writer =
734 indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
735 inserter: &mut asterisk_indenter,
736 });
737
738 let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
739 let mut remaining_decision_count = hooks.len();
740 let mut made_nontrivial_decision = false;
741
742 bolero_generator::any::scope::borrow_with(|driver| {
743 hooks.iter_mut().for_each(|hook| {
745 if let Some(is_nontrivial) = hook.current_decision() {
746 made_nontrivial_decision |= is_nontrivial;
747 remaining_decision_count -= 1;
748 } else if !hook.can_make_nontrivial_decision() {
749 hook.autonomous_decision(driver, false);
753 remaining_decision_count -= 1;
754 }
755 });
756
757 hooks.iter_mut().for_each(|hook| {
758 if hook.current_decision().is_none() {
759 made_nontrivial_decision |= hook.autonomous_decision(
760 driver,
761 !made_nontrivial_decision && remaining_decision_count == 1,
762 );
763 remaining_decision_count -= 1;
764 }
765
766 hook.release_decision(&mut tick_decision_writer);
767 });
768 });
769
770 assert!(removed.2.run_tick().await);
771 self.possibly_ready_ticks.push(removed);
772 }
773 }
774 }
775 }
776}