hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
27 membership.fold(
28 q!(|| false),
29 q!(|present, event| {
30 match event {
31 MembershipEvent::Joined => *present = true,
32 MembershipEvent::Left => *present = false,
33 }
34 }),
35 )
36}
37
38fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39 let root = get_this_crate();
40
41 if is_demux {
42 parse_quote! {
43 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
44 |(id, data)| {
45 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46 }
47 )
48 }
49 } else {
50 parse_quote! {
51 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52 |data| {
53 #root::runtime_support::bincode::serialize(&data).unwrap().into()
54 }
55 )
56 }
57 }
58}
59
60pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61 serialize_bincode_with_type(is_demux, "e_type::<T>())
62}
63
64fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65 let root = get_this_crate();
66
67 if let Some(c_type) = tagged {
68 parse_quote! {
69 |res| {
70 let (id, b) = res.unwrap();
71 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72 }
73 }
74 } else {
75 parse_quote! {
76 |res| {
77 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78 }
79 }
80 }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84 deserialize_bincode_with_type(tagged, "e_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
88 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
89 /// using [`bincode`] to serialize/deserialize messages.
90 ///
91 /// The returned stream captures the elements received at the destination, where values will
92 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
93 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
94 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
95 /// dropped no further messages will be sent.
96 ///
97 /// # Example
98 /// ```rust
99 /// # use hydro_lang::prelude::*;
100 /// # use futures::StreamExt;
101 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
102 /// let p1 = flow.process::<()>();
103 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
104 /// let p2 = flow.process::<()>();
105 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
106 /// // 1, 2, 3
107 /// # on_p2.send_bincode(&p_out)
108 /// # }, |mut stream| async move {
109 /// # for w in 1..=3 {
110 /// # assert_eq!(stream.next().await, Some(w));
111 /// # }
112 /// # }));
113 /// ```
114 pub fn send_bincode<L2>(
115 self,
116 other: &Process<'a, L2>,
117 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
118 where
119 T: Serialize + DeserializeOwned,
120 {
121 let serialize_pipeline = Some(serialize_bincode::<T>(false));
122
123 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
124
125 Stream::new(
126 other.clone(),
127 HydroNode::Network {
128 serialize_fn: serialize_pipeline.map(|e| e.into()),
129 instantiate_fn: DebugInstantiate::Building,
130 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131 input: Box::new(self.ir_node.into_inner()),
132 metadata: other.new_node_metadata(
133 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
134 ),
135 },
136 )
137 }
138
139 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140 /// using [`bincode`] to serialize/deserialize messages.
141 ///
142 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143 /// membership information. This is a common pattern in distributed systems for broadcasting data to
144 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146 /// each element to all cluster members.
147 ///
148 /// # Non-Determinism
149 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150 /// to the current cluster members _at that point in time_. Depending on when we are notified of
151 /// membership changes, we will broadcast each element to different members.
152 ///
153 /// # Example
154 /// ```rust
155 /// # use hydro_lang::prelude::*;
156 /// # use futures::StreamExt;
157 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158 /// let p1 = flow.process::<()>();
159 /// let workers: Cluster<()> = flow.cluster::<()>();
160 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162 /// # on_worker.send_bincode(&p2).entries()
163 /// // if there are 4 members in the cluster, each receives one element
164 /// // - MemberId::<()>(0): [123]
165 /// // - MemberId::<()>(1): [123]
166 /// // - MemberId::<()>(2): [123]
167 /// // - MemberId::<()>(3): [123]
168 /// # }, |mut stream| async move {
169 /// # let mut results = Vec::new();
170 /// # for w in 0..4 {
171 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
172 /// # }
173 /// # results.sort();
174 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175 /// # }));
176 /// ```
177 pub fn broadcast_bincode<L2: 'a>(
178 self,
179 other: &Cluster<'a, L2>,
180 nondet_membership: NonDet,
181 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182 where
183 T: Clone + Serialize + DeserializeOwned,
184 {
185 let ids = track_membership(self.location.source_cluster_members(other));
186 let join_tick = self.location.tick();
187 let current_members = ids
188 .snapshot(&join_tick, nondet_membership)
189 .filter(q!(|b| *b));
190
191 self.batch(&join_tick, nondet_membership)
192 .repeat_with_keys(current_members)
193 .all_ticks()
194 .demux_bincode(other)
195 }
196
197 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198 /// serialization. The external process can receive these elements by establishing a TCP
199 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200 ///
201 /// # Example
202 /// ```rust
203 /// # use hydro_lang::prelude::*;
204 /// # use futures::StreamExt;
205 /// # tokio_test::block_on(async move {
206 /// let flow = FlowBuilder::new();
207 /// let process = flow.process::<()>();
208 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209 /// let external = flow.external::<()>();
210 /// let external_handle = numbers.send_bincode_external(&external);
211 ///
212 /// let mut deployment = hydro_deploy::Deployment::new();
213 /// let nodes = flow
214 /// .with_process(&process, deployment.Localhost())
215 /// .with_external(&external, deployment.Localhost())
216 /// .deploy(&mut deployment);
217 ///
218 /// deployment.deploy().await.unwrap();
219 /// // establish the TCP connection
220 /// let mut external_recv_stream = nodes.connect(external_handle).await;
221 /// deployment.start().await.unwrap();
222 ///
223 /// for w in 1..=3 {
224 /// assert_eq!(external_recv_stream.next().await, Some(w));
225 /// }
226 /// # });
227 /// ```
228 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229 where
230 T: Serialize + DeserializeOwned,
231 {
232 let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236 let external_key = flow_state_borrow.next_external_out;
237 flow_state_borrow.next_external_out += 1;
238
239 flow_state_borrow.push_root(HydroRoot::SendExternal {
240 to_external_id: other.id,
241 to_key: external_key,
242 to_many: false,
243 serialize_fn: serialize_pipeline.map(|e| e.into()),
244 instantiate_fn: DebugInstantiate::Building,
245 input: Box::new(self.ir_node.into_inner()),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248
249 ExternalBincodeStream {
250 process_id: other.id,
251 port_id: external_key,
252 _phantom: PhantomData,
253 }
254 }
255}
256
257impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
258 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
259{
260 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
261 /// using [`bincode`] to serialize/deserialize messages.
262 ///
263 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
264 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
265 /// this API allows precise targeting of specific cluster members rather than broadcasting to
266 /// all members.
267 ///
268 /// # Example
269 /// ```rust
270 /// # use hydro_lang::prelude::*;
271 /// # use futures::StreamExt;
272 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273 /// let p1 = flow.process::<()>();
274 /// let workers: Cluster<()> = flow.cluster::<()>();
275 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
276 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
277 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
278 /// .demux_bincode(&workers);
279 /// # on_worker.send_bincode(&p2).entries()
280 /// // if there are 4 members in the cluster, each receives one element
281 /// // - MemberId::<()>(0): [0]
282 /// // - MemberId::<()>(1): [1]
283 /// // - MemberId::<()>(2): [2]
284 /// // - MemberId::<()>(3): [3]
285 /// # }, |mut stream| async move {
286 /// # let mut results = Vec::new();
287 /// # for w in 0..4 {
288 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
289 /// # }
290 /// # results.sort();
291 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
292 /// # }));
293 /// ```
294 pub fn demux_bincode(
295 self,
296 other: &Cluster<'a, L2>,
297 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
298 where
299 T: Serialize + DeserializeOwned,
300 {
301 self.into_keyed().demux_bincode(other)
302 }
303}
304
305impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
306 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
307 /// [`bincode`] to serialize/deserialize messages.
308 ///
309 /// This provides load balancing by evenly distributing work across cluster members. The
310 /// distribution is deterministic based on element order - the first element goes to member 0,
311 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
312 ///
313 /// # Non-Determinism
314 /// The set of cluster members may asynchronously change over time. Each element is distributed
315 /// based on the current cluster membership _at that point in time_. Depending on when cluster
316 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
317 /// membership is stable, the order of members in the round-robin pattern may change across runs.
318 ///
319 /// # Ordering Requirements
320 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
321 /// order of messages and retries affects the round-robin pattern.
322 ///
323 /// # Example
324 /// ```rust
325 /// # use hydro_lang::prelude::*;
326 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
327 /// # use futures::StreamExt;
328 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
329 /// let p1 = flow.process::<()>();
330 /// let workers: Cluster<()> = flow.cluster::<()>();
331 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
332 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
333 /// on_worker.send_bincode(&p2)
334 /// # .first().values() // we use first to assert that each member gets one element
335 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
336 /// // - MemberId::<()>(?): [1]
337 /// // - MemberId::<()>(?): [2]
338 /// // - MemberId::<()>(?): [3]
339 /// // - MemberId::<()>(?): [4]
340 /// # }, |mut stream| async move {
341 /// # let mut results = Vec::new();
342 /// # for w in 0..4 {
343 /// # results.push(stream.next().await.unwrap());
344 /// # }
345 /// # results.sort();
346 /// # assert_eq!(results, vec![1, 2, 3, 4]);
347 /// # }));
348 /// ```
349 pub fn round_robin_bincode<L2: 'a>(
350 self,
351 other: &Cluster<'a, L2>,
352 nondet_membership: NonDet,
353 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
354 where
355 T: Serialize + DeserializeOwned,
356 {
357 let ids = track_membership(self.location.source_cluster_members(other));
358 let join_tick = self.location.tick();
359 let current_members = ids
360 .snapshot(&join_tick, nondet_membership)
361 .filter(q!(|b| *b))
362 .keys()
363 .assume_ordering(nondet_membership)
364 .collect_vec();
365
366 self.enumerate()
367 .batch(&join_tick, nondet_membership)
368 .cross_singleton(current_members)
369 .map(q!(|(data, members)| (
370 members[data.0 % members.len()],
371 data.1
372 )))
373 .all_ticks()
374 .demux_bincode(other)
375 }
376}
377
378impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
379 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
380 /// using [`bincode`] to serialize/deserialize messages.
381 ///
382 /// Each cluster member sends its local stream elements, and they are collected at the destination
383 /// as a [`KeyedStream`] where keys identify the source cluster member.
384 ///
385 /// # Example
386 /// ```rust
387 /// # use hydro_lang::prelude::*;
388 /// # use futures::StreamExt;
389 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
390 /// let workers: Cluster<()> = flow.cluster::<()>();
391 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
392 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
393 /// # all_received.entries()
394 /// # }, |mut stream| async move {
395 /// // if there are 4 members in the cluster, we should receive 4 elements
396 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
397 /// # let mut results = Vec::new();
398 /// # for w in 0..4 {
399 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
400 /// # }
401 /// # results.sort();
402 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
403 /// # }));
404 /// ```
405 ///
406 /// If you don't need to know the source for each element, you can use `.values()`
407 /// to get just the data:
408 /// ```rust
409 /// # use hydro_lang::prelude::*;
410 /// # use hydro_lang::live_collections::stream::NoOrder;
411 /// # use futures::StreamExt;
412 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
413 /// # let workers: Cluster<()> = flow.cluster::<()>();
414 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
415 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
416 /// # values
417 /// # }, |mut stream| async move {
418 /// # let mut results = Vec::new();
419 /// # for w in 0..4 {
420 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
421 /// # }
422 /// # results.sort();
423 /// // if there are 4 members in the cluster, we should receive 4 elements
424 /// // 1, 1, 1, 1
425 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
426 /// # }));
427 /// ```
428 pub fn send_bincode<L2>(
429 self,
430 other: &Process<'a, L2>,
431 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
432 where
433 T: Serialize + DeserializeOwned,
434 {
435 let serialize_pipeline = Some(serialize_bincode::<T>(false));
436
437 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
438
439 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
440 other.clone(),
441 HydroNode::Network {
442 serialize_fn: serialize_pipeline.map(|e| e.into()),
443 instantiate_fn: DebugInstantiate::Building,
444 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
445 input: Box::new(self.ir_node.into_inner()),
446 metadata: other.new_node_metadata(Stream::<
447 (MemberId<L>, T),
448 Process<'a, L2>,
449 Unbounded,
450 O,
451 R,
452 >::collection_kind()),
453 },
454 );
455
456 raw_stream.into_keyed()
457 }
458
459 /// Broadcasts elements of this stream at each source member to all members of a destination
460 /// cluster, using [`bincode`] to serialize/deserialize messages.
461 ///
462 /// Each source member sends each of its stream elements to **every** member of the cluster
463 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
464 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
465 /// **only data elements** and sends each element to all cluster members.
466 ///
467 /// # Non-Determinism
468 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
469 /// to the current cluster members known _at that point in time_ at the source member. Depending
470 /// on when each source member is notified of membership changes, it will broadcast each element
471 /// to different members.
472 ///
473 /// # Example
474 /// ```rust
475 /// # use hydro_lang::prelude::*;
476 /// # use hydro_lang::location::MemberId;
477 /// # use futures::StreamExt;
478 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
479 /// # type Source = ();
480 /// # type Destination = ();
481 /// let source: Cluster<Source> = flow.cluster::<Source>();
482 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
483 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
484 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
485 /// # on_destination.entries().send_bincode(&p2).entries()
486 /// // if there are 4 members in the desination, each receives one element from each source member
487 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
488 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
489 /// // - ...
490 /// # }, |mut stream| async move {
491 /// # let mut results = Vec::new();
492 /// # for w in 0..16 {
493 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
494 /// # }
495 /// # results.sort();
496 /// # assert_eq!(results, vec![
497 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
498 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
499 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
500 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
501 /// # ]);
502 /// # }));
503 /// ```
504 pub fn broadcast_bincode<L2: 'a>(
505 self,
506 other: &Cluster<'a, L2>,
507 nondet_membership: NonDet,
508 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
509 where
510 T: Clone + Serialize + DeserializeOwned,
511 {
512 let ids = track_membership(self.location.source_cluster_members(other));
513 let join_tick = self.location.tick();
514 let current_members = ids
515 .snapshot(&join_tick, nondet_membership)
516 .filter(q!(|b| *b));
517
518 self.batch(&join_tick, nondet_membership)
519 .repeat_with_keys(current_members)
520 .all_ticks()
521 .demux_bincode(other)
522 }
523}
524
525impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
526 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
527{
528 /// Sends elements of this stream at each source member to specific members of a destination
529 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
530 ///
531 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
532 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
533 /// this API allows precise targeting of specific cluster members rather than broadcasting to
534 /// all members.
535 ///
536 /// Each cluster member sends its local stream elements, and they are collected at each
537 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
538 ///
539 /// # Example
540 /// ```rust
541 /// # use hydro_lang::prelude::*;
542 /// # use futures::StreamExt;
543 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
544 /// # type Source = ();
545 /// # type Destination = ();
546 /// let source: Cluster<Source> = flow.cluster::<Source>();
547 /// let to_send: Stream<_, Cluster<_>, _> = source
548 /// .source_iter(q!(vec![0, 1, 2, 3]))
549 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
550 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
551 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
552 /// # all_received.entries().send_bincode(&p2).entries()
553 /// # }, |mut stream| async move {
554 /// // if there are 4 members in the destination cluster, each receives one message from each source member
555 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
556 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
557 /// // - ...
558 /// # let mut results = Vec::new();
559 /// # for w in 0..16 {
560 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
561 /// # }
562 /// # results.sort();
563 /// # assert_eq!(results, vec![
564 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
565 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
566 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
567 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
568 /// # ]);
569 /// # }));
570 /// ```
571 pub fn demux_bincode(
572 self,
573 other: &Cluster<'a, L2>,
574 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
575 where
576 T: Serialize + DeserializeOwned,
577 {
578 self.into_keyed().demux_bincode(other)
579 }
580}
581
#[cfg(test)]
mod tests {
    use stageleft::q;

    use crate::location::{Location, MemberId};
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// Process-to-process `send_bincode`: three unit inputs are batched and counted at the
    /// receiver; the counts must always add up to three.
    #[test]
    fn sim_send_bincode_o2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let sender = flow.process::<()>();
        let receiver = flow.process::<()>();

        let (in_port, input) = sender.source_external_bincode(&external);

        let out_port = input
            .send_bincode(&receiver)
            .batch(&receiver.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let instances = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&in_port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            for _ in 0..3 {
                in_send.send(()).unwrap();
            }

            let received = out_recv.collect::<Vec<_>>().await;
            assert!(received.into_iter().sum::<usize>() == 3);
        });

        assert_eq!(instances, 4); // 2^{3 - 1}
    }

    /// Cluster-to-process `send_bincode`: each of the four members contributes one element,
    /// tagged with its member id.
    #[test]
    fn sim_send_bincode_m2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let collector = flow.process::<()>();

        let out_port = cluster
            .source_iter(q!(vec![1]))
            .send_bincode(&collector)
            .entries()
            .batch(&collector.tick(), nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instances =
            flow.sim()
                .with_cluster_size(&cluster, 4)
                .exhaustive(async |mut compiled| {
                    let out_recv = compiled.connect(&out_port);
                    compiled.launch();

                    let expected = vec![
                        (MemberId::from_raw(0), 1),
                        (MemberId::from_raw(1), 1),
                        (MemberId::from_raw(2), 1),
                        (MemberId::from_raw(3), 1),
                    ];
                    out_recv.assert_yields_only_unordered(expected).await
                });

        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
    }

    /// Two independent clusters (sizes 3 and 4) sending to the same process over separate
    /// external output ports.
    #[test]
    fn sim_send_bincode_multiple_m2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();
        let collector = flow.process::<()>();

        let out_port_1 = cluster1
            .source_iter(q!(vec![1]))
            .send_bincode(&collector)
            .entries()
            .send_bincode_external(&external);

        let out_port_2 = cluster2
            .source_iter(q!(vec![2]))
            .send_bincode(&collector)
            .entries()
            .send_bincode_external(&external);

        let instances = flow
            .sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async |mut compiled| {
                let out_recv_1 = compiled.connect(&out_port_1);
                let out_recv_2 = compiled.connect(&out_port_2);
                compiled.launch();

                let expected_from_cluster1 = vec![
                    (MemberId::from_raw(0), 1),
                    (MemberId::from_raw(1), 1),
                    (MemberId::from_raw(2), 1),
                ];
                out_recv_1
                    .assert_yields_only_unordered(expected_from_cluster1)
                    .await;

                let expected_from_cluster2 = vec![
                    (MemberId::from_raw(0), 2),
                    (MemberId::from_raw(1), 2),
                    (MemberId::from_raw(2), 2),
                    (MemberId::from_raw(3), 2),
                ];
                out_recv_2
                    .assert_yields_only_unordered(expected_from_cluster2)
                    .await;
            });

        assert_eq!(instances, 1);
    }

    /// Process-to-cluster `demux_bincode`: only members 0 and 1 are targeted, and each echoes
    /// back its incremented value.
    #[test]
    fn sim_send_bincode_o2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let coordinator = flow.process::<()>();

        let out_port = coordinator
            .source_iter(q!(vec![
                (MemberId::from_raw(0), 123),
                (MemberId::from_raw(1), 456),
            ]))
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .send_bincode(&coordinator)
            .entries()
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                let expected = vec![
                    (MemberId::from_raw(0), 124),
                    (MemberId::from_raw(1), 457),
                ];
                out_recv.assert_yields_only_unordered(expected).await
            });
    }

    /// Process-to-cluster `broadcast_bincode` with two members: whenever a member echoes the
    /// first value it must also echo the second, and each member must do so in some execution.
    #[test]
    fn sim_broadcast_bincode_o2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let coordinator = flow.process::<()>();

        let out_port = coordinator
            .source_iter(q!(vec![123, 456]))
            .broadcast_bincode(&cluster, nondet!(/** test */))
            .map(q!(|x| x + 1))
            .send_bincode(&coordinator)
            .entries()
            .send_bincode_external(&external);

        // Set once some execution shows the respective member producing both outputs.
        let mut c_1_produced = false;
        let mut c_2_produced = false;

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                let all_out = out_recv.collect_sorted::<Vec<_>>().await;

                // check that order is preserved
                if all_out.contains(&(MemberId::from_raw(0), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw(0), 457)));
                    c_1_produced = true;
                }

                if all_out.contains(&(MemberId::from_raw(1), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw(1), 457)));
                    c_2_produced = true;
                }
            });

        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
    }

    /// Cluster-to-cluster `demux_bincode`: members 0 and 1 each forward their incremented value
    /// to members 0 and 1, and the results are gathered back at the process.
    #[test]
    fn sim_send_bincode_m2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let coordinator = flow.process::<()>();

        let out_port = coordinator
            .source_iter(q!(vec![
                (MemberId::from_raw(0), 123),
                (MemberId::from_raw(1), 456),
            ]))
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .flat_map_ordered(q!(|x| vec![
                (MemberId::from_raw(0), x),
                (MemberId::from_raw(1), x),
            ]))
            .demux_bincode(&cluster)
            .entries()
            .send_bincode(&coordinator)
            .entries()
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                let expected = vec![
                    (MemberId::from_raw(0), (MemberId::from_raw(0), 124)),
                    (MemberId::from_raw(0), (MemberId::from_raw(1), 457)),
                    (MemberId::from_raw(1), (MemberId::from_raw(0), 124)),
                    (MemberId::from_raw(1), (MemberId::from_raw(1), 457)),
                ];
                out_recv.assert_yields_only_unordered(expected).await
            });
    }
}
825}