hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
/// AST visitor that simplifies q! macro expansions.
///
/// Visiting stops at the first matching call; the expression extracted from
/// that call is stored in `simplified_result`.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching call, or `None` if
    /// nothing has matched.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// First match recorded by the visitor; once set, visiting stops. This is
    /// either a closure expression or, in the inner-block case, the block
    /// expression that contains one.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block whose direct statements include a nested block
    /// containing a closure yields that nested block instead of the closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
/// Wrapper around a parsed type whose `Debug` output is the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
/// Instantiation state of a network-like edge in the IR.
///
/// Values start out as `Building`; `HydroRoot::compile_network` replaces them
/// with `Finalized`, and `HydroRoot::connect_network` later consumes the
/// connect callback stored inside.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; the payload is boxed.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Payload of [`DebugInstantiate::Finalized`], filled in by
/// `HydroRoot::compile_network`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the edge.
    sink: syn::Expr,
    /// Expression for the receiving side of the edge.
    source: syn::Expr,
    /// Callback that `HydroRoot::connect_network` takes out of this field and
    /// invokes once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression.
    /// NOTE(review): presumably the expression evaluates to an async stream — confirm against the emit code.
    Stream(DebugExpr),
    /// Source with no payload.
    /// NOTE(review): presumably fed by an external network connection — confirm.
    ExternalNetwork(),
    /// Source described by an expression.
    /// NOTE(review): presumably the expression evaluates to an iterator — confirm against the emit code.
    Iter(DebugExpr),
    /// Source with no payload.
    /// NOTE(review): semantics not visible in this part of the file — confirm.
    Spin(),
}
308
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` (at `in_location`) to `out_ident`
    /// (at `out_location`) for a batch operation.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this file
    /// emits a plain `out_ident = in_ident;` assignment.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` where a collection
    /// is yielded out of a tick.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this file
    /// emits a plain `out_ident = in_ident;` assignment.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an
    /// atomic section.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this file
    /// emits a plain `out_ident = in_ident;` assignment.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an
    /// atomic section.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this file
    /// emits a plain `out_ident = in_ident;` assignment.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at a point where
    /// non-determinism is observed.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this file
    /// ignores `trusted` and both kinds and emits `out_ident = in_ident;`.
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
    );

    /// Emits both halves of a network edge: a sender on `from` that feeds
    /// `input_ident` into `sink` (optionally through `serialize`), and a
    /// receiver on `to` that binds `out_ident` to `source` (optionally through
    /// `deserialize`). `tag_id` is used to build the operator tags.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, on location `on`, a source that binds `out_ident` to
    /// `source_expr` (optionally mapped through `deserialize`).
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, on location `on`, a sink that sends `input_ident` (optionally
    /// mapped through `serialize`) into `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
399
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    /// This builder never includes intermediate singleton states.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the graph builder keyed by the raw id of the location's root,
    /// inserting a default one on first use.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let key = location.root().raw_id();
        self.entry(key).or_default()
    }

    /// Emits a plain `out_ident = in_ident;` assignment at the input's root location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote! {
            #out_ident = #in_ident;
        };
        graph.add_dfir(passthrough, None, None);
    }

    /// Emits a plain `out_ident = in_ident;` assignment at the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote! {
            #out_ident = #in_ident;
        };
        graph.add_dfir(passthrough, None, None);
    }

    /// Emits a plain `out_ident = in_ident;` assignment at the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote! {
            #out_ident = #in_ident;
        };
        graph.add_dfir(passthrough, None, None);
    }

    /// Emits a plain `out_ident = in_ident;` assignment at the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote! {
            #out_ident = #in_ident;
        };
        graph.add_dfir(passthrough, None, None);
    }

    /// Emits a plain `out_ident = in_ident;` assignment at `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
    ) {
        let graph = self.get_dfir_mut(location);
        let passthrough = parse_quote! {
            #out_ident = #in_ident;
        };
        graph.add_dfir(passthrough, None, None);
    }

    /// Emits the sending half on `from` and the receiving half on `to`,
    /// inserting `map(...)` stages only when a (de)serializer is supplied.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let sender_graph = self.get_dfir_mut(from);
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let send_code = match serialize {
            Some(serialize_pipeline) => parse_quote! {
                #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
        };
        sender_graph.add_dfir(send_code, None, Some(&send_tag));

        let receiver_graph = self.get_dfir_mut(to);
        let recv_tag = format!("recv{tag_id}");
        let recv_code = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source);
            },
        };
        receiver_graph.add_dfir(recv_code, None, Some(&recv_tag));
    }

    /// Emits a `source_stream` on `on`, optionally followed by a `map` over
    /// the deserializer.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_graph = self.get_dfir_mut(on);
        let recv_tag = format!("recv{tag_id}");
        let recv_code = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source_expr);
            },
        };
        receiver_graph.add_dfir(recv_code, None, Some(&recv_tag));
    }

    /// Emits a `dest_sink` on `on`, optionally preceded by a `map` over the
    /// serializer.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let sender_graph = self.get_dfir_mut(on);
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let send_code = match serialize {
            Some(serialize_fn) => parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink_expr);
            },
        };
        sender_graph.add_dfir(send_code, None, Some(&send_tag));
    }
}
611
#[cfg(feature = "build")]
/// Either a [`DfirBuilder`] to emit code into, or a pair of callbacks to run
/// on roots and nodes instead.
///
/// NOTE(review): the `&mut usize` passed to each callback is presumably the
/// running statement-id counter — confirm against `emit_core`.
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the first callback on roots and the second on nodes.
    Callback(L, N),
}
621
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Root carrying an expression `f` applied to `input`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that sends `input` to an external, identified by
    /// `to_external_id` and `to_key`. `instantiate_fn` is finalized by
    /// `compile_network`.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        to_many: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that feeds `input` into the sink described by `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that feeds `input` into the cycle named by `ident`.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
652
653impl HydroRoot {
    #[cfg(feature = "build")]
    /// Finalizes every network-like edge reachable from this root.
    ///
    /// Walks the graph bottom-up and, for each `HydroRoot::SendExternal`,
    /// `HydroNode::Network`, and `HydroNode::ExternalInput` still in the
    /// `DebugInstantiate::Building` state, asks the deployment backend `D` for
    /// sink/source expressions and a connect callback, then stores them as a
    /// `DebugInstantiate::Finalized` value.
    ///
    /// `extra_stmts` is only touched for many-to-one external inputs, where
    /// the entry for the receiving process id is handed to `D::e2o_many_source`.
    ///
    /// # Panics
    ///
    /// Panics if an edge is already finalized, if a referenced external or
    /// process id is missing from `externals` / `processes`, or if the
    /// relevant location root is neither a process nor a cluster. Cluster
    /// endpoints for external edges are not implemented (`todo!`).
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        self.transform_bottom_up(
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            // The sender is wherever the input of this root lives.
                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // Many-mode: no ports are allocated here and the connect callback is a no-op.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                // Placeholder for the source half of a send-only edge.
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        // Register the external's port under the edge's key.
                                        to_node.register(*to_key, source_port.clone());

                                        (
                                            (
                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                                                // Placeholder for the source half of a send-only edge.
                                                parse_quote!(DUMMY),
                                            ),
                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // Sender is the input's location root; receiver is this node's location root.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            // The receiver is wherever this node itself lives.
                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    // Register the external's port under the edge's key.
                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            // Placeholder for the sink half of a receive-only edge.
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    compile_env,
                                                    extra_stmts.entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
835
836    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
837        self.transform_bottom_up(
838            &mut |l| {
839                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
840                    match instantiate_fn {
841                        DebugInstantiate::Building => panic!("network not built"),
842
843                        DebugInstantiate::Finalized(finalized) => {
844                            (finalized.connect_fn.take().unwrap())();
845                        }
846                    }
847                }
848            },
849            &mut |n| {
850                if let HydroNode::Network { instantiate_fn, .. }
851                | HydroNode::ExternalInput { instantiate_fn, .. } = n
852                {
853                    match instantiate_fn {
854                        DebugInstantiate::Building => panic!("network not built"),
855
856                        DebugInstantiate::Finalized(finalized) => {
857                            (finalized.connect_fn.take().unwrap())();
858                        }
859                    }
860                }
861            },
862            seen_tees,
863            false,
864        );
865    }
866
867    pub fn transform_bottom_up(
868        &mut self,
869        transform_root: &mut impl FnMut(&mut HydroRoot),
870        transform_node: &mut impl FnMut(&mut HydroNode),
871        seen_tees: &mut SeenTees,
872        check_well_formed: bool,
873    ) {
874        self.transform_children(
875            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
876            seen_tees,
877        );
878
879        transform_root(self);
880    }
881
882    pub fn transform_children(
883        &mut self,
884        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
885        seen_tees: &mut SeenTees,
886    ) {
887        match self {
888            HydroRoot::ForEach { input, .. }
889            | HydroRoot::SendExternal { input, .. }
890            | HydroRoot::DestSink { input, .. }
891            | HydroRoot::CycleSink { input, .. } => {
892                transform(input, seen_tees);
893            }
894        }
895    }
896
897    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
898        match self {
899            HydroRoot::ForEach {
900                f,
901                input,
902                op_metadata,
903            } => HydroRoot::ForEach {
904                f: f.clone(),
905                input: Box::new(input.deep_clone(seen_tees)),
906                op_metadata: op_metadata.clone(),
907            },
908            HydroRoot::SendExternal {
909                to_external_id,
910                to_key,
911                to_many,
912                serialize_fn,
913                instantiate_fn,
914                input,
915                op_metadata,
916            } => HydroRoot::SendExternal {
917                to_external_id: *to_external_id,
918                to_key: *to_key,
919                to_many: *to_many,
920                serialize_fn: serialize_fn.clone(),
921                instantiate_fn: instantiate_fn.clone(),
922                input: Box::new(input.deep_clone(seen_tees)),
923                op_metadata: op_metadata.clone(),
924            },
925            HydroRoot::DestSink {
926                sink,
927                input,
928                op_metadata,
929            } => HydroRoot::DestSink {
930                sink: sink.clone(),
931                input: Box::new(input.deep_clone(seen_tees)),
932                op_metadata: op_metadata.clone(),
933            },
934            HydroRoot::CycleSink {
935                ident,
936                input,
937                op_metadata,
938            } => HydroRoot::CycleSink {
939                ident: ident.clone(),
940                input: Box::new(input.deep_clone(seen_tees)),
941                op_metadata: op_metadata.clone(),
942            },
943        }
944    }
945
946    #[cfg(feature = "build")]
947    pub fn emit(
948        &mut self,
949        graph_builders: &mut dyn DfirBuilder,
950        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
951        next_stmt_id: &mut usize,
952    ) {
953        self.emit_core(
954            &mut BuildersOrCallback::Builders::<
955                fn(&mut HydroRoot, &mut usize),
956                fn(&mut HydroNode, &mut usize),
957            >(graph_builders),
958            built_tees,
959            next_stmt_id,
960        );
961    }
962
963    #[cfg(feature = "build")]
964    pub fn emit_core(
965        &mut self,
966        builders_or_callback: &mut BuildersOrCallback<
967            impl FnMut(&mut HydroRoot, &mut usize),
968            impl FnMut(&mut HydroNode, &mut usize),
969        >,
970        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
971        next_stmt_id: &mut usize,
972    ) {
973        match self {
974            HydroRoot::ForEach { f, input, .. } => {
975                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
976
977                match builders_or_callback {
978                    BuildersOrCallback::Builders(graph_builders) => {
979                        graph_builders
980                            .get_dfir_mut(&input.metadata().location_kind)
981                            .add_dfir(
982                                parse_quote! {
983                                    #input_ident -> for_each(#f);
984                                },
985                                None,
986                                Some(&next_stmt_id.to_string()),
987                            );
988                    }
989                    BuildersOrCallback::Callback(leaf_callback, _) => {
990                        leaf_callback(self, next_stmt_id);
991                    }
992                }
993
994                *next_stmt_id += 1;
995            }
996
997            HydroRoot::SendExternal {
998                serialize_fn,
999                instantiate_fn,
1000                input,
1001                ..
1002            } => {
1003                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1004
1005                match builders_or_callback {
1006                    BuildersOrCallback::Builders(graph_builders) => {
1007                        let (sink_expr, _) = match instantiate_fn {
1008                            DebugInstantiate::Building => (
1009                                syn::parse_quote!(DUMMY_SINK),
1010                                syn::parse_quote!(DUMMY_SOURCE),
1011                            ),
1012
1013                            DebugInstantiate::Finalized(finalized) => {
1014                                (finalized.sink.clone(), finalized.source.clone())
1015                            }
1016                        };
1017
1018                        graph_builders.create_external_output(
1019                            &input.metadata().location_kind,
1020                            sink_expr,
1021                            &input_ident,
1022                            serialize_fn,
1023                            *next_stmt_id,
1024                        );
1025                    }
1026                    BuildersOrCallback::Callback(leaf_callback, _) => {
1027                        leaf_callback(self, next_stmt_id);
1028                    }
1029                }
1030
1031                *next_stmt_id += 1;
1032            }
1033
1034            HydroRoot::DestSink { sink, input, .. } => {
1035                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1036
1037                match builders_or_callback {
1038                    BuildersOrCallback::Builders(graph_builders) => {
1039                        graph_builders
1040                            .get_dfir_mut(&input.metadata().location_kind)
1041                            .add_dfir(
1042                                parse_quote! {
1043                                    #input_ident -> dest_sink(#sink);
1044                                },
1045                                None,
1046                                Some(&next_stmt_id.to_string()),
1047                            );
1048                    }
1049                    BuildersOrCallback::Callback(leaf_callback, _) => {
1050                        leaf_callback(self, next_stmt_id);
1051                    }
1052                }
1053
1054                *next_stmt_id += 1;
1055            }
1056
1057            HydroRoot::CycleSink { ident, input, .. } => {
1058                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1059
1060                match builders_or_callback {
1061                    BuildersOrCallback::Builders(graph_builders) => {
1062                        graph_builders
1063                            .get_dfir_mut(&input.metadata().location_kind)
1064                            .add_dfir(
1065                                parse_quote! {
1066                                    #ident = #input_ident;
1067                                },
1068                                None,
1069                                None,
1070                            );
1071                    }
1072                    // No ID, no callback
1073                    BuildersOrCallback::Callback(_, _) => {}
1074                }
1075            }
1076        }
1077    }
1078
1079    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1080        match self {
1081            HydroRoot::ForEach { op_metadata, .. }
1082            | HydroRoot::SendExternal { op_metadata, .. }
1083            | HydroRoot::DestSink { op_metadata, .. }
1084            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1085        }
1086    }
1087
1088    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1089        match self {
1090            HydroRoot::ForEach { op_metadata, .. }
1091            | HydroRoot::SendExternal { op_metadata, .. }
1092            | HydroRoot::DestSink { op_metadata, .. }
1093            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1094        }
1095    }
1096
1097    pub fn input(&self) -> &HydroNode {
1098        match self {
1099            HydroRoot::ForEach { input, .. }
1100            | HydroRoot::SendExternal { input, .. }
1101            | HydroRoot::DestSink { input, .. }
1102            | HydroRoot::CycleSink { input, .. } => input,
1103        }
1104    }
1105
1106    pub fn input_metadata(&self) -> &HydroIrMetadata {
1107        self.input().metadata()
1108    }
1109
1110    pub fn print_root(&self) -> String {
1111        match self {
1112            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1113            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1114            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1115            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1116        }
1117    }
1118
1119    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1120        match self {
1121            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1122                transform(f);
1123            }
1124            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1125        }
1126    }
1127}
1128
/// Emits every root in `ir` and returns the resulting graph builders.
///
/// All roots share one tee cache and one statement-id counter (starting at 0), so a
/// tee reachable from several roots is tracked in a single map and statement ids
/// keep counting across roots.
///
/// Takes a slice rather than `&mut Vec<_>`: the roots are only iterated, never
/// added or removed. Callers holding a `Vec` can keep passing `&mut vec`.
#[cfg(feature = "build")]
pub fn emit(ir: &mut [HydroRoot]) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir {
        leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
    }
    builders
}
1139
/// Walks `ir` in emission order without building anything.
///
/// Each root is run through [`HydroRoot::emit_core`] in `Callback` mode, so
/// `transform_root` / `transform_node` are handed the roots and nodes together with
/// the statement-id counter. The counter and the tee map are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visitor = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut built_tees = HashMap::new();
    let mut stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(&mut visitor, &mut built_tees, &mut stmt_id);
    }
}
1153
1154pub fn transform_bottom_up(
1155    ir: &mut [HydroRoot],
1156    transform_root: &mut impl FnMut(&mut HydroRoot),
1157    transform_node: &mut impl FnMut(&mut HydroNode),
1158    check_well_formed: bool,
1159) {
1160    let mut seen_tees = HashMap::new();
1161    ir.iter_mut().for_each(|leaf| {
1162        leaf.transform_bottom_up(
1163            transform_root,
1164            transform_node,
1165            &mut seen_tees,
1166            check_well_formed,
1167        );
1168    });
1169}
1170
1171pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1172    let mut seen_tees = HashMap::new();
1173    ir.iter()
1174        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1175        .collect()
1176}
1177
/// State of the tee-deduplicating debug printer.
///
/// `None` means deduplication is off. `Some((next_id, ids))` holds the next id to
/// hand out and the ids already assigned, keyed by the address of the tee's cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Switched on by `dbg_dedup_tee` while its closure runs and consulted by the
    // `Debug` impl of `TeeNode`. Starts out as `None`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1182
1183pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1184    PRINTED_TEES.with(|printed_tees| {
1185        let mut printed_tees_mut = printed_tees.borrow_mut();
1186        *printed_tees_mut = Some((0, HashMap::new()));
1187        drop(printed_tees_mut);
1188
1189        let ret = f();
1190
1191        let mut printed_tees_mut = printed_tees.borrow_mut();
1192        *printed_tees_mut = None;
1193
1194        ret
1195    })
1196}
1197
/// A reference-counted, interior-mutable handle to a [`HydroNode`].
///
/// Cloning the inner `Rc` lets several handles point at the same node.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);

impl TeeNode {
    /// Returns the address of the shared cell. The traversal code in this file uses
    /// it as a map key to recognise handles that point at the same node.
    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
        Rc::as_ptr(&self.0)
    }
}
1205
1206impl Debug for TeeNode {
1207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1208        PRINTED_TEES.with(|printed_tees| {
1209            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1210            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1211
1212            if let Some(printed_tees_mut) = printed_tees_mut {
1213                if let Some(existing) = printed_tees_mut
1214                    .1
1215                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1216                {
1217                    write!(f, "<tee {}>", existing)
1218                } else {
1219                    let next_id = printed_tees_mut.0;
1220                    printed_tees_mut.0 += 1;
1221                    printed_tees_mut
1222                        .1
1223                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1224                    drop(printed_tees_mut_borrow);
1225                    write!(f, "<tee {}>: ", next_id)?;
1226                    Debug::fmt(&self.0.borrow(), f)
1227                }
1228            } else {
1229                drop(printed_tees_mut_borrow);
1230                write!(f, "<tee>: ")?;
1231                Debug::fmt(&self.0.borrow(), f)
1232            }
1233        })
1234    }
1235}
1236
1237impl Hash for TeeNode {
1238    fn hash<H: Hasher>(&self, state: &mut H) {
1239        self.0.borrow_mut().hash(state);
1240    }
1241}
1242
/// Boundedness marker recorded in [`CollectionKind`].
///
/// Used as the `bound` field of the `Stream`, `Singleton`, `Optional` and
/// `KeyedStream` variants.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1248
/// Ordering marker recorded in [`CollectionKind`].
///
/// Used as `order` on `Stream` and as `value_order` on `KeyedStream`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1254
/// Retry marker recorded in [`CollectionKind`].
///
/// Used as `retry` on `Stream` and as `value_retry` on `KeyedStream`.
// NOTE(review): presumably describes whether elements may be delivered more than
// once — confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1260
/// Boundedness marker for the `KeyedSingleton` variant of [`CollectionKind`].
///
/// Unlike [`BoundKind`] it has a third state, `BoundedValue`.
// NOTE(review): `BoundedValue` presumably means the values are bounded while the
// key set is not — confirm against the keyed-singleton API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1267
/// The kind of collection a node outputs, together with its element types.
///
/// Stored in [`HydroIrMetadata::collection_kind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream with a single element type.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A stream with separate key and value types; order and retry are recorded
    /// for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed counterpart of `Singleton`; uses the three-state
    /// [`KeyedSingletonBoundKind`] instead of [`BoundKind`].
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1297
/// Metadata attached to a [`HydroNode`].
///
/// Its `Hash` and `PartialEq` impls deliberately ignore every field, and its
/// `Debug` impl prints only `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node. `HydroNode::transform_bottom_up` compares the `root()`
    /// of this value between a node and its inputs when checking well-formedness.
    pub location_kind: LocationId,
    /// Kind of collection the node outputs.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated output cardinality — confirm with the
    // code that fills it in.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, profiling figures, id).
    pub op: HydroIrOpMetadata,
}
1306
1307// HydroIrMetadata shouldn't be used to hash or compare
1308impl Hash for HydroIrMetadata {
1309    fn hash<H: Hasher>(&self, _: &mut H) {}
1310}
1311
1312impl PartialEq for HydroIrMetadata {
1313    fn eq(&self, _: &Self) -> bool {
1314        true
1315    }
1316}
1317
1318impl Eq for HydroIrMetadata {}
1319
1320impl Debug for HydroIrMetadata {
1321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1322        f.debug_struct("HydroIrMetadata")
1323            .field("location_kind", &self.location_kind)
1324            .field("collection_kind", &self.collection_kind)
1325            .finish()
1326    }
1327}
1328
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    /// `None` at construction.
    // NOTE(review): presumably filled in later from profiling data — confirm.
    pub cpu_usage: Option<f64>,
    /// `None` at construction.
    pub network_recv_cpu_usage: Option<f64>,
    /// `None` at construction.
    pub id: Option<usize>,
}
1338
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every
    /// optional field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as [`HydroIrOpMetadata::new`], but passes `2 + skip_count` to
    /// `Backtrace::get_backtrace`.
    // NOTE(review): the constant 2 presumably accounts for this function's own
    // frames — confirm against `Backtrace::get_backtrace` before adding or
    // removing a call layer here.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1357
1358impl Debug for HydroIrOpMetadata {
1359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1360        f.debug_struct("HydroIrOpMetadata").finish()
1361    }
1362}
1363
1364impl Hash for HydroIrOpMetadata {
1365    fn hash<H: Hasher>(&self, _: &mut H) {}
1366}
1367
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Variants own their upstream nodes through `Box<HydroNode>` fields, except `Tee`,
/// whose upstream is shared through a [`TeeNode`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in written into a tee's cell while its real contents are being
    /// transformed or cloned. `transform_children` panics if asked to traverse one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    // The three source variants below, and `ExternalInput`, have no upstream node.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A node whose upstream is shared: `inner` is a reference-counted cell, so
    /// several `Tee` nodes can point at the same upstream node.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-input operators. Traversal visits the first-listed input before the second.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input operators parameterised by a user expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with a second upstream node, `watermark`, which traversal
    /// visits after `input`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The one variant exempt from the location check in `transform_bottom_up`: its
    /// input may have a different root location than the node itself.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// An input with no upstream node in the IR.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1601
/// Tee bookkeeping for a single transform or deep-clone pass: maps the address of an
/// original tee cell to the cell that replaces it, so that every handle to the same
/// original ends up pointing at the same replacement.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a location.
// NOTE(review): no use of this alias is visible in this part of the file — confirm
// the intended meaning with its callers.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1604
1605impl HydroNode {
1606    pub fn transform_bottom_up(
1607        &mut self,
1608        transform: &mut impl FnMut(&mut HydroNode),
1609        seen_tees: &mut SeenTees,
1610        check_well_formed: bool,
1611    ) {
1612        self.transform_children(
1613            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1614            seen_tees,
1615        );
1616
1617        transform(self);
1618
1619        let self_location = self.metadata().location_kind.root();
1620
1621        if check_well_formed {
1622            match &*self {
1623                HydroNode::Network { .. } => {}
1624                _ => {
1625                    self.input_metadata().iter().for_each(|i| {
1626                        if i.location_kind.root() != self_location {
1627                            panic!(
1628                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1629                                i,
1630                                i.location_kind.root(),
1631                                self,
1632                                self_location
1633                            )
1634                        }
1635                    });
1636                }
1637            }
1638        }
1639    }
1640
1641    #[inline(always)]
1642    pub fn transform_children(
1643        &mut self,
1644        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1645        seen_tees: &mut SeenTees,
1646    ) {
1647        match self {
1648            HydroNode::Placeholder => {
1649                panic!();
1650            }
1651
1652            HydroNode::Source { .. }
1653            | HydroNode::SingletonSource { .. }
1654            | HydroNode::CycleSource { .. }
1655            | HydroNode::ExternalInput { .. } => {}
1656
1657            HydroNode::Tee { inner, .. } => {
1658                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1659                    *inner = TeeNode(transformed.clone());
1660                } else {
1661                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1662                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1663                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1664                    transform(&mut orig, seen_tees);
1665                    *transformed_cell.borrow_mut() = orig;
1666                    *inner = TeeNode(transformed_cell);
1667                }
1668            }
1669
1670            HydroNode::Cast { inner, .. }
1671            | HydroNode::ObserveNonDet { inner, .. }
1672            | HydroNode::Persist { inner, .. }
1673            | HydroNode::BeginAtomic { inner, .. }
1674            | HydroNode::EndAtomic { inner, .. }
1675            | HydroNode::Batch { inner, .. }
1676            | HydroNode::YieldConcat { inner, .. } => {
1677                transform(inner.as_mut(), seen_tees);
1678            }
1679
1680            HydroNode::Chain { first, second, .. } => {
1681                transform(first.as_mut(), seen_tees);
1682                transform(second.as_mut(), seen_tees);
1683            }
1684
1685            HydroNode::ChainFirst { first, second, .. } => {
1686                transform(first.as_mut(), seen_tees);
1687                transform(second.as_mut(), seen_tees);
1688            }
1689
1690            HydroNode::CrossSingleton { left, right, .. }
1691            | HydroNode::CrossProduct { left, right, .. }
1692            | HydroNode::Join { left, right, .. } => {
1693                transform(left.as_mut(), seen_tees);
1694                transform(right.as_mut(), seen_tees);
1695            }
1696
1697            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1698                transform(pos.as_mut(), seen_tees);
1699                transform(neg.as_mut(), seen_tees);
1700            }
1701
1702            HydroNode::ReduceKeyedWatermark {
1703                input, watermark, ..
1704            } => {
1705                transform(input.as_mut(), seen_tees);
1706                transform(watermark.as_mut(), seen_tees);
1707            }
1708
1709            HydroNode::Map { input, .. }
1710            | HydroNode::ResolveFutures { input, .. }
1711            | HydroNode::ResolveFuturesOrdered { input, .. }
1712            | HydroNode::FlatMap { input, .. }
1713            | HydroNode::Filter { input, .. }
1714            | HydroNode::FilterMap { input, .. }
1715            | HydroNode::Sort { input, .. }
1716            | HydroNode::DeferTick { input, .. }
1717            | HydroNode::Enumerate { input, .. }
1718            | HydroNode::Inspect { input, .. }
1719            | HydroNode::Unique { input, .. }
1720            | HydroNode::Network { input, .. }
1721            | HydroNode::Fold { input, .. }
1722            | HydroNode::Scan { input, .. }
1723            | HydroNode::FoldKeyed { input, .. }
1724            | HydroNode::Reduce { input, .. }
1725            | HydroNode::ReduceKeyed { input, .. }
1726            | HydroNode::Counter { input, .. } => {
1727                transform(input.as_mut(), seen_tees);
1728            }
1729        }
1730    }
1731
1732    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1733        match self {
1734            HydroNode::Placeholder => HydroNode::Placeholder,
1735            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1736                inner: Box::new(inner.deep_clone(seen_tees)),
1737                metadata: metadata.clone(),
1738            },
1739            HydroNode::ObserveNonDet {
1740                inner,
1741                trusted,
1742                metadata,
1743            } => HydroNode::ObserveNonDet {
1744                inner: Box::new(inner.deep_clone(seen_tees)),
1745                trusted: *trusted,
1746                metadata: metadata.clone(),
1747            },
1748            HydroNode::Source { source, metadata } => HydroNode::Source {
1749                source: source.clone(),
1750                metadata: metadata.clone(),
1751            },
1752            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1753                value: value.clone(),
1754                metadata: metadata.clone(),
1755            },
1756            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1757                ident: ident.clone(),
1758                metadata: metadata.clone(),
1759            },
1760            HydroNode::Tee { inner, metadata } => {
1761                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1762                    HydroNode::Tee {
1763                        inner: TeeNode(transformed.clone()),
1764                        metadata: metadata.clone(),
1765                    }
1766                } else {
1767                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1768                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1769                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1770                    *new_rc.borrow_mut() = cloned;
1771                    HydroNode::Tee {
1772                        inner: TeeNode(new_rc),
1773                        metadata: metadata.clone(),
1774                    }
1775                }
1776            }
1777            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1778                inner: Box::new(inner.deep_clone(seen_tees)),
1779                metadata: metadata.clone(),
1780            },
1781            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1782                inner: Box::new(inner.deep_clone(seen_tees)),
1783                metadata: metadata.clone(),
1784            },
1785            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1786                inner: Box::new(inner.deep_clone(seen_tees)),
1787                metadata: metadata.clone(),
1788            },
1789            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1790                inner: Box::new(inner.deep_clone(seen_tees)),
1791                metadata: metadata.clone(),
1792            },
1793            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1794                inner: Box::new(inner.deep_clone(seen_tees)),
1795                metadata: metadata.clone(),
1796            },
1797            HydroNode::Chain {
1798                first,
1799                second,
1800                metadata,
1801            } => HydroNode::Chain {
1802                first: Box::new(first.deep_clone(seen_tees)),
1803                second: Box::new(second.deep_clone(seen_tees)),
1804                metadata: metadata.clone(),
1805            },
1806            HydroNode::ChainFirst {
1807                first,
1808                second,
1809                metadata,
1810            } => HydroNode::ChainFirst {
1811                first: Box::new(first.deep_clone(seen_tees)),
1812                second: Box::new(second.deep_clone(seen_tees)),
1813                metadata: metadata.clone(),
1814            },
1815            HydroNode::CrossProduct {
1816                left,
1817                right,
1818                metadata,
1819            } => HydroNode::CrossProduct {
1820                left: Box::new(left.deep_clone(seen_tees)),
1821                right: Box::new(right.deep_clone(seen_tees)),
1822                metadata: metadata.clone(),
1823            },
1824            HydroNode::CrossSingleton {
1825                left,
1826                right,
1827                metadata,
1828            } => HydroNode::CrossSingleton {
1829                left: Box::new(left.deep_clone(seen_tees)),
1830                right: Box::new(right.deep_clone(seen_tees)),
1831                metadata: metadata.clone(),
1832            },
1833            HydroNode::Join {
1834                left,
1835                right,
1836                metadata,
1837            } => HydroNode::Join {
1838                left: Box::new(left.deep_clone(seen_tees)),
1839                right: Box::new(right.deep_clone(seen_tees)),
1840                metadata: metadata.clone(),
1841            },
1842            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1843                pos: Box::new(pos.deep_clone(seen_tees)),
1844                neg: Box::new(neg.deep_clone(seen_tees)),
1845                metadata: metadata.clone(),
1846            },
1847            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1848                pos: Box::new(pos.deep_clone(seen_tees)),
1849                neg: Box::new(neg.deep_clone(seen_tees)),
1850                metadata: metadata.clone(),
1851            },
1852            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1853                input: Box::new(input.deep_clone(seen_tees)),
1854                metadata: metadata.clone(),
1855            },
1856            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1857                HydroNode::ResolveFuturesOrdered {
1858                    input: Box::new(input.deep_clone(seen_tees)),
1859                    metadata: metadata.clone(),
1860                }
1861            }
1862            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1863                f: f.clone(),
1864                input: Box::new(input.deep_clone(seen_tees)),
1865                metadata: metadata.clone(),
1866            },
1867            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1868                f: f.clone(),
1869                input: Box::new(input.deep_clone(seen_tees)),
1870                metadata: metadata.clone(),
1871            },
1872            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1873                f: f.clone(),
1874                input: Box::new(input.deep_clone(seen_tees)),
1875                metadata: metadata.clone(),
1876            },
1877            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1878                f: f.clone(),
1879                input: Box::new(input.deep_clone(seen_tees)),
1880                metadata: metadata.clone(),
1881            },
1882            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1883                input: Box::new(input.deep_clone(seen_tees)),
1884                metadata: metadata.clone(),
1885            },
1886            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1887                input: Box::new(input.deep_clone(seen_tees)),
1888                metadata: metadata.clone(),
1889            },
1890            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1891                f: f.clone(),
1892                input: Box::new(input.deep_clone(seen_tees)),
1893                metadata: metadata.clone(),
1894            },
1895            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1896                input: Box::new(input.deep_clone(seen_tees)),
1897                metadata: metadata.clone(),
1898            },
1899            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1900                input: Box::new(input.deep_clone(seen_tees)),
1901                metadata: metadata.clone(),
1902            },
1903            HydroNode::Fold {
1904                init,
1905                acc,
1906                input,
1907                metadata,
1908            } => HydroNode::Fold {
1909                init: init.clone(),
1910                acc: acc.clone(),
1911                input: Box::new(input.deep_clone(seen_tees)),
1912                metadata: metadata.clone(),
1913            },
1914            HydroNode::Scan {
1915                init,
1916                acc,
1917                input,
1918                metadata,
1919            } => HydroNode::Scan {
1920                init: init.clone(),
1921                acc: acc.clone(),
1922                input: Box::new(input.deep_clone(seen_tees)),
1923                metadata: metadata.clone(),
1924            },
1925            HydroNode::FoldKeyed {
1926                init,
1927                acc,
1928                input,
1929                metadata,
1930            } => HydroNode::FoldKeyed {
1931                init: init.clone(),
1932                acc: acc.clone(),
1933                input: Box::new(input.deep_clone(seen_tees)),
1934                metadata: metadata.clone(),
1935            },
1936            HydroNode::ReduceKeyedWatermark {
1937                f,
1938                input,
1939                watermark,
1940                metadata,
1941            } => HydroNode::ReduceKeyedWatermark {
1942                f: f.clone(),
1943                input: Box::new(input.deep_clone(seen_tees)),
1944                watermark: Box::new(watermark.deep_clone(seen_tees)),
1945                metadata: metadata.clone(),
1946            },
1947            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1948                f: f.clone(),
1949                input: Box::new(input.deep_clone(seen_tees)),
1950                metadata: metadata.clone(),
1951            },
1952            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1953                f: f.clone(),
1954                input: Box::new(input.deep_clone(seen_tees)),
1955                metadata: metadata.clone(),
1956            },
1957            HydroNode::Network {
1958                serialize_fn,
1959                instantiate_fn,
1960                deserialize_fn,
1961                input,
1962                metadata,
1963            } => HydroNode::Network {
1964                serialize_fn: serialize_fn.clone(),
1965                instantiate_fn: instantiate_fn.clone(),
1966                deserialize_fn: deserialize_fn.clone(),
1967                input: Box::new(input.deep_clone(seen_tees)),
1968                metadata: metadata.clone(),
1969            },
1970            HydroNode::ExternalInput {
1971                from_external_id,
1972                from_key,
1973                from_many,
1974                codec_type,
1975                port_hint,
1976                instantiate_fn,
1977                deserialize_fn,
1978                metadata,
1979            } => HydroNode::ExternalInput {
1980                from_external_id: *from_external_id,
1981                from_key: *from_key,
1982                from_many: *from_many,
1983                codec_type: codec_type.clone(),
1984                port_hint: *port_hint,
1985                instantiate_fn: instantiate_fn.clone(),
1986                deserialize_fn: deserialize_fn.clone(),
1987                metadata: metadata.clone(),
1988            },
1989            HydroNode::Counter {
1990                tag,
1991                duration,
1992                prefix,
1993                input,
1994                metadata,
1995            } => HydroNode::Counter {
1996                tag: tag.clone(),
1997                duration: duration.clone(),
1998                prefix: prefix.clone(),
1999                input: Box::new(input.deep_clone(seen_tees)),
2000                metadata: metadata.clone(),
2001            },
2002        }
2003    }
2004
2005    #[cfg(feature = "build")]
2006    pub fn emit_core(
2007        &mut self,
2008        builders_or_callback: &mut BuildersOrCallback<
2009            impl FnMut(&mut HydroRoot, &mut usize),
2010            impl FnMut(&mut HydroNode, &mut usize),
2011        >,
2012        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2013        next_stmt_id: &mut usize,
2014    ) -> syn::Ident {
2015        let out_location = self.metadata().location_kind.clone();
2016        match self {
2017            HydroNode::Placeholder => {
2018                panic!()
2019            }
2020
2021            HydroNode::Cast { inner, .. } => {
2022                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2023
2024                match builders_or_callback {
2025                    BuildersOrCallback::Builders(_) => {}
2026                    BuildersOrCallback::Callback(_, node_callback) => {
2027                        node_callback(self, next_stmt_id);
2028                    }
2029                }
2030
2031                *next_stmt_id += 1;
2032
2033                inner_ident
2034            }
2035
2036            HydroNode::ObserveNonDet {
2037                inner,
2038                trusted,
2039                metadata,
2040                ..
2041            } => {
2042                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2043
2044                let observe_ident =
2045                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2046
2047                match builders_or_callback {
2048                    BuildersOrCallback::Builders(graph_builders) => {
2049                        graph_builders.observe_nondet(
2050                            *trusted,
2051                            &inner.metadata().location_kind,
2052                            inner_ident,
2053                            &inner.metadata().collection_kind,
2054                            &observe_ident,
2055                            &metadata.collection_kind,
2056                        );
2057                    }
2058                    BuildersOrCallback::Callback(_, node_callback) => {
2059                        node_callback(self, next_stmt_id);
2060                    }
2061                }
2062
2063                *next_stmt_id += 1;
2064
2065                observe_ident
2066            }
2067
2068            HydroNode::Persist { inner, .. } => {
2069                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2070
2071                let persist_ident =
2072                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2073
2074                match builders_or_callback {
2075                    BuildersOrCallback::Builders(graph_builders) => {
2076                        let builder = graph_builders.get_dfir_mut(&out_location);
2077                        builder.add_dfir(
2078                            parse_quote! {
2079                                #persist_ident = #inner_ident -> persist::<'static>();
2080                            },
2081                            None,
2082                            Some(&next_stmt_id.to_string()),
2083                        );
2084                    }
2085                    BuildersOrCallback::Callback(_, node_callback) => {
2086                        node_callback(self, next_stmt_id);
2087                    }
2088                }
2089
2090                *next_stmt_id += 1;
2091
2092                persist_ident
2093            }
2094
2095            HydroNode::Batch {
2096                inner, metadata, ..
2097            } => {
2098                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2099
2100                let batch_ident =
2101                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2102
2103                match builders_or_callback {
2104                    BuildersOrCallback::Builders(graph_builders) => {
2105                        graph_builders.batch(
2106                            inner_ident,
2107                            &inner.metadata().location_kind,
2108                            &inner.metadata().collection_kind,
2109                            &batch_ident,
2110                            &out_location,
2111                            &metadata.op,
2112                        );
2113                    }
2114                    BuildersOrCallback::Callback(_, node_callback) => {
2115                        node_callback(self, next_stmt_id);
2116                    }
2117                }
2118
2119                *next_stmt_id += 1;
2120
2121                batch_ident
2122            }
2123
2124            HydroNode::YieldConcat { inner, .. } => {
2125                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2126
2127                let yield_ident =
2128                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2129
2130                match builders_or_callback {
2131                    BuildersOrCallback::Builders(graph_builders) => {
2132                        graph_builders.yield_from_tick(
2133                            inner_ident,
2134                            &inner.metadata().location_kind,
2135                            &inner.metadata().collection_kind,
2136                            &yield_ident,
2137                            &out_location,
2138                        );
2139                    }
2140                    BuildersOrCallback::Callback(_, node_callback) => {
2141                        node_callback(self, next_stmt_id);
2142                    }
2143                }
2144
2145                *next_stmt_id += 1;
2146
2147                yield_ident
2148            }
2149
2150            HydroNode::BeginAtomic { inner, metadata } => {
2151                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2152
2153                let begin_ident =
2154                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2155
2156                match builders_or_callback {
2157                    BuildersOrCallback::Builders(graph_builders) => {
2158                        graph_builders.begin_atomic(
2159                            inner_ident,
2160                            &inner.metadata().location_kind,
2161                            &inner.metadata().collection_kind,
2162                            &begin_ident,
2163                            &out_location,
2164                            &metadata.op,
2165                        );
2166                    }
2167                    BuildersOrCallback::Callback(_, node_callback) => {
2168                        node_callback(self, next_stmt_id);
2169                    }
2170                }
2171
2172                *next_stmt_id += 1;
2173
2174                begin_ident
2175            }
2176
2177            HydroNode::EndAtomic { inner, .. } => {
2178                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2179
2180                let end_ident =
2181                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2182
2183                match builders_or_callback {
2184                    BuildersOrCallback::Builders(graph_builders) => {
2185                        graph_builders.end_atomic(
2186                            inner_ident,
2187                            &inner.metadata().location_kind,
2188                            &inner.metadata().collection_kind,
2189                            &end_ident,
2190                        );
2191                    }
2192                    BuildersOrCallback::Callback(_, node_callback) => {
2193                        node_callback(self, next_stmt_id);
2194                    }
2195                }
2196
2197                *next_stmt_id += 1;
2198
2199                end_ident
2200            }
2201
2202            HydroNode::Source {
2203                source, metadata, ..
2204            } => {
2205                if let HydroSource::ExternalNetwork() = source {
2206                    syn::Ident::new("DUMMY", Span::call_site())
2207                } else {
2208                    let source_ident =
2209                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2210
2211                    let source_stmt = match source {
2212                        HydroSource::Stream(expr) => {
2213                            debug_assert!(metadata.location_kind.is_top_level());
2214                            parse_quote! {
2215                                #source_ident = source_stream(#expr);
2216                            }
2217                        }
2218
2219                        HydroSource::ExternalNetwork() => {
2220                            unreachable!()
2221                        }
2222
2223                        HydroSource::Iter(expr) => {
2224                            if metadata.location_kind.is_top_level() {
2225                                parse_quote! {
2226                                    #source_ident = source_iter(#expr);
2227                                }
2228                            } else {
2229                                // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2230                                parse_quote! {
2231                                    #source_ident = source_iter(#expr) -> persist::<'static>();
2232                                }
2233                            }
2234                        }
2235
2236                        HydroSource::Spin() => {
2237                            debug_assert!(metadata.location_kind.is_top_level());
2238                            parse_quote! {
2239                                #source_ident = spin();
2240                            }
2241                        }
2242                    };
2243
2244                    match builders_or_callback {
2245                        BuildersOrCallback::Builders(graph_builders) => {
2246                            let builder = graph_builders.get_dfir_mut(&out_location);
2247                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2248                        }
2249                        BuildersOrCallback::Callback(_, node_callback) => {
2250                            node_callback(self, next_stmt_id);
2251                        }
2252                    }
2253
2254                    *next_stmt_id += 1;
2255
2256                    source_ident
2257                }
2258            }
2259
2260            HydroNode::SingletonSource { value, metadata } => {
2261                let source_ident =
2262                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2263
2264                match builders_or_callback {
2265                    BuildersOrCallback::Builders(graph_builders) => {
2266                        let should_replay = !graph_builders.singleton_intermediates();
2267                        let builder = graph_builders.get_dfir_mut(&out_location);
2268
2269                        if should_replay || !metadata.location_kind.is_top_level() {
2270                            builder.add_dfir(
2271                                parse_quote! {
2272                                    #source_ident = source_iter([#value]) -> persist::<'static>();
2273                                },
2274                                None,
2275                                Some(&next_stmt_id.to_string()),
2276                            );
2277                        } else {
2278                            builder.add_dfir(
2279                                parse_quote! {
2280                                    #source_ident = source_iter([#value]);
2281                                },
2282                                None,
2283                                Some(&next_stmt_id.to_string()),
2284                            );
2285                        }
2286                    }
2287                    BuildersOrCallback::Callback(_, node_callback) => {
2288                        node_callback(self, next_stmt_id);
2289                    }
2290                }
2291
2292                *next_stmt_id += 1;
2293
2294                source_ident
2295            }
2296
2297            HydroNode::CycleSource { ident, .. } => {
2298                let ident = ident.clone();
2299
2300                match builders_or_callback {
2301                    BuildersOrCallback::Builders(_) => {}
2302                    BuildersOrCallback::Callback(_, node_callback) => {
2303                        node_callback(self, next_stmt_id);
2304                    }
2305                }
2306
2307                // consume a stmt id even though we did not emit anything so that we can instrument this
2308                *next_stmt_id += 1;
2309
2310                ident
2311            }
2312
2313            HydroNode::Tee { inner, .. } => {
2314                let ret_ident = if let Some(teed_from) =
2315                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2316                {
2317                    match builders_or_callback {
2318                        BuildersOrCallback::Builders(_) => {}
2319                        BuildersOrCallback::Callback(_, node_callback) => {
2320                            node_callback(self, next_stmt_id);
2321                        }
2322                    }
2323
2324                    teed_from.clone()
2325                } else {
2326                    let inner_ident = inner.0.borrow_mut().emit_core(
2327                        builders_or_callback,
2328                        built_tees,
2329                        next_stmt_id,
2330                    );
2331
2332                    let tee_ident =
2333                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335                    built_tees.insert(
2336                        inner.0.as_ref() as *const RefCell<HydroNode>,
2337                        tee_ident.clone(),
2338                    );
2339
2340                    match builders_or_callback {
2341                        BuildersOrCallback::Builders(graph_builders) => {
2342                            let builder = graph_builders.get_dfir_mut(&out_location);
2343                            builder.add_dfir(
2344                                parse_quote! {
2345                                    #tee_ident = #inner_ident -> tee();
2346                                },
2347                                None,
2348                                Some(&next_stmt_id.to_string()),
2349                            );
2350                        }
2351                        BuildersOrCallback::Callback(_, node_callback) => {
2352                            node_callback(self, next_stmt_id);
2353                        }
2354                    }
2355
2356                    tee_ident
2357                };
2358
2359                // we consume a stmt id regardless of whether we emit the tee() operator,
2360                // so that during rewrites we touch all recipients of the tee()
2361
2362                *next_stmt_id += 1;
2363                ret_ident
2364            }
2365
2366            HydroNode::Chain { first, second, .. } => {
2367                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2368                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2369
2370                let chain_ident =
2371                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2372
2373                match builders_or_callback {
2374                    BuildersOrCallback::Builders(graph_builders) => {
2375                        let builder = graph_builders.get_dfir_mut(&out_location);
2376                        builder.add_dfir(
2377                            parse_quote! {
2378                                #chain_ident = chain();
2379                                #first_ident -> [0]#chain_ident;
2380                                #second_ident -> [1]#chain_ident;
2381                            },
2382                            None,
2383                            Some(&next_stmt_id.to_string()),
2384                        );
2385                    }
2386                    BuildersOrCallback::Callback(_, node_callback) => {
2387                        node_callback(self, next_stmt_id);
2388                    }
2389                }
2390
2391                *next_stmt_id += 1;
2392
2393                chain_ident
2394            }
2395
2396            HydroNode::ChainFirst { first, second, .. } => {
2397                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2398                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2399
2400                let chain_ident =
2401                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2402
2403                match builders_or_callback {
2404                    BuildersOrCallback::Builders(graph_builders) => {
2405                        let builder = graph_builders.get_dfir_mut(&out_location);
2406                        builder.add_dfir(
2407                            parse_quote! {
2408                                #chain_ident = chain_first_n(1);
2409                                #first_ident -> [0]#chain_ident;
2410                                #second_ident -> [1]#chain_ident;
2411                            },
2412                            None,
2413                            Some(&next_stmt_id.to_string()),
2414                        );
2415                    }
2416                    BuildersOrCallback::Callback(_, node_callback) => {
2417                        node_callback(self, next_stmt_id);
2418                    }
2419                }
2420
2421                *next_stmt_id += 1;
2422
2423                chain_ident
2424            }
2425
2426            HydroNode::CrossSingleton { left, right, .. } => {
2427                let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2428                let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2429
2430                let cross_ident =
2431                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2432
2433                match builders_or_callback {
2434                    BuildersOrCallback::Builders(graph_builders) => {
2435                        let builder = graph_builders.get_dfir_mut(&out_location);
2436                        builder.add_dfir(
2437                            parse_quote! {
2438                                #cross_ident = cross_singleton();
2439                                #left_ident -> [input]#cross_ident;
2440                                #right_ident -> [single]#cross_ident;
2441                            },
2442                            None,
2443                            Some(&next_stmt_id.to_string()),
2444                        );
2445                    }
2446                    BuildersOrCallback::Callback(_, node_callback) => {
2447                        node_callback(self, next_stmt_id);
2448                    }
2449                }
2450
2451                *next_stmt_id += 1;
2452
2453                cross_ident
2454            }
2455
2456            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2457                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2458                    parse_quote!(cross_join_multiset)
2459                } else {
2460                    parse_quote!(join_multiset)
2461                };
2462
2463                let (HydroNode::CrossProduct { left, right, .. }
2464                | HydroNode::Join { left, right, .. }) = self
2465                else {
2466                    unreachable!()
2467                };
2468
2469                let is_top_level = left.metadata().location_kind.is_top_level()
2470                    && right.metadata().location_kind.is_top_level();
2471                let (left_inner, left_lifetime) =
2472                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2473                        debug_assert!(!left.metadata().location_kind.is_top_level());
2474                        (left, quote!('static))
2475                    } else if left.metadata().location_kind.is_top_level() {
2476                        (left, quote!('static))
2477                    } else {
2478                        (left, quote!('tick))
2479                    };
2480
2481                let (right_inner, right_lifetime) =
2482                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2483                        debug_assert!(!right.metadata().location_kind.is_top_level());
2484                        (right, quote!('static))
2485                    } else if right.metadata().location_kind.is_top_level() {
2486                        (right, quote!('static))
2487                    } else {
2488                        (right, quote!('tick))
2489                    };
2490
2491                let left_ident =
2492                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2493                let right_ident =
2494                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2495
2496                let stream_ident =
2497                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2498
2499                match builders_or_callback {
2500                    BuildersOrCallback::Builders(graph_builders) => {
2501                        let builder = graph_builders.get_dfir_mut(&out_location);
2502                        builder.add_dfir(
2503                            if is_top_level {
2504                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2505                                // a multiset_delta() to negate the replay behavior
2506                                parse_quote! {
2507                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2508                                    #left_ident -> [0]#stream_ident;
2509                                    #right_ident -> [1]#stream_ident;
2510                                }
2511                            } else {
2512                                parse_quote! {
2513                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2514                                    #left_ident -> [0]#stream_ident;
2515                                    #right_ident -> [1]#stream_ident;
2516                                }
2517                            }
2518                            ,
2519                            None,
2520                            Some(&next_stmt_id.to_string()),
2521                        );
2522                    }
2523                    BuildersOrCallback::Callback(_, node_callback) => {
2524                        node_callback(self, next_stmt_id);
2525                    }
2526                }
2527
2528                *next_stmt_id += 1;
2529
2530                stream_ident
2531            }
2532
2533            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2534                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2535                    parse_quote!(difference_multiset)
2536                } else {
2537                    parse_quote!(anti_join_multiset)
2538                };
2539
2540                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2541                    self
2542                else {
2543                    unreachable!()
2544                };
2545
2546                let (neg, neg_lifetime) =
2547                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2548                        debug_assert!(!neg.metadata().location_kind.is_top_level());
2549                        (neg, quote!('static))
2550                    } else if neg.metadata().location_kind.is_top_level() {
2551                        (neg, quote!('static))
2552                    } else {
2553                        (neg, quote!('tick))
2554                    };
2555
2556                let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2557                let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2558
2559                let stream_ident =
2560                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2561
2562                match builders_or_callback {
2563                    BuildersOrCallback::Builders(graph_builders) => {
2564                        let builder = graph_builders.get_dfir_mut(&out_location);
2565                        builder.add_dfir(
2566                            parse_quote! {
2567                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2568                                #pos_ident -> [pos]#stream_ident;
2569                                #neg_ident -> [neg]#stream_ident;
2570                            },
2571                            None,
2572                            Some(&next_stmt_id.to_string()),
2573                        );
2574                    }
2575                    BuildersOrCallback::Callback(_, node_callback) => {
2576                        node_callback(self, next_stmt_id);
2577                    }
2578                }
2579
2580                *next_stmt_id += 1;
2581
2582                stream_ident
2583            }
2584
2585            HydroNode::ResolveFutures { input, .. } => {
2586                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2587
2588                let futures_ident =
2589                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2590
2591                match builders_or_callback {
2592                    BuildersOrCallback::Builders(graph_builders) => {
2593                        let builder = graph_builders.get_dfir_mut(&out_location);
2594                        builder.add_dfir(
2595                            parse_quote! {
2596                                #futures_ident = #input_ident -> resolve_futures();
2597                            },
2598                            None,
2599                            Some(&next_stmt_id.to_string()),
2600                        );
2601                    }
2602                    BuildersOrCallback::Callback(_, node_callback) => {
2603                        node_callback(self, next_stmt_id);
2604                    }
2605                }
2606
2607                *next_stmt_id += 1;
2608
2609                futures_ident
2610            }
2611
2612            HydroNode::ResolveFuturesOrdered { input, .. } => {
2613                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2614
2615                let futures_ident =
2616                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2617
2618                match builders_or_callback {
2619                    BuildersOrCallback::Builders(graph_builders) => {
2620                        let builder = graph_builders.get_dfir_mut(&out_location);
2621                        builder.add_dfir(
2622                            parse_quote! {
2623                                #futures_ident = #input_ident -> resolve_futures_ordered();
2624                            },
2625                            None,
2626                            Some(&next_stmt_id.to_string()),
2627                        );
2628                    }
2629                    BuildersOrCallback::Callback(_, node_callback) => {
2630                        node_callback(self, next_stmt_id);
2631                    }
2632                }
2633
2634                *next_stmt_id += 1;
2635
2636                futures_ident
2637            }
2638
2639            HydroNode::Map { f, input, .. } => {
2640                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2641
2642                let map_ident =
2643                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2644
2645                match builders_or_callback {
2646                    BuildersOrCallback::Builders(graph_builders) => {
2647                        let builder = graph_builders.get_dfir_mut(&out_location);
2648                        builder.add_dfir(
2649                            parse_quote! {
2650                                #map_ident = #input_ident -> map(#f);
2651                            },
2652                            None,
2653                            Some(&next_stmt_id.to_string()),
2654                        );
2655                    }
2656                    BuildersOrCallback::Callback(_, node_callback) => {
2657                        node_callback(self, next_stmt_id);
2658                    }
2659                }
2660
2661                *next_stmt_id += 1;
2662
2663                map_ident
2664            }
2665
2666            HydroNode::FlatMap { f, input, .. } => {
2667                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2668
2669                let flat_map_ident =
2670                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2671
2672                match builders_or_callback {
2673                    BuildersOrCallback::Builders(graph_builders) => {
2674                        let builder = graph_builders.get_dfir_mut(&out_location);
2675                        builder.add_dfir(
2676                            parse_quote! {
2677                                #flat_map_ident = #input_ident -> flat_map(#f);
2678                            },
2679                            None,
2680                            Some(&next_stmt_id.to_string()),
2681                        );
2682                    }
2683                    BuildersOrCallback::Callback(_, node_callback) => {
2684                        node_callback(self, next_stmt_id);
2685                    }
2686                }
2687
2688                *next_stmt_id += 1;
2689
2690                flat_map_ident
2691            }
2692
2693            HydroNode::Filter { f, input, .. } => {
2694                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2695
2696                let filter_ident =
2697                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2698
2699                match builders_or_callback {
2700                    BuildersOrCallback::Builders(graph_builders) => {
2701                        let builder = graph_builders.get_dfir_mut(&out_location);
2702                        builder.add_dfir(
2703                            parse_quote! {
2704                                #filter_ident = #input_ident -> filter(#f);
2705                            },
2706                            None,
2707                            Some(&next_stmt_id.to_string()),
2708                        );
2709                    }
2710                    BuildersOrCallback::Callback(_, node_callback) => {
2711                        node_callback(self, next_stmt_id);
2712                    }
2713                }
2714
2715                *next_stmt_id += 1;
2716
2717                filter_ident
2718            }
2719
2720            HydroNode::FilterMap { f, input, .. } => {
2721                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2722
2723                let filter_map_ident =
2724                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2725
2726                match builders_or_callback {
2727                    BuildersOrCallback::Builders(graph_builders) => {
2728                        let builder = graph_builders.get_dfir_mut(&out_location);
2729                        builder.add_dfir(
2730                            parse_quote! {
2731                                #filter_map_ident = #input_ident -> filter_map(#f);
2732                            },
2733                            None,
2734                            Some(&next_stmt_id.to_string()),
2735                        );
2736                    }
2737                    BuildersOrCallback::Callback(_, node_callback) => {
2738                        node_callback(self, next_stmt_id);
2739                    }
2740                }
2741
2742                *next_stmt_id += 1;
2743
2744                filter_map_ident
2745            }
2746
2747            HydroNode::Sort { input, .. } => {
2748                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2749
2750                let sort_ident =
2751                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2752
2753                match builders_or_callback {
2754                    BuildersOrCallback::Builders(graph_builders) => {
2755                        let builder = graph_builders.get_dfir_mut(&out_location);
2756                        builder.add_dfir(
2757                            parse_quote! {
2758                                #sort_ident = #input_ident -> sort();
2759                            },
2760                            None,
2761                            Some(&next_stmt_id.to_string()),
2762                        );
2763                    }
2764                    BuildersOrCallback::Callback(_, node_callback) => {
2765                        node_callback(self, next_stmt_id);
2766                    }
2767                }
2768
2769                *next_stmt_id += 1;
2770
2771                sort_ident
2772            }
2773
2774            HydroNode::DeferTick { input, .. } => {
2775                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2776
2777                let defer_tick_ident =
2778                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2779
2780                match builders_or_callback {
2781                    BuildersOrCallback::Builders(graph_builders) => {
2782                        let builder = graph_builders.get_dfir_mut(&out_location);
2783                        builder.add_dfir(
2784                            parse_quote! {
2785                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2786                            },
2787                            None,
2788                            Some(&next_stmt_id.to_string()),
2789                        );
2790                    }
2791                    BuildersOrCallback::Callback(_, node_callback) => {
2792                        node_callback(self, next_stmt_id);
2793                    }
2794                }
2795
2796                *next_stmt_id += 1;
2797
2798                defer_tick_ident
2799            }
2800
2801            HydroNode::Enumerate { input, .. } => {
2802                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2803
2804                let enumerate_ident =
2805                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2806
2807                match builders_or_callback {
2808                    BuildersOrCallback::Builders(graph_builders) => {
2809                        let builder = graph_builders.get_dfir_mut(&out_location);
2810                        let lifetime = if input.metadata().location_kind.is_top_level() {
2811                            quote!('static)
2812                        } else {
2813                            quote!('tick)
2814                        };
2815                        builder.add_dfir(
2816                            parse_quote! {
2817                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2818                            },
2819                            None,
2820                            Some(&next_stmt_id.to_string()),
2821                        );
2822                    }
2823                    BuildersOrCallback::Callback(_, node_callback) => {
2824                        node_callback(self, next_stmt_id);
2825                    }
2826                }
2827
2828                *next_stmt_id += 1;
2829
2830                enumerate_ident
2831            }
2832
2833            HydroNode::Inspect { f, input, .. } => {
2834                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2835
2836                let inspect_ident =
2837                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2838
2839                match builders_or_callback {
2840                    BuildersOrCallback::Builders(graph_builders) => {
2841                        let builder = graph_builders.get_dfir_mut(&out_location);
2842                        builder.add_dfir(
2843                            parse_quote! {
2844                                #inspect_ident = #input_ident -> inspect(#f);
2845                            },
2846                            None,
2847                            Some(&next_stmt_id.to_string()),
2848                        );
2849                    }
2850                    BuildersOrCallback::Callback(_, node_callback) => {
2851                        node_callback(self, next_stmt_id);
2852                    }
2853                }
2854
2855                *next_stmt_id += 1;
2856
2857                inspect_ident
2858            }
2859
2860            HydroNode::Unique { input, .. } => {
2861                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2862
2863                let unique_ident =
2864                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2865
2866                match builders_or_callback {
2867                    BuildersOrCallback::Builders(graph_builders) => {
2868                        let builder = graph_builders.get_dfir_mut(&out_location);
2869                        let lifetime = if input.metadata().location_kind.is_top_level() {
2870                            quote!('static)
2871                        } else {
2872                            quote!('tick)
2873                        };
2874
2875                        builder.add_dfir(
2876                            parse_quote! {
2877                                #unique_ident = #input_ident -> unique::<#lifetime>();
2878                            },
2879                            None,
2880                            Some(&next_stmt_id.to_string()),
2881                        );
2882                    }
2883                    BuildersOrCallback::Callback(_, node_callback) => {
2884                        node_callback(self, next_stmt_id);
2885                    }
2886                }
2887
2888                *next_stmt_id += 1;
2889
2890                unique_ident
2891            }
2892
2893            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2894                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2895                    parse_quote!(fold)
2896                } else if matches!(self, HydroNode::Scan { .. }) {
2897                    parse_quote!(scan)
2898                } else {
2899                    parse_quote!(fold_keyed)
2900                };
2901
2902                let (HydroNode::Fold { input, .. }
2903                | HydroNode::FoldKeyed { input, .. }
2904                | HydroNode::Scan { input, .. }) = self
2905                else {
2906                    unreachable!()
2907                };
2908
2909                let (input, lifetime) =
2910                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2911                        debug_assert!(!input.metadata().location_kind.is_top_level());
2912                        (input, quote!('static))
2913                    } else if input.metadata().location_kind.is_top_level() {
2914                        (input, quote!('static))
2915                    } else {
2916                        (input, quote!('tick))
2917                    };
2918
2919                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2920
2921                let (HydroNode::Fold { init, acc, .. }
2922                | HydroNode::FoldKeyed { init, acc, .. }
2923                | HydroNode::Scan { init, acc, .. }) = &*self
2924                else {
2925                    unreachable!()
2926                };
2927
2928                let fold_ident =
2929                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931                match builders_or_callback {
2932                    BuildersOrCallback::Builders(graph_builders) => {
2933                        if matches!(self, HydroNode::Fold { .. })
2934                            && self.metadata().location_kind.is_top_level()
2935                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2936                            && graph_builders.singleton_intermediates()
2937                        {
2938                            let builder = graph_builders.get_dfir_mut(&out_location);
2939
2940                            let acc: syn::Expr = parse_quote!({
2941                                let mut __inner = #acc;
2942                                move |__state, __value| {
2943                                    __inner(__state, __value);
2944                                    Some(__state.clone())
2945                                }
2946                            });
2947
2948                            builder.add_dfir(
2949                                parse_quote! {
2950                                    source_iter([(#init)()]) -> [0]#fold_ident;
2951                                    #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2952                                    #fold_ident = chain();
2953                                },
2954                                None,
2955                                Some(&next_stmt_id.to_string()),
2956                            );
2957                        } else if matches!(self, HydroNode::FoldKeyed { .. })
2958                            && self.metadata().location_kind.is_top_level()
2959                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2960                            && graph_builders.singleton_intermediates()
2961                        {
2962                            let builder = graph_builders.get_dfir_mut(&out_location);
2963
2964                            let acc: syn::Expr = parse_quote!({
2965                                let mut __init = #init;
2966                                let mut __inner = #acc;
2967                                move |__state, (__key, __value)| {
2968                                    // TODO(shadaj): we can avoid the clone when the entry exists
2969                                    let __state = __state.entry(__key.clone()).or_insert_with(|| (__init)());
2970                                    __inner(__state, __value);
2971                                    Some((__key, __state.clone()))
2972                                }
2973                            });
2974
2975                            builder.add_dfir(
2976                                parse_quote! {
2977                                    source_iter([(#init)()]) -> [0]#fold_ident;
2978                                    #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
2979                                },
2980                                None,
2981                                Some(&next_stmt_id.to_string()),
2982                            );
2983                        } else {
2984                            let builder = graph_builders.get_dfir_mut(&out_location);
2985                            builder.add_dfir(
2986                                parse_quote! {
2987                                    #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2988                                },
2989                                None,
2990                                Some(&next_stmt_id.to_string()),
2991                            );
2992                        }
2993                    }
2994                    BuildersOrCallback::Callback(_, node_callback) => {
2995                        node_callback(self, next_stmt_id);
2996                    }
2997                }
2998
2999                *next_stmt_id += 1;
3000
3001                fold_ident
3002            }
3003
3004            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3005                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3006                    parse_quote!(reduce)
3007                } else {
3008                    parse_quote!(reduce_keyed)
3009                };
3010
3011                let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3012                else {
3013                    unreachable!()
3014                };
3015
3016                let (input, lifetime) =
3017                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3018                        debug_assert!(!input.metadata().location_kind.is_top_level());
3019                        (input, quote!('static))
3020                    } else if input.metadata().location_kind.is_top_level() {
3021                        (input, quote!('static))
3022                    } else {
3023                        (input, quote!('tick))
3024                    };
3025
3026                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3027
3028                let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3029                else {
3030                    unreachable!()
3031                };
3032
3033                let reduce_ident =
3034                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3035
3036                match builders_or_callback {
3037                    BuildersOrCallback::Builders(graph_builders) => {
3038                        if matches!(self, HydroNode::Reduce { .. })
3039                            && self.metadata().location_kind.is_top_level()
3040                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3041                            && graph_builders.singleton_intermediates()
3042                        {
3043                            todo!(
3044                                "Reduce with optional intermediates is not yet supported in simulator"
3045                            );
3046                        } else if matches!(self, HydroNode::ReduceKeyed { .. })
3047                            && self.metadata().location_kind.is_top_level()
3048                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3049                            && graph_builders.singleton_intermediates()
3050                        {
3051                            todo!();
3052                        } else {
3053                            let builder = graph_builders.get_dfir_mut(&out_location);
3054                            builder.add_dfir(
3055                                parse_quote! {
3056                                    #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3057                                },
3058                                None,
3059                                Some(&next_stmt_id.to_string()),
3060                            );
3061                        }
3062                    }
3063                    BuildersOrCallback::Callback(_, node_callback) => {
3064                        node_callback(self, next_stmt_id);
3065                    }
3066                }
3067
3068                *next_stmt_id += 1;
3069
3070                reduce_ident
3071            }
3072
3073            HydroNode::ReduceKeyedWatermark {
3074                f,
3075                input,
3076                watermark,
3077                ..
3078            } => {
3079                let (input, lifetime) =
3080                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3081                        debug_assert!(!input.metadata().location_kind.is_top_level());
3082                        (input, quote!('static))
3083                    } else if input.metadata().location_kind.is_top_level() {
3084                        (input, quote!('static))
3085                    } else {
3086                        (input, quote!('tick))
3087                    };
3088
3089                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3090
3091                let watermark_ident =
3092                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
3093
3094                let chain_ident = syn::Ident::new(
3095                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3096                    Span::call_site(),
3097                );
3098
3099                let fold_ident =
3100                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3101
3102                match builders_or_callback {
3103                    BuildersOrCallback::Builders(graph_builders) => {
3104                        let builder = graph_builders.get_dfir_mut(&out_location);
3105                        // 1. Don't allow any values to be added to the map if the key <=the watermark
3106                        // 2. If the entry didn't exist in the BTreeMap, add it. Otherwise, call f.
3107                        //    If the watermark changed, delete all BTreeMap entries with a key < the watermark.
3108                        // 3. Convert the BTreeMap back into a stream of (k, v)
3109                        builder.add_dfir(
3110                            parse_quote! {
3111                                #chain_ident = chain();
3112                                #input_ident
3113                                    -> map(|x| (Some(x), None))
3114                                    -> [0]#chain_ident;
3115                                #watermark_ident
3116                                    -> map(|watermark| (None, Some(watermark)))
3117                                    -> [1]#chain_ident;
3118
3119                                #fold_ident = #chain_ident
3120                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3121                                        let __reduce_keyed_fn = #f;
3122                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3123                                            if let Some((k, v)) = opt_payload {
3124                                                if let Some(curr_watermark) = *opt_curr_watermark {
3125                                                    if k <= curr_watermark {
3126                                                        return;
3127                                                    }
3128                                                }
3129                                                match map.entry(k) {
3130                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
3131                                                        e.insert(v);
3132                                                    }
3133                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
3134                                                        __reduce_keyed_fn(e.get_mut(), v);
3135                                                    }
3136                                                }
3137                                            } else {
3138                                                let watermark = opt_watermark.unwrap();
3139                                                if let Some(curr_watermark) = *opt_curr_watermark {
3140                                                    if watermark <= curr_watermark {
3141                                                        return;
3142                                                    }
3143                                                }
3144                                                *opt_curr_watermark = opt_watermark;
3145                                                map.retain(|k, _| *k > watermark);
3146                                            }
3147                                        }
3148                                    })
3149                                    -> flat_map(|(map, _curr_watermark)| map);
3150                            },
3151                            None,
3152                            Some(&next_stmt_id.to_string()),
3153                        );
3154                    }
3155                    BuildersOrCallback::Callback(_, node_callback) => {
3156                        node_callback(self, next_stmt_id);
3157                    }
3158                }
3159
3160                *next_stmt_id += 1;
3161
3162                fold_ident
3163            }
3164
3165            HydroNode::Network {
3166                serialize_fn: serialize_pipeline,
3167                instantiate_fn,
3168                deserialize_fn: deserialize_pipeline,
3169                input,
3170                ..
3171            } => {
3172                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3173
3174                let receiver_stream_ident =
3175                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3176
3177                match builders_or_callback {
3178                    BuildersOrCallback::Builders(graph_builders) => {
3179                        let (sink_expr, source_expr) = match instantiate_fn {
3180                            DebugInstantiate::Building => (
3181                                syn::parse_quote!(DUMMY_SINK),
3182                                syn::parse_quote!(DUMMY_SOURCE),
3183                            ),
3184
3185                            DebugInstantiate::Finalized(finalized) => {
3186                                (finalized.sink.clone(), finalized.source.clone())
3187                            }
3188                        };
3189
3190                        graph_builders.create_network(
3191                            &input.metadata().location_kind,
3192                            &out_location,
3193                            input_ident,
3194                            &receiver_stream_ident,
3195                            serialize_pipeline,
3196                            sink_expr,
3197                            source_expr,
3198                            deserialize_pipeline,
3199                            *next_stmt_id,
3200                        );
3201                    }
3202                    BuildersOrCallback::Callback(_, node_callback) => {
3203                        node_callback(self, next_stmt_id);
3204                    }
3205                }
3206
3207                *next_stmt_id += 1;
3208
3209                receiver_stream_ident
3210            }
3211
3212            HydroNode::ExternalInput {
3213                instantiate_fn,
3214                deserialize_fn: deserialize_pipeline,
3215                ..
3216            } => {
3217                let receiver_stream_ident =
3218                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3219
3220                match builders_or_callback {
3221                    BuildersOrCallback::Builders(graph_builders) => {
3222                        let (_, source_expr) = match instantiate_fn {
3223                            DebugInstantiate::Building => (
3224                                syn::parse_quote!(DUMMY_SINK),
3225                                syn::parse_quote!(DUMMY_SOURCE),
3226                            ),
3227
3228                            DebugInstantiate::Finalized(finalized) => {
3229                                (finalized.sink.clone(), finalized.source.clone())
3230                            }
3231                        };
3232
3233                        graph_builders.create_external_source(
3234                            &out_location,
3235                            source_expr,
3236                            &receiver_stream_ident,
3237                            deserialize_pipeline,
3238                            *next_stmt_id,
3239                        );
3240                    }
3241                    BuildersOrCallback::Callback(_, node_callback) => {
3242                        node_callback(self, next_stmt_id);
3243                    }
3244                }
3245
3246                *next_stmt_id += 1;
3247
3248                receiver_stream_ident
3249            }
3250
3251            HydroNode::Counter {
3252                tag,
3253                duration,
3254                prefix,
3255                input,
3256                ..
3257            } => {
3258                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3259
3260                let counter_ident =
3261                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3262
3263                match builders_or_callback {
3264                    BuildersOrCallback::Builders(graph_builders) => {
3265                        let builder = graph_builders.get_dfir_mut(&out_location);
3266                        builder.add_dfir(
3267                            parse_quote! {
3268                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3269                            },
3270                            None,
3271                            Some(&next_stmt_id.to_string()),
3272                        );
3273                    }
3274                    BuildersOrCallback::Callback(_, node_callback) => {
3275                        node_callback(self, next_stmt_id);
3276                    }
3277                }
3278
3279                *next_stmt_id += 1;
3280
3281                counter_ident
3282            }
3283        }
3284    }
3285
3286    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3287        match self {
3288            HydroNode::Placeholder => {
3289                panic!()
3290            }
3291            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3292            HydroNode::Source { source, .. } => match source {
3293                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3294                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3295            },
3296            HydroNode::SingletonSource { value, .. } => {
3297                transform(value);
3298            }
3299            HydroNode::CycleSource { .. }
3300            | HydroNode::Tee { .. }
3301            | HydroNode::Persist { .. }
3302            | HydroNode::YieldConcat { .. }
3303            | HydroNode::BeginAtomic { .. }
3304            | HydroNode::EndAtomic { .. }
3305            | HydroNode::Batch { .. }
3306            | HydroNode::Chain { .. }
3307            | HydroNode::ChainFirst { .. }
3308            | HydroNode::CrossProduct { .. }
3309            | HydroNode::CrossSingleton { .. }
3310            | HydroNode::ResolveFutures { .. }
3311            | HydroNode::ResolveFuturesOrdered { .. }
3312            | HydroNode::Join { .. }
3313            | HydroNode::Difference { .. }
3314            | HydroNode::AntiJoin { .. }
3315            | HydroNode::DeferTick { .. }
3316            | HydroNode::Enumerate { .. }
3317            | HydroNode::Unique { .. }
3318            | HydroNode::Sort { .. } => {}
3319            HydroNode::Map { f, .. }
3320            | HydroNode::FlatMap { f, .. }
3321            | HydroNode::Filter { f, .. }
3322            | HydroNode::FilterMap { f, .. }
3323            | HydroNode::Inspect { f, .. }
3324            | HydroNode::Reduce { f, .. }
3325            | HydroNode::ReduceKeyed { f, .. }
3326            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3327                transform(f);
3328            }
3329            HydroNode::Fold { init, acc, .. }
3330            | HydroNode::Scan { init, acc, .. }
3331            | HydroNode::FoldKeyed { init, acc, .. } => {
3332                transform(init);
3333                transform(acc);
3334            }
3335            HydroNode::Network {
3336                serialize_fn,
3337                deserialize_fn,
3338                ..
3339            } => {
3340                if let Some(serialize_fn) = serialize_fn {
3341                    transform(serialize_fn);
3342                }
3343                if let Some(deserialize_fn) = deserialize_fn {
3344                    transform(deserialize_fn);
3345                }
3346            }
3347            HydroNode::ExternalInput { deserialize_fn, .. } => {
3348                if let Some(deserialize_fn) = deserialize_fn {
3349                    transform(deserialize_fn);
3350                }
3351            }
3352            HydroNode::Counter { duration, .. } => {
3353                transform(duration);
3354            }
3355        }
3356    }
3357
3358    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3359        &self.metadata().op
3360    }
3361
3362    pub fn metadata(&self) -> &HydroIrMetadata {
3363        match self {
3364            HydroNode::Placeholder => {
3365                panic!()
3366            }
3367            HydroNode::Cast { metadata, .. } => metadata,
3368            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3369            HydroNode::Source { metadata, .. } => metadata,
3370            HydroNode::SingletonSource { metadata, .. } => metadata,
3371            HydroNode::CycleSource { metadata, .. } => metadata,
3372            HydroNode::Tee { metadata, .. } => metadata,
3373            HydroNode::Persist { metadata, .. } => metadata,
3374            HydroNode::YieldConcat { metadata, .. } => metadata,
3375            HydroNode::BeginAtomic { metadata, .. } => metadata,
3376            HydroNode::EndAtomic { metadata, .. } => metadata,
3377            HydroNode::Batch { metadata, .. } => metadata,
3378            HydroNode::Chain { metadata, .. } => metadata,
3379            HydroNode::ChainFirst { metadata, .. } => metadata,
3380            HydroNode::CrossProduct { metadata, .. } => metadata,
3381            HydroNode::CrossSingleton { metadata, .. } => metadata,
3382            HydroNode::Join { metadata, .. } => metadata,
3383            HydroNode::Difference { metadata, .. } => metadata,
3384            HydroNode::AntiJoin { metadata, .. } => metadata,
3385            HydroNode::ResolveFutures { metadata, .. } => metadata,
3386            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3387            HydroNode::Map { metadata, .. } => metadata,
3388            HydroNode::FlatMap { metadata, .. } => metadata,
3389            HydroNode::Filter { metadata, .. } => metadata,
3390            HydroNode::FilterMap { metadata, .. } => metadata,
3391            HydroNode::DeferTick { metadata, .. } => metadata,
3392            HydroNode::Enumerate { metadata, .. } => metadata,
3393            HydroNode::Inspect { metadata, .. } => metadata,
3394            HydroNode::Unique { metadata, .. } => metadata,
3395            HydroNode::Sort { metadata, .. } => metadata,
3396            HydroNode::Scan { metadata, .. } => metadata,
3397            HydroNode::Fold { metadata, .. } => metadata,
3398            HydroNode::FoldKeyed { metadata, .. } => metadata,
3399            HydroNode::Reduce { metadata, .. } => metadata,
3400            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3401            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3402            HydroNode::ExternalInput { metadata, .. } => metadata,
3403            HydroNode::Network { metadata, .. } => metadata,
3404            HydroNode::Counter { metadata, .. } => metadata,
3405        }
3406    }
3407
3408    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3409        &mut self.metadata_mut().op
3410    }
3411
3412    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3413        match self {
3414            HydroNode::Placeholder => {
3415                panic!()
3416            }
3417            HydroNode::Cast { metadata, .. } => metadata,
3418            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3419            HydroNode::Source { metadata, .. } => metadata,
3420            HydroNode::SingletonSource { metadata, .. } => metadata,
3421            HydroNode::CycleSource { metadata, .. } => metadata,
3422            HydroNode::Tee { metadata, .. } => metadata,
3423            HydroNode::Persist { metadata, .. } => metadata,
3424            HydroNode::YieldConcat { metadata, .. } => metadata,
3425            HydroNode::BeginAtomic { metadata, .. } => metadata,
3426            HydroNode::EndAtomic { metadata, .. } => metadata,
3427            HydroNode::Batch { metadata, .. } => metadata,
3428            HydroNode::Chain { metadata, .. } => metadata,
3429            HydroNode::ChainFirst { metadata, .. } => metadata,
3430            HydroNode::CrossProduct { metadata, .. } => metadata,
3431            HydroNode::CrossSingleton { metadata, .. } => metadata,
3432            HydroNode::Join { metadata, .. } => metadata,
3433            HydroNode::Difference { metadata, .. } => metadata,
3434            HydroNode::AntiJoin { metadata, .. } => metadata,
3435            HydroNode::ResolveFutures { metadata, .. } => metadata,
3436            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3437            HydroNode::Map { metadata, .. } => metadata,
3438            HydroNode::FlatMap { metadata, .. } => metadata,
3439            HydroNode::Filter { metadata, .. } => metadata,
3440            HydroNode::FilterMap { metadata, .. } => metadata,
3441            HydroNode::DeferTick { metadata, .. } => metadata,
3442            HydroNode::Enumerate { metadata, .. } => metadata,
3443            HydroNode::Inspect { metadata, .. } => metadata,
3444            HydroNode::Unique { metadata, .. } => metadata,
3445            HydroNode::Sort { metadata, .. } => metadata,
3446            HydroNode::Scan { metadata, .. } => metadata,
3447            HydroNode::Fold { metadata, .. } => metadata,
3448            HydroNode::FoldKeyed { metadata, .. } => metadata,
3449            HydroNode::Reduce { metadata, .. } => metadata,
3450            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3451            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3452            HydroNode::ExternalInput { metadata, .. } => metadata,
3453            HydroNode::Network { metadata, .. } => metadata,
3454            HydroNode::Counter { metadata, .. } => metadata,
3455        }
3456    }
3457
3458    pub fn input(&self) -> Vec<&HydroNode> {
3459        match self {
3460            HydroNode::Placeholder => {
3461                panic!()
3462            }
3463            HydroNode::Source { .. }
3464            | HydroNode::SingletonSource { .. }
3465            | HydroNode::ExternalInput { .. }
3466            | HydroNode::CycleSource { .. }
3467            | HydroNode::Tee { .. } => {
3468                // Tee should find its input in separate special ways
3469                vec![]
3470            }
3471            HydroNode::Cast { inner, .. }
3472            | HydroNode::ObserveNonDet { inner, .. }
3473            | HydroNode::Persist { inner, .. }
3474            | HydroNode::YieldConcat { inner, .. }
3475            | HydroNode::BeginAtomic { inner, .. }
3476            | HydroNode::EndAtomic { inner, .. }
3477            | HydroNode::Batch { inner, .. } => {
3478                vec![inner]
3479            }
3480            HydroNode::Chain { first, second, .. } => {
3481                vec![first, second]
3482            }
3483            HydroNode::ChainFirst { first, second, .. } => {
3484                vec![first, second]
3485            }
3486            HydroNode::CrossProduct { left, right, .. }
3487            | HydroNode::CrossSingleton { left, right, .. }
3488            | HydroNode::Join { left, right, .. } => {
3489                vec![left, right]
3490            }
3491            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3492                vec![pos, neg]
3493            }
3494            HydroNode::Map { input, .. }
3495            | HydroNode::FlatMap { input, .. }
3496            | HydroNode::Filter { input, .. }
3497            | HydroNode::FilterMap { input, .. }
3498            | HydroNode::Sort { input, .. }
3499            | HydroNode::DeferTick { input, .. }
3500            | HydroNode::Enumerate { input, .. }
3501            | HydroNode::Inspect { input, .. }
3502            | HydroNode::Unique { input, .. }
3503            | HydroNode::Network { input, .. }
3504            | HydroNode::Counter { input, .. }
3505            | HydroNode::ResolveFutures { input, .. }
3506            | HydroNode::ResolveFuturesOrdered { input, .. } => {
3507                vec![input]
3508            }
3509            HydroNode::Fold { input, .. }
3510            | HydroNode::FoldKeyed { input, .. }
3511            | HydroNode::Reduce { input, .. }
3512            | HydroNode::ReduceKeyed { input, .. }
3513            | HydroNode::Scan { input, .. } => {
3514                // Skip persist before fold/reduce
3515                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3516                    vec![inner]
3517                } else {
3518                    vec![input]
3519                }
3520            }
3521            HydroNode::ReduceKeyedWatermark {
3522                input, watermark, ..
3523            } => {
3524                // Skip persist before fold/reduce
3525                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3526                    vec![inner, watermark]
3527                } else {
3528                    vec![input, watermark]
3529                }
3530            }
3531        }
3532    }
3533
3534    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3535        self.input()
3536            .iter()
3537            .map(|input_node| input_node.metadata())
3538            .collect()
3539    }
3540
3541    pub fn print_root(&self) -> String {
3542        match self {
3543            HydroNode::Placeholder => {
3544                panic!()
3545            }
3546            HydroNode::Cast { .. } => "Cast()".to_string(),
3547            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3548            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3549            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3550            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3551            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3552            HydroNode::Persist { .. } => "Persist()".to_string(),
3553            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3554            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3555            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3556            HydroNode::Batch { .. } => "Batch()".to_string(),
3557            HydroNode::Chain { first, second, .. } => {
3558                format!("Chain({}, {})", first.print_root(), second.print_root())
3559            }
3560            HydroNode::ChainFirst { first, second, .. } => {
3561                format!(
3562                    "ChainFirst({}, {})",
3563                    first.print_root(),
3564                    second.print_root()
3565                )
3566            }
3567            HydroNode::CrossProduct { left, right, .. } => {
3568                format!(
3569                    "CrossProduct({}, {})",
3570                    left.print_root(),
3571                    right.print_root()
3572                )
3573            }
3574            HydroNode::CrossSingleton { left, right, .. } => {
3575                format!(
3576                    "CrossSingleton({}, {})",
3577                    left.print_root(),
3578                    right.print_root()
3579                )
3580            }
3581            HydroNode::Join { left, right, .. } => {
3582                format!("Join({}, {})", left.print_root(), right.print_root())
3583            }
3584            HydroNode::Difference { pos, neg, .. } => {
3585                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3586            }
3587            HydroNode::AntiJoin { pos, neg, .. } => {
3588                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3589            }
3590            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3591            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3592            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3593            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3594            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3595            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3596            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3597            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3598            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3599            HydroNode::Unique { .. } => "Unique()".to_string(),
3600            HydroNode::Sort { .. } => "Sort()".to_string(),
3601            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3602            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3603            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3604            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3605            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3606            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3607            HydroNode::Network { .. } => "Network()".to_string(),
3608            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3609            HydroNode::Counter { tag, duration, .. } => {
3610                format!("Counter({:?}, {:?})", tag, duration)
3611            }
3612        }
3613    }
3614}
3615
/// Resolves both endpoints of a network edge and asks the deployment backend
/// `D` for the code and wiring needed to connect them.
///
/// `from_location` / `to_location` must each be a `Process` or a `Cluster`;
/// their ids are looked up in `processes` / `clusters`. A port is allocated on
/// the sender first, then on the receiver, and the matching one-to-one,
/// one-to-many, many-to-one or many-to-many `D` hooks are called.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the callback produced by the `*_connect` hook.
///
/// # Panics
///
/// Panics if an endpoint id is missing from `processes` / `clusters`, or if
/// either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Shared lookups: every arm below needs "fetch the node or explain which
    // id was never instantiated".
    let process = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", id)
            })
            .clone()
    };
    let cluster = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", id)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = process(from);
            let to_node = process(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = process(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = cluster(from);
            let to_node = process(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = cluster(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick and Atomic locations are rejected on either side of the edge.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => {
            panic!(
                "network endpoints must be a process or a cluster, not a tick or atomic location"
            )
        }
    };
    (sink, source, connect_fn)
}
3723
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so growth is noticed.
    #[test]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 264);
    }

    /// Pins the in-memory size of `HydroRoot` so growth is noticed.
    #[test]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 160);
    }

    /// A plain expression with no `q!` involved must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        assert_eq!(simplify_q_macro(plain.clone()), plain);
    }

    /// Splices a real `q!` closure and snapshots the simplified token stream.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted body is a block that ends in a closure
    /// rather than starting with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}