1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::BytesMut;
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::cluster::ClusterIds;
40use crate::location::dynamic::LocationId;
41use crate::location::external_process::{
42 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
43};
44use crate::nondet::NonDet;
45use crate::staging_util::get_this_crate;
46
47pub mod dynamic;
48
49#[expect(missing_docs, reason = "TODO")]
50pub mod external_process;
51pub use external_process::External;
52
53#[expect(missing_docs, reason = "TODO")]
54pub mod process;
55pub use process::Process;
56
57#[expect(missing_docs, reason = "TODO")]
58pub mod cluster;
59pub use cluster::Cluster;
60
61#[expect(missing_docs, reason = "TODO")]
62pub mod member_id;
63pub use member_id::MemberId;
64
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72 Joined,
73 Left,
74}
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum NetworkHint {
79 Auto,
80 TcpPort(Option<u16>),
81}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89 private_bounds,
90 reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93 type Root: Location<'a>;
94
95 fn root(&self) -> Self::Root;
96
97 fn try_tick(&self) -> Option<Tick<Self>> {
98 if Self::is_top_level() {
99 let next_id = self.flow_state().borrow_mut().next_clock_id;
100 self.flow_state().borrow_mut().next_clock_id += 1;
101 Some(Tick {
102 id: next_id,
103 l: self.clone(),
104 })
105 } else {
106 None
107 }
108 }
109
110 fn id(&self) -> LocationId {
111 dynamic::DynLocation::id(self)
112 }
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127 where
128 Self: Sized + NoTick,
129 {
130 Stream::new(
131 self.clone(),
132 HydroNode::Source {
133 source: HydroSource::Spin(),
134 metadata: self.new_node_metadata(Stream::<
135 (),
136 Self,
137 Unbounded,
138 TotalOrder,
139 ExactlyOnce,
140 >::collection_kind()),
141 },
142 )
143 }
144
145 fn source_stream<T, E>(
146 &self,
147 e: impl QuotedWithContext<'a, E, Self>,
148 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149 where
150 E: FuturesStream<Item = T> + Unpin,
151 Self: Sized + NoTick,
152 {
153 let e = e.splice_untyped_ctx(self);
154
155 Stream::new(
156 self.clone(),
157 HydroNode::Source {
158 source: HydroSource::Stream(e.into()),
159 metadata: self.new_node_metadata(Stream::<
160 T,
161 Self,
162 Unbounded,
163 TotalOrder,
164 ExactlyOnce,
165 >::collection_kind()),
166 },
167 )
168 }
169
170 fn source_iter<T, E>(
171 &self,
172 e: impl QuotedWithContext<'a, E, Self>,
173 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174 where
175 E: IntoIterator<Item = T>,
176 Self: Sized + NoTick,
177 {
178 let e = e.splice_typed_ctx(self);
181
182 Stream::new(
183 self.clone(),
184 HydroNode::Source {
185 source: HydroSource::Iter(e.into()),
186 metadata: self.new_node_metadata(Stream::<
187 T,
188 Self,
189 Unbounded,
190 TotalOrder,
191 ExactlyOnce,
192 >::collection_kind()),
193 },
194 )
195 }
196
197 fn source_cluster_members<C: 'a>(
198 &self,
199 cluster: &Cluster<'a, C>,
200 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201 where
202 Self: Sized + NoTick,
203 {
204 let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
205 id: cluster.id,
206 _phantom: PhantomData,
207 };
208
209 self.source_iter(q!(underlying_memberids))
210 .map(q!(|id| (*id, MembershipEvent::Joined)))
211 .into_keyed()
212 }
213
214 fn source_external_bytes<L>(
215 &self,
216 from: &External<L>,
217 ) -> (
218 ExternalBytesPort,
219 Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
220 )
221 where
222 Self: Sized + NoTick,
223 {
224 let next_external_port_id = {
225 let mut flow_state = from.flow_state.borrow_mut();
226 let id = flow_state.next_external_out;
227 flow_state.next_external_out += 1;
228 id
229 };
230
231 (
232 ExternalBytesPort {
233 process_id: from.id,
234 port_id: next_external_port_id,
235 _phantom: Default::default(),
236 },
237 Stream::new(
238 self.clone(),
239 HydroNode::ExternalInput {
240 from_external_id: from.id,
241 from_key: next_external_port_id,
242 from_many: false,
243 codec_type: quote_type::<LengthDelimitedCodec>().into(),
244 port_hint: NetworkHint::Auto,
245 instantiate_fn: DebugInstantiate::Building,
246 deserialize_fn: None,
247 metadata: self.new_node_metadata(Stream::<
248 std::io::Result<BytesMut>,
249 Self,
250 Unbounded,
251 TotalOrder,
252 ExactlyOnce,
253 >::collection_kind()),
254 },
255 ),
256 )
257 }
258
259 #[expect(clippy::type_complexity, reason = "stream markers")]
260 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
261 &self,
262 from: &External<L>,
263 ) -> (
264 ExternalBincodeSink<T, NotMany, O, R>,
265 Stream<T, Self, Unbounded, O, R>,
266 )
267 where
268 Self: Sized + NoTick,
269 T: Serialize + DeserializeOwned,
270 {
271 let next_external_port_id = {
272 let mut flow_state = from.flow_state.borrow_mut();
273 let id = flow_state.next_external_out;
274 flow_state.next_external_out += 1;
275 id
276 };
277
278 (
279 ExternalBincodeSink {
280 process_id: from.id,
281 port_id: next_external_port_id,
282 _phantom: PhantomData,
283 },
284 Stream::new(
285 self.clone(),
286 HydroNode::ExternalInput {
287 from_external_id: from.id,
288 from_key: next_external_port_id,
289 from_many: false,
290 codec_type: quote_type::<LengthDelimitedCodec>().into(),
291 port_hint: NetworkHint::Auto,
292 instantiate_fn: DebugInstantiate::Building,
293 deserialize_fn: Some(
294 crate::live_collections::stream::networking::deserialize_bincode::<T>(None)
295 .into(),
296 ),
297 metadata: self
298 .new_node_metadata(Stream::<T, Self, Unbounded, O, R>::collection_kind()),
299 },
300 ),
301 )
302 }
303
304 #[expect(clippy::type_complexity, reason = "stream markers")]
305 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
306 &self,
307 from: &External<L>,
308 port_hint: NetworkHint,
309 ) -> (
310 ExternalBytesPort<Many>,
311 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
312 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
313 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
314 )
315 where
316 Self: Sized + NoTick,
317 {
318 let next_external_port_id = {
319 let mut flow_state = from.flow_state.borrow_mut();
320 let id = flow_state.next_external_out;
321 flow_state.next_external_out += 1;
322 id
323 };
324
325 let (fwd_ref, to_sink) =
326 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
327 let mut flow_state_borrow = self.flow_state().borrow_mut();
328
329 flow_state_borrow.push_root(HydroRoot::SendExternal {
330 to_external_id: from.id,
331 to_key: next_external_port_id,
332 to_many: true,
333 serialize_fn: None,
334 instantiate_fn: DebugInstantiate::Building,
335 input: Box::new(to_sink.entries().ir_node.into_inner()),
336 op_metadata: HydroIrOpMetadata::new(),
337 });
338
339 let raw_stream: Stream<
340 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
341 Self,
342 Unbounded,
343 TotalOrder,
344 ExactlyOnce,
345 > = Stream::new(
346 self.clone(),
347 HydroNode::ExternalInput {
348 from_external_id: from.id,
349 from_key: next_external_port_id,
350 from_many: true,
351 codec_type: quote_type::<Codec>().into(),
352 port_hint,
353 instantiate_fn: DebugInstantiate::Building,
354 deserialize_fn: None,
355 metadata: self.new_node_metadata(Stream::<
356 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
357 Self,
358 Unbounded,
359 TotalOrder,
360 ExactlyOnce,
361 >::collection_kind()),
362 },
363 );
364
365 let membership_stream_ident = syn::Ident::new(
366 &format!(
367 "__hydro_deploy_many_{}_{}_membership",
368 from.id, next_external_port_id
369 ),
370 Span::call_site(),
371 );
372 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
373 let raw_membership_stream: KeyedStream<
374 u64,
375 bool,
376 Self,
377 Unbounded,
378 TotalOrder,
379 ExactlyOnce,
380 > = KeyedStream::new(
381 self.clone(),
382 HydroNode::Source {
383 source: HydroSource::Stream(membership_stream_expr.into()),
384 metadata: self.new_node_metadata(KeyedStream::<
385 u64,
386 bool,
387 Self,
388 Unbounded,
389 TotalOrder,
390 ExactlyOnce,
391 >::collection_kind()),
392 },
393 );
394
395 (
396 ExternalBytesPort {
397 process_id: from.id,
398 port_id: next_external_port_id,
399 _phantom: PhantomData,
400 },
401 raw_stream
402 .flatten_ordered() .into_keyed(),
404 raw_membership_stream.map(q!(|join| {
405 if join {
406 MembershipEvent::Joined
407 } else {
408 MembershipEvent::Left
409 }
410 })),
411 fwd_ref,
412 )
413 }
414
415 #[expect(clippy::type_complexity, reason = "stream markers")]
416 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
417 &self,
418 from: &External<L>,
419 ) -> (
420 ExternalBincodeBidi<InT, OutT, Many>,
421 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
422 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
423 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
424 )
425 where
426 Self: Sized + NoTick,
427 {
428 let next_external_port_id = {
429 let mut flow_state = from.flow_state.borrow_mut();
430 let id = flow_state.next_external_out;
431 flow_state.next_external_out += 1;
432 id
433 };
434
435 let root = get_this_crate();
436
437 let (fwd_ref, to_sink) =
438 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
439 let mut flow_state_borrow = self.flow_state().borrow_mut();
440
441 let out_t_type = quote_type::<OutT>();
442 let ser_fn: syn::Expr = syn::parse_quote! {
443 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
444 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
445 )
446 };
447
448 flow_state_borrow.push_root(HydroRoot::SendExternal {
449 to_external_id: from.id,
450 to_key: next_external_port_id,
451 to_many: true,
452 serialize_fn: Some(ser_fn.into()),
453 instantiate_fn: DebugInstantiate::Building,
454 input: Box::new(to_sink.entries().ir_node.into_inner()),
455 op_metadata: HydroIrOpMetadata::new(),
456 });
457
458 let in_t_type = quote_type::<InT>();
459
460 let deser_fn: syn::Expr = syn::parse_quote! {
461 |res| {
462 let (id, b) = res.unwrap();
463 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
464 }
465 };
466
467 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
468 KeyedStream::new(
469 self.clone(),
470 HydroNode::ExternalInput {
471 from_external_id: from.id,
472 from_key: next_external_port_id,
473 from_many: true,
474 codec_type: quote_type::<LengthDelimitedCodec>().into(),
475 port_hint: NetworkHint::Auto,
476 instantiate_fn: DebugInstantiate::Building,
477 deserialize_fn: Some(deser_fn.into()),
478 metadata: self.new_node_metadata(KeyedStream::<
479 u64,
480 InT,
481 Self,
482 Unbounded,
483 TotalOrder,
484 ExactlyOnce,
485 >::collection_kind()),
486 },
487 );
488
489 let membership_stream_ident = syn::Ident::new(
490 &format!(
491 "__hydro_deploy_many_{}_{}_membership",
492 from.id, next_external_port_id
493 ),
494 Span::call_site(),
495 );
496 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
497 let raw_membership_stream: KeyedStream<
498 u64,
499 bool,
500 Self,
501 Unbounded,
502 TotalOrder,
503 ExactlyOnce,
504 > = KeyedStream::new(
505 self.clone(),
506 HydroNode::Source {
507 source: HydroSource::Stream(membership_stream_expr.into()),
508 metadata: self.new_node_metadata(KeyedStream::<
509 u64,
510 bool,
511 Self,
512 Unbounded,
513 TotalOrder,
514 ExactlyOnce,
515 >::collection_kind()),
516 },
517 );
518
519 (
520 ExternalBincodeBidi {
521 process_id: from.id,
522 port_id: next_external_port_id,
523 _phantom: PhantomData,
524 },
525 raw_stream,
526 raw_membership_stream.map(q!(|join| {
527 if join {
528 MembershipEvent::Joined
529 } else {
530 MembershipEvent::Left
531 }
532 })),
533 fwd_ref,
534 )
535 }
536
537 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
553 where
554 T: Clone,
555 Self: Sized,
556 {
557 let e = e.splice_untyped_ctx(self);
561
562 Singleton::new(
563 self.clone(),
564 HydroNode::SingletonSource {
565 value: e.into(),
566 metadata: self
567 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
568 },
569 )
570 }
571
572 fn source_interval(
582 &self,
583 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
584 _nondet: NonDet,
585 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
586 where
587 Self: Sized + NoTick,
588 {
589 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
590 tokio::time::interval(interval)
591 )))
592 }
593
594 fn source_interval_delayed(
605 &self,
606 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
607 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
608 _nondet: NonDet,
609 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
610 where
611 Self: Sized + NoTick,
612 {
613 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
614 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
615 )))
616 }
617
618 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
619 where
620 S: CycleCollection<'a, ForwardRef, Location = Self>,
621 Self: NoTick,
622 {
623 let next_id = self.flow_state().borrow_mut().next_cycle_id();
624 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
625
626 (
627 ForwardHandle {
628 completed: false,
629 ident: ident.clone(),
630 expected_location: Location::id(self),
631 _phantom: PhantomData,
632 },
633 S::create_source(ident, self.clone()),
634 )
635 }
636}
637
638#[cfg(feature = "deploy")]
639#[cfg(test)]
640mod tests {
641 use std::collections::HashSet;
642
643 use futures::{SinkExt, StreamExt};
644 use hydro_deploy::Deployment;
645 use stageleft::q;
646 use tokio_util::codec::LengthDelimitedCodec;
647
648 use crate::compile::builder::FlowBuilder;
649 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
650 use crate::location::{Location, NetworkHint};
651 use crate::nondet::nondet;
652
653 #[tokio::test]
654 async fn top_level_singleton_replay_cardinality() {
655 let mut deployment = Deployment::new();
656
657 let flow = FlowBuilder::new();
658 let node = flow.process::<()>();
659 let external = flow.external::<()>();
660
661 let (in_port, input) =
662 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
663 let singleton = node.singleton(q!(123));
664 let tick = node.tick();
665 let out = input
666 .batch(&tick, nondet!())
667 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
668 .cross_singleton(
669 singleton
670 .snapshot(&tick, nondet!())
671 .into_stream()
672 .count(),
673 )
674 .all_ticks()
675 .send_bincode_external(&external);
676
677 let nodes = flow
678 .with_process(&node, deployment.Localhost())
679 .with_external(&external, deployment.Localhost())
680 .deploy(&mut deployment);
681
682 deployment.deploy().await.unwrap();
683
684 let mut external_in = nodes.connect(in_port).await;
685 let mut external_out = nodes.connect(out).await;
686
687 deployment.start().await.unwrap();
688
689 external_in.send(1).await.unwrap();
690 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
691
692 external_in.send(2).await.unwrap();
693 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
694 }
695
696 #[tokio::test]
697 async fn tick_singleton_replay_cardinality() {
698 let mut deployment = Deployment::new();
699
700 let flow = FlowBuilder::new();
701 let node = flow.process::<()>();
702 let external = flow.external::<()>();
703
704 let (in_port, input) =
705 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
706 let tick = node.tick();
707 let singleton = tick.singleton(q!(123));
708 let out = input
709 .batch(&tick, nondet!())
710 .cross_singleton(singleton.clone())
711 .cross_singleton(singleton.into_stream().count())
712 .all_ticks()
713 .send_bincode_external(&external);
714
715 let nodes = flow
716 .with_process(&node, deployment.Localhost())
717 .with_external(&external, deployment.Localhost())
718 .deploy(&mut deployment);
719
720 deployment.deploy().await.unwrap();
721
722 let mut external_in = nodes.connect(in_port).await;
723 let mut external_out = nodes.connect(out).await;
724
725 deployment.start().await.unwrap();
726
727 external_in.send(1).await.unwrap();
728 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
729
730 external_in.send(2).await.unwrap();
731 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
732 }
733
734 #[tokio::test]
735 async fn external_bytes() {
736 let mut deployment = Deployment::new();
737
738 let flow = FlowBuilder::new();
739 let first_node = flow.process::<()>();
740 let external = flow.external::<()>();
741
742 let (in_port, input) = first_node.source_external_bytes(&external);
743 let out = input
744 .map(q!(|r| r.unwrap()))
745 .send_bincode_external(&external);
746
747 let nodes = flow
748 .with_process(&first_node, deployment.Localhost())
749 .with_external(&external, deployment.Localhost())
750 .deploy(&mut deployment);
751
752 deployment.deploy().await.unwrap();
753
754 let mut external_in = nodes.connect(in_port).await.1;
755 let mut external_out = nodes.connect(out).await;
756
757 deployment.start().await.unwrap();
758
759 external_in.send(vec![1, 2, 3].into()).await.unwrap();
760
761 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
762 }
763
764 #[tokio::test]
765 async fn multi_external_source() {
766 let mut deployment = Deployment::new();
767
768 let flow = FlowBuilder::new();
769 let first_node = flow.process::<()>();
770 let external = flow.external::<()>();
771
772 let (in_port, input, _membership, complete_sink) =
773 first_node.bidi_external_many_bincode(&external);
774 let out = input.entries().send_bincode_external(&external);
775 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
776
777 let nodes = flow
778 .with_process(&first_node, deployment.Localhost())
779 .with_external(&external, deployment.Localhost())
780 .deploy(&mut deployment);
781
782 deployment.deploy().await.unwrap();
783
784 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
785 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
786 let external_out = nodes.connect(out).await;
787
788 deployment.start().await.unwrap();
789
790 external_in_1.send(123).await.unwrap();
791 external_in_2.send(456).await.unwrap();
792
793 assert_eq!(
794 external_out.take(2).collect::<HashSet<_>>().await,
795 vec![(0, 123), (1, 456)].into_iter().collect()
796 );
797 }
798
799 #[tokio::test]
800 async fn second_connection_only_multi_source() {
801 let mut deployment = Deployment::new();
802
803 let flow = FlowBuilder::new();
804 let first_node = flow.process::<()>();
805 let external = flow.external::<()>();
806
807 let (in_port, input, _membership, complete_sink) =
808 first_node.bidi_external_many_bincode(&external);
809 let out = input.entries().send_bincode_external(&external);
810 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
811
812 let nodes = flow
813 .with_process(&first_node, deployment.Localhost())
814 .with_external(&external, deployment.Localhost())
815 .deploy(&mut deployment);
816
817 deployment.deploy().await.unwrap();
818
819 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
821 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
822 let mut external_out = nodes.connect(out).await;
823
824 deployment.start().await.unwrap();
825
826 external_in_2.send(456).await.unwrap();
827
828 assert_eq!(external_out.next().await.unwrap(), (1, 456));
829 }
830
831 #[tokio::test]
832 async fn multi_external_bytes() {
833 let mut deployment = Deployment::new();
834
835 let flow = FlowBuilder::new();
836 let first_node = flow.process::<()>();
837 let external = flow.external::<()>();
838
839 let (in_port, input, _membership, complete_sink) = first_node
840 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
841 let out = input.entries().send_bincode_external(&external);
842 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
843
844 let nodes = flow
845 .with_process(&first_node, deployment.Localhost())
846 .with_external(&external, deployment.Localhost())
847 .deploy(&mut deployment);
848
849 deployment.deploy().await.unwrap();
850
851 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
852 let mut external_in_2 = nodes.connect(in_port).await.1;
853 let external_out = nodes.connect(out).await;
854
855 deployment.start().await.unwrap();
856
857 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
858 external_in_2.send(vec![4, 5].into()).await.unwrap();
859
860 assert_eq!(
861 external_out.take(2).collect::<HashSet<_>>().await,
862 vec![
863 (0, (&[1u8, 2, 3] as &[u8]).into()),
864 (1, (&[4u8, 5] as &[u8]).into())
865 ]
866 .into_iter()
867 .collect()
868 );
869 }
870
871 #[tokio::test]
872 async fn echo_external_bytes() {
873 let mut deployment = Deployment::new();
874
875 let flow = FlowBuilder::new();
876 let first_node = flow.process::<()>();
877 let external = flow.external::<()>();
878
879 let (port, input, _membership, complete_sink) = first_node
880 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
881 complete_sink
882 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
883
884 let nodes = flow
885 .with_process(&first_node, deployment.Localhost())
886 .with_external(&external, deployment.Localhost())
887 .deploy(&mut deployment);
888
889 deployment.deploy().await.unwrap();
890
891 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
892 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
893
894 deployment.start().await.unwrap();
895
896 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
897 external_in_2.send(vec![4, 5].into()).await.unwrap();
898
899 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
900 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
901 }
902
903 #[tokio::test]
904 async fn echo_external_bincode() {
905 let mut deployment = Deployment::new();
906
907 let flow = FlowBuilder::new();
908 let first_node = flow.process::<()>();
909 let external = flow.external::<()>();
910
911 let (port, input, _membership, complete_sink) =
912 first_node.bidi_external_many_bincode(&external);
913 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
914
915 let nodes = flow
916 .with_process(&first_node, deployment.Localhost())
917 .with_external(&external, deployment.Localhost())
918 .deploy(&mut deployment);
919
920 deployment.deploy().await.unwrap();
921
922 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
923 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
924
925 deployment.start().await.unwrap();
926
927 external_in_1.send("hi".to_string()).await.unwrap();
928 external_in_2.send("hello".to_string()).await.unwrap();
929
930 assert_eq!(external_out_1.next().await.unwrap(), "HI");
931 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
932 }
933}