Hydroflow's Operators
In our previous examples we made use of some of Hydroflow's operators. Here we document each operator in more detail. Most of these operators are based on the Rust equivalents for iterators; see the Rust documentation.
anti_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]anti_join() -> | exactly 1 | Blocking |
Input port names: pos (streaming), neg (blocking)
2 input streams, the first of type (K, T) and the second of type K, with output type (K, T)
For a given tick, computes the anti-join of the items in the input
streams, returning items in the pos input that do not have matching keys
in the neg input.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print ("elephant", 3)
            source_iter(vec![("dog", 1), ("cat", 2), ("elephant", 3)]) -> [pos]diff;
            source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
            diff = anti_join() -> for_each(|v: (_, _)| println!("{}, {}", v.0, v.1));
            // elephant 3
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
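To make the single-tick semantics concrete, the following plain-Rust sketch models an anti-join with a `HashSet`. It is not Hydroflow's implementation: the function name `anti_join_tick` is hypothetical, and details such as output ordering or duplicate handling are not modeled.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Keeps the `(key, value)` pairs from `pos` whose key never appears in `neg`.
/// `neg` is collected in full before `pos` is filtered, mirroring the
/// blocking `neg` port described above.
fn anti_join_tick<K, T>(pos: Vec<(K, T)>, neg: Vec<K>) -> Vec<(K, T)>
where
    K: Eq + Hash,
{
    let neg_keys: HashSet<K> = neg.into_iter().collect();
    pos.into_iter()
        .filter(|(k, _)| !neg_keys.contains(k))
        .collect()
}
```

Calling it with the inputs from the example above leaves only the `("elephant", 3)` pair.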
batch
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> batch(A, B) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: First argument is the maximum batch size that batch() will buffer up before completely releasing the batch. The second argument is the receive end of a tokio channel that signals when to release the batch downstream.
Given a Stream created in Rust code, batch is passed the receive end of the channel. When it
receives any element on that channel, it passes all of the inputs it has received so far
through to the output unchanged.
```rust
#![allow(unused)]
fn main() {
    let (tx, rx) = hydroflow::util::unbounded_channel::<()>();

    // Will print 0, 1, 2, 3, 4 each on a new line just once.
    let mut df = hydroflow::hydroflow_syntax! {
        repeat_iter(0..5)
            -> batch(10, rx)
            -> for_each(|x| { println!("{x}"); });
    };

    tx.send(()).unwrap();

    df.run_available();
}
```
cross_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]cross_join() -> | exactly 1 | Streaming |
Input port names: 0 (streaming), 1 (streaming)
2 input streams of type S and T, 1 output stream of type (S, T)
Forms the cross-join (Cartesian product) of the items in the input streams, returning all tupled pairs.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print all 4 pairs of emotion and animal
            source_iter(vec!["happy", "sad"]) -> [0]my_join;
            source_iter(vec!["dog", "cat"]) -> [1]my_join;
            my_join = cross_join() -> for_each(|(v1, v2)| println!("({}, {})", v1, v2));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
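As a rough illustration of what "all tupled pairs" means, here is a plain-Rust sketch of a Cartesian product over one tick's items. The name `cross_join_tick` is hypothetical, and the nested-loop order shown here is an assumption of the sketch; the order in which Hydroflow emits pairs may differ.

```rust
/// Returns every `(s, t)` pairing of the items seen on the two inputs.
fn cross_join_tick<S: Clone, T: Clone>(left: &[S], right: &[T]) -> Vec<(S, T)> {
    let mut out = Vec::with_capacity(left.len() * right.len());
    for s in left {
        for t in right {
            out.push((s.clone(), t.clone()));
        }
    }
    out
}
```

With two emotions and two animals as inputs, the result has 2 × 2 = 4 pairs.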
cross_join can also be provided with one or two generic lifetime persistence arguments
in the same way as join; see join's documentation for more info.
cross_join also accepts one type argument that controls how the join state is built up. This (currently) allows switching between a SetUnion and NonSetUnion implementation.
For example:
join::<HalfSetJoinState>();
join::<HalfMultisetJoinState>();
```rust
#![allow(unused)]
fn main() {
    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<&str>();
    let mut flow = hydroflow::hydroflow_syntax! {
        my_join = cross_join::<'tick>();
        source_iter(["hello", "bye"]) -> [0]my_join;
        source_stream(input_recv) -> [1]my_join;
        my_join -> for_each(|(s, t)| println!("({}, {})", s, t));
    };
    input_send.send("oakland").unwrap();
    flow.run_tick();
    input_send.send("san francisco").unwrap();
    flow.run_tick();
}
```
Prints only "(hello, oakland)" and "(bye, oakland)". The source_iter is only included in
the first tick, then forgotten.
demux
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> demux(A)[<output_port>] -> | at least 2 | Streaming |
Output port names: Variadic, as specified in arguments.
Arguments: A Rust closure. Its first argument is a received item and its second argument is a variadic
var_args! tuple list where each item name is an output port.
Takes the input stream and allows the user to determine what element(s) to deliver to any number of output streams.
Note: Downstream operators may need explicit type annotations.
Note: The Pusherator trait is automatically imported to enable the .give(...) method.
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            my_demux = source_iter(1..=100) -> demux(|v, var_args!(fzbz, fizz, buzz, vals)|
                match (v % 3, v % 5) {
                    (0, 0) => fzbz.give(v),
                    (0, _) => fizz.give(v),
                    (_, 0) => buzz.give(v),
                    (_, _) => vals.give(v),
                }
            );
            my_demux[fzbz] -> for_each(|v| println!("{}: fizzbuzz", v));
            my_demux[fizz] -> for_each(|v| println!("{}: fizz", v));
            my_demux[buzz] -> for_each(|v| println!("{}: buzz", v));
            my_demux[vals] -> for_each(|v| println!("{}", v));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
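The routing in the example above can be pictured without any Hydroflow machinery. The sketch below is an illustration only: the struct `FizzBuzzOutputs` and the function `demux_fizzbuzz` are hypothetical names, and each `Vec` stands in for one named output port.

```rust
/// One `Vec` per output port of the fizzbuzz demux.
#[derive(Debug, Default, PartialEq)]
struct FizzBuzzOutputs {
    fzbz: Vec<u32>,
    fizz: Vec<u32>,
    buzz: Vec<u32>,
    vals: Vec<u32>,
}

/// Sends each item to exactly one output, chosen by the same `match`
/// that the demux closure uses.
fn demux_fizzbuzz(items: impl IntoIterator<Item = u32>) -> FizzBuzzOutputs {
    let mut out = FizzBuzzOutputs::default();
    for v in items {
        match (v % 3, v % 5) {
            (0, 0) => out.fzbz.push(v),
            (0, _) => out.fizz.push(v),
            (_, 0) => out.buzz.push(v),
            (_, _) => out.vals.push(v),
        }
    }
    out
}
```

In the real operator the closure is free to give an item to several ports or to none; this sketch only covers the one-port-per-item case shown in the example.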
dest_file
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_file(A, B) | exactly 0 | Streaming |
1 input stream, 0 output streams
Arguments: (1) An AsRef<Path> for a file to write to, and (2) a bool append.
Consumes Strings by writing them as lines to a file. The file will be created if it doesn't
exist. Lines will be appended to the file if append is true, otherwise the file will be
truncated before lines are written.
Note this operator must be used within a Tokio runtime.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter(1..=10) -> map(|n| format!("Line {}", n)) -> dest_file("dest.txt", false);
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
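The create, append and truncate behaviour described above corresponds closely to the flags on `std::fs::OpenOptions`. The following synchronous sketch shows that mapping; it is an assumption for illustration (the helper `write_lines` is hypothetical, and the real operator writes asynchronously under Tokio).

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Writes each string as its own line. The file is created if it is missing;
/// lines are appended when `append` is true, otherwise the file is truncated first.
fn write_lines<P: AsRef<Path>>(path: P, append: bool, lines: &[String]) -> io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .append(append)
        .truncate(!append)
        .open(path)?;
    for line in lines {
        writeln!(file, "{line}")?;
    }
    Ok(())
}
```

Writing with `append` set to false and then again with it set to true extends the file, while a later write with `append` set to false starts it over.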
dest_sink
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_sink(A) | exactly 0 | Streaming |
Arguments: An async Sink.
Consumes items by sending them to an async Sink.
A Sink is a thing into which values can be sent, asynchronously. For example, sending items
into a bounded channel.
Note this operator must be used within a Tokio runtime.
```rust
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // In this example we use a _bounded_ channel for our `Sink`. This is for demonstration only,
    // instead you should use [`hydroflow::util::unbounded_channel`]. A bounded channel results in
    // `Hydroflow` buffering items internally instead of within the channel. (We can't use
    // unbounded here since unbounded channels are synchronous to write to and therefore not
    // `Sink`s.)
    let (send, recv) = tokio::sync::mpsc::channel::<usize>(5);
    // `PollSender` adapts the send half of the bounded channel into a `Sink`.
    let send = tokio_util::sync::PollSender::new(send);

    let mut flow = hydroflow::hydroflow_syntax! {
        source_iter(0..10) -> dest_sink(send);
    };
    // Call `run_async()` to allow async events to propagate, run for one second.
    tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
        .await
        .expect_err("Expected time out");

    let mut recv = tokio_stream::wrappers::ReceiverStream::new(recv);
    // Only 5 elements received due to buffer size.
    // (Note that if we were using a multi-threaded executor instead of `current_thread` it would
    // be possible for more items to be added as they're removed, resulting in >5 collected.)
    let out: Vec<_> = hydroflow::util::ready_iter(&mut recv).collect();
    assert_eq!(&[0, 1, 2, 3, 4], &*out);
}
```
Sink is different from AsyncWrite.
Instead of discrete values we send arbitrary streams of bytes into an AsyncWrite value. For
example, writing a stream of bytes to a file, a socket, or stdout.
To handle those situations we can use a codec from tokio_util::codec.
These specify ways in which the byte stream is broken into individual items, such as with
newlines or with length delineation.
If we only want to write a stream of bytes without delineation we can use the BytesCodec.
In this example we use a duplex as our AsyncWrite with a BytesCodec.
```rust
#[tokio::main]
async fn main() {
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;

    // Like a channel, but for a stream of bytes instead of discrete objects.
    let (asyncwrite, mut asyncread) = tokio::io::duplex(256);
    // Wrap the `AsyncWrite` in a `FramedWrite` using `BytesCodec`, which writes the bytes
    // of each item as-is, without any delineation between items.
    let sink = tokio_util::codec::FramedWrite::new(asyncwrite, tokio_util::codec::BytesCodec::new());

    let mut flow = hydroflow::hydroflow_syntax! {
        source_iter([
            Bytes::from_static(b"hello"),
            Bytes::from_static(b"world"),
        ]) -> dest_sink(sink);
    };
    tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
        .await
        .expect_err("Expected time out");

    let mut buf = Vec::<u8>::new();
    asyncread.read_buf(&mut buf).await.unwrap();
    assert_eq!(b"helloworld", &*buf);
}
```
dest_sink_serde
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> dest_sink_serde(A) | exactly 0 | Streaming |
Arguments: A serializing async Sink.
Consumes (payload, addr) pairs by serializing the payload and sending the resulting pair to an async Sink.
Note this operator must be used within a Tokio runtime.
```rust
#![allow(unused)]
fn main() {
    async fn serde_out() {
        let addr = hydroflow::util::ipv4_resolve("localhost:9000".into()).unwrap();
        let (outbound, inbound, _) = hydroflow::util::bind_udp_bytes(addr).await;
        let remote = hydroflow::util::ipv4_resolve("localhost:9001".into()).unwrap();
        let mut flow = hydroflow::hydroflow_syntax! {
            source_iter(vec![("hello".to_string(), 1), ("world".to_string(), 2)])
                -> map(|m| (m, remote))
                -> dest_sink_serde(outbound);
        };
        flow.run_available();
    }
}
```
difference
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]difference() -> | exactly 1 | Blocking |
Input port names: pos (streaming), neg (blocking)
2 input streams of the same type T, 1 output stream of type T
For a given tick, forms the set difference of the items in the input
streams, returning items in the pos input that are not found in the
neg input.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print "elephant"
            source_iter(vec!["dog", "cat", "elephant"]) -> [pos]diff;
            source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
            diff = difference() -> for_each(|v| println!("{}", v));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
enumerate
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> enumerate() -> | exactly 1 | Streaming |
1 input stream of type T, 1 output stream of type (usize, T)
For each item passed in, enumerate it with its index: (0, x_0), (1, x_1), etc.
enumerate can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify if indexing resets. If 'tick is specified, indexing will
restart at zero at the start of each tick. Otherwise 'static (the default) will never reset
and count monotonically upwards.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter(vec!["hello", "world"])
                -> enumerate()
                -> for_each(|(i, x)| println!("{}: {}", i, x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
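The difference between 'tick and 'static indexing can be sketched with a small counter that either resets at each tick or keeps counting. This is a plain-Rust model under stated assumptions, not Hydroflow code: the `Enumerator` type and its `run_tick` method are hypothetical, and a "tick" is modeled as one call with that tick's items.

```rust
/// Hypothetical stand-in for the index state kept by `enumerate`.
struct Enumerator {
    next_index: usize,
    /// `true` models 'tick persistence, `false` models 'static.
    reset_each_tick: bool,
}

impl Enumerator {
    fn new(reset_each_tick: bool) -> Self {
        Self { next_index: 0, reset_each_tick }
    }

    /// Pairs each item of one tick with its index.
    fn run_tick<T>(&mut self, items: Vec<T>) -> Vec<(usize, T)> {
        if self.reset_each_tick {
            self.next_index = 0;
        }
        items
            .into_iter()
            .map(|item| {
                let i = self.next_index;
                self.next_index += 1;
                (i, item)
            })
            .collect()
    }
}
```

With resetting enabled, the first item of every tick gets index 0; without it, the second tick continues from where the first one stopped.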
filter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> filter(A) -> | exactly 1 | Streaming |
Filter outputs a subsequence of the items it receives at its input, according to a Rust boolean closure passed in as an argument.
The closure receives a reference &T rather than an owned value T because filtering does
not modify or take ownership of the values. If you need to modify the values while filtering
use filter_map instead.
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter(vec!["hello", "world"]) -> filter(|x| x.starts_with('w'))
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
filter_map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> filter_map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
An operator that both filters and maps. It yields only the items for which the supplied closure returns Some(value).
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter(vec!["1", "hello", "world", "2"]) -> filter_map(|s| s.parse().ok())
                -> for_each(|x: usize| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
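Since filter_map follows the Rust iterator method of the same name, the closure from the example can be tried on an ordinary iterator. A minimal sketch, with the helper name `parse_numbers` chosen only for illustration:

```rust
/// Keeps only the strings that parse as `usize` and yields the parsed numbers.
/// Items for which the closure returns `None` are dropped.
fn parse_numbers(items: &[&str]) -> Vec<usize> {
    items.iter().filter_map(|s| s.parse().ok()).collect()
}
```

Strings such as "hello" fail to parse, so the closure returns `None` for them and they never reach the output.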
flat_map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> flat_map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: A Rust closure that returns an iterator
For each item passed in, apply the closure to it and emit the items of the resulting
iterator one by one. The value returned by the closure must be iterable.
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print out each character of each word on a separate line
            source_iter(vec!["hello", "world"])
                -> flat_map(|x| x.chars())
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
flatten
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> flatten() -> | exactly 1 | Streaming |
1 input stream, 1 output stream
For each item i passed in, treat i as an iterator and produce its items one by one.
The type of the input items must be iterable.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print the numbers 1-6 without any nesting
            source_iter(vec![[1, 2], [3, 4], [5, 6]])
                -> flatten()
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
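The contrast between flat_map and flatten is easiest to see side by side on ordinary Rust iterators, which these operators are modeled on. In the sketch below (helper names are hypothetical), flat_map needs a closure because `&str` is not itself iterable, whereas flatten works directly because each array already is.

```rust
/// `flat_map`: the closure turns each word into an iterator of chars.
fn chars_of_words(words: &[&str]) -> Vec<char> {
    words.iter().flat_map(|w| w.chars()).collect()
}

/// `flatten`: each input item is already iterable, so no closure is needed.
fn unnest(pairs: Vec<[i32; 2]>) -> Vec<i32> {
    pairs.into_iter().flatten().collect()
}
```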
fold
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> fold(A, B) -> | exactly 1 | Blocking |
1 input stream, 1 output stream
Arguments: an initial value, and a closure which itself takes two arguments: an 'accumulator', and an element. The closure returns the value that the accumulator should have for the next iteration.
Akin to Rust's built-in fold operator. Folds every element into an accumulator by applying a closure, returning the final result.
Note: The closure has access to the context object.
fold can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. With 'tick, values will only be collected
within the same tick. With 'static, values will be remembered across ticks and will be
aggregated with values arriving in later ticks. When not explicitly specified, persistence
defaults to 'static.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print `Reassembled vector [1, 2, 3, 4, 5]`
            source_iter([1,2,3,4,5])
                -> fold::<'tick>(Vec::new(), |mut accum, elem| {
                    accum.push(elem);
                    accum
                })
                -> for_each(|e| println!("Reassembled vector {:?}", e));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
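To illustrate how the persistence argument changes what the accumulator starts from, here is a plain-Rust model of fold across ticks. The `Fold` type is hypothetical and a tick is modeled as a single `run_tick` call; it is a sketch of the described behaviour, not the operator's implementation.

```rust
/// Hypothetical model of `fold` state across ticks.
struct Fold<A> {
    init: A,
    accum: A,
    /// `true` models 'tick persistence, `false` models 'static.
    reset_each_tick: bool,
}

impl<A: Clone> Fold<A> {
    fn new(init: A, reset_each_tick: bool) -> Self {
        Self { accum: init.clone(), init, reset_each_tick }
    }

    /// Folds one tick's items and returns the accumulator after that tick.
    fn run_tick<T>(&mut self, items: Vec<T>, f: impl FnMut(A, T) -> A) -> A {
        let start = if self.reset_each_tick {
            self.init.clone() // start over from the initial value
        } else {
            self.accum.clone() // continue from the previous tick
        };
        self.accum = items.into_iter().fold(start, f);
        self.accum.clone()
    }
}
```

With the reset enabled, a second tick that receives only `3` yields `[3]`; without it, the same tick yields `[1, 2, 3]` when the first tick received `1` and `2`.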
for_each
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> for_each(A) | exactly 0 | Streaming |
1 input stream, 0 output streams
Arguments: a Rust closure
Iterates through a stream passing each element to the closure in the argument.
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter(vec!["Hello", "World"])
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
group_by
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> group_by(A, B) -> | exactly 1 | Blocking |
An alias for keyed_fold.
identity
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> identity() -> | exactly 1 | Streaming |
1 input stream of type T, 1 output stream of type T
For each item passed in, pass it out without any change.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print "hello" and "world" on separate lines (in either order)
            source_iter(vec!["hello", "world"]) -> identity()
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
You can also supply a type parameter identity::<MyType>() to specify what items flow through
the pipeline. This can be useful for helping the compiler infer types.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // Use type parameter to ensure items are `i32`s.
            source_iter(0..10)
                -> identity::<i32>()
                -> for_each(|x| println!("{}", x));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
initialize
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | initialize() -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: None.
Emits a single unit () at the start of the first tick.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            initialize() -> for_each(|()| println!("This only runs one time!"));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
inspect
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> inspect(A) -> | exactly 1 | Streaming |
Arguments: A single closure FnMut(&Item).
An operator which allows you to "inspect" each element of a stream without modifying it. The closure is called on a reference to each item. This is mainly useful for debugging as in the example below, and it is generally an anti-pattern to provide a closure with side effects.
Note: The closure has access to the context object.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter([1, 2, 3, 4]) -> inspect(|&x| println!("{}", x)) -> null();
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]join() -> | exactly 1 | Streaming |
Input port names: 0 (streaming), 1 (streaming)
2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
Forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            // should print `(hello, (world, cleveland))`
            source_iter(vec![("hello", "world"), ("stay", "gold")]) -> [0]my_join;
            source_iter(vec![("hello", "cleveland")]) -> [1]my_join;
            my_join = join()
                -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
join can also be provided with one or two generic lifetime persistence arguments, either
'tick or 'static, to specify how join data persists. With 'tick, pairs will only be
joined with corresponding pairs within the same tick. With 'static, pairs will be remembered
across ticks and will be joined with pairs arriving in later ticks. When not explicitly
specified persistence defaults to 'static.
When two persistence arguments are supplied the first maps to port 0 and the second maps to
port 1.
When a single persistence argument is supplied, it is applied to both input ports.
When no persistence arguments are supplied, it defaults to 'static for both.
The syntax is as follows:
join(); // Or
join::<'static>();
join::<'tick>();
join::<'static, 'tick>();
join::<'tick, 'static>();
// etc.
Join also accepts one type argument that controls how the join state is built up. This (currently) allows switching between a SetUnion and NonSetUnion implementation. For example:
join::<HalfSetJoinState>();
join::<HalfMultisetJoinState>();
Examples
```rust
#![allow(unused)]
fn main() {
    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>();
    let mut flow = hydroflow::hydroflow_syntax! {
        source_iter([("hello", "world")]) -> [0]my_join;
        source_stream(input_recv) -> [1]my_join;
        my_join = join::<'tick>()
            -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
    };
    input_send.send(("hello", "oakland")).unwrap();
    flow.run_tick();
    input_send.send(("hello", "san francisco")).unwrap();
    flow.run_tick();
}
```
Prints out "(hello, (world, oakland))" since source_iter([("hello", "world")]) is only
included in the first tick, then forgotten.
```rust
#![allow(unused)]
fn main() {
    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>();
    let mut flow = hydroflow::hydroflow_syntax! {
        source_iter([("hello", "world")]) -> [0]my_join;
        source_stream(input_recv) -> [1]my_join;
        my_join = join::<'static>()
            -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
    };
    input_send.send(("hello", "oakland")).unwrap();
    flow.run_tick();
    input_send.send(("hello", "san francisco")).unwrap();
    flow.run_tick();
}
```
Prints out "(hello, (world, oakland))" and "(hello, (world, san francisco))" since the
inputs are persisted across ticks.
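The two examples above can be reproduced with a small plain-Rust model of a symmetric hash join whose per-port state is either cleared at each tick or kept. Everything here is an assumption made for illustration: the `Join` type is hypothetical, a tick is one `run_tick` call, and values are stored in a `Vec` per key (multiset behaviour) without modeling the set-based join state.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of join state for ports 0 (`left`) and 1 (`right`).
struct Join<K, V1, V2> {
    left: HashMap<K, Vec<V1>>,
    right: HashMap<K, Vec<V2>>,
    left_static: bool,  // false models 'tick on port 0
    right_static: bool, // false models 'tick on port 1
}

impl<K: Eq + Hash + Clone, V1: Clone, V2: Clone> Join<K, V1, V2> {
    fn new(left_static: bool, right_static: bool) -> Self {
        Self { left: HashMap::new(), right: HashMap::new(), left_static, right_static }
    }

    /// Joins the items arriving in one tick against the stored state.
    fn run_tick(&mut self, new_left: Vec<(K, V1)>, new_right: Vec<(K, V2)>) -> Vec<(K, (V1, V2))> {
        // State with 'tick persistence is forgotten at the tick boundary.
        if !self.left_static {
            self.left.clear();
        }
        if !self.right_static {
            self.right.clear();
        }
        let mut out = Vec::new();
        // Each new left item is matched against the stored right items, then stored.
        for (k, v1) in new_left {
            if let Some(matches) = self.right.get(&k) {
                for v2 in matches {
                    out.push((k.clone(), (v1.clone(), v2.clone())));
                }
            }
            self.left.entry(k).or_default().push(v1);
        }
        // Each new right item is matched against the stored left items, then stored.
        for (k, v2) in new_right {
            if let Some(matches) = self.left.get(&k) {
                for v1 in matches {
                    out.push((k.clone(), (v1.clone(), v2.clone())));
                }
            }
            self.right.entry(k).or_default().push(v2);
        }
        out
    }
}
```

In this model, a second tick that delivers only `("hello", "san francisco")` on port 1 produces nothing when the state is per-tick, and produces `("hello", ("world", "san francisco"))` when the state is kept.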
keyed_fold
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> keyed_fold(A, B) -> | exactly 1 | Blocking |
1 input stream of type (K, V1), 1 output stream of type (K, V2). The output will have one tuple for each distinct K, with an accumulated value of type V2.
If the input and output value types are the same and do not require initialization then use
keyed_reduce.
Arguments: two Rust closures. The first generates an initial value per group. The second itself takes two arguments: an 'accumulator', and an element. The second closure returns the value that the accumulator should have for the next iteration.
A special case of fold, in the spirit of SQL's GROUP BY and aggregation constructs. The input
is partitioned into groups by the first field, and for each group the values in the second
field are accumulated via the closures in the arguments.
Note: The closures have access to the context object.
keyed_fold can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. With 'tick, values will only be collected
within the same tick. With 'static, values will be remembered across ticks and will be
aggregated with pairs arriving in later ticks. When not explicitly specified persistence
defaults to 'static.
keyed_fold can also be provided with two type arguments, the key type K and aggregated
output value type V2. This is required when using 'static persistence if the compiler
cannot infer the types.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
                -> keyed_fold(|| 0, |old: &mut u32, val: u32| *old += val)
                -> for_each(|(k, v)| println!("Total for group {} is {}", k, v));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
Example using 'tick persistence:
```rust
#![allow(unused)]
fn main() {
    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>();
    let mut flow = hydroflow::hydroflow_syntax! {
        source_stream(input_recv)
            -> keyed_fold::<'tick, &str, String>(String::new, |old: &mut _, val| {
                *old += val;
                *old += ", ";
            })
            -> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
    };

    input_send.send(("hello", "oakland")).unwrap();
    input_send.send(("hello", "berkeley")).unwrap();
    input_send.send(("hello", "san francisco")).unwrap();
    flow.run_available();
    // ("hello", "oakland, berkeley, san francisco, ")

    input_send.send(("hello", "palo alto")).unwrap();
    flow.run_available();
    // ("hello", "palo alto, ")
}
```
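The grouping described above resembles a `HashMap` whose entries are created by the first closure and updated by the second. The following single-tick sketch shows that shape in plain Rust; the function `keyed_fold_tick` is a hypothetical name, and persistence across ticks is not modeled.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Groups `(key, value)` pairs by key. `init` creates the accumulator the
/// first time a key is seen; `fold` merges each value into its accumulator.
fn keyed_fold_tick<K, V1, V2>(
    items: Vec<(K, V1)>,
    init: impl Fn() -> V2,
    mut fold: impl FnMut(&mut V2, V1),
) -> HashMap<K, V2>
where
    K: Eq + Hash,
{
    let mut groups = HashMap::new();
    for (k, v) in items {
        fold(groups.entry(k).or_insert_with(&init), v);
    }
    groups
}
```

Because the accumulator type `V2` comes from `init`, it can differ from the input value type `V1`, which is the case keyed_reduce does not cover.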
keyed_reduce
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> keyed_reduce(A) -> | exactly 1 | Blocking |
1 input stream of type (K, V), 1 output stream of type (K, V). The output will have one tuple for each distinct K, with an accumulated (reduced) value of type V.
If you need the accumulated value to have a different type, use keyed_fold.
Arguments: one Rust closure. The closure takes two arguments: an &mut 'accumulator', and an element. The accumulator should be updated based on the element.
A special case of fold, in the spirit of SQL's GROUP BY and aggregation constructs. The input
is partitioned into groups by the first field, and for each group the values in the second
field are accumulated via the closure in the argument.
Note: The closure has access to the context object.
keyed_reduce can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. With 'tick, values will only be collected
within the same tick. With 'static, values will be remembered across ticks and will be
aggregated with pairs arriving in later ticks. When not explicitly specified persistence
defaults to 'static.
keyed_reduce can also be provided with two type arguments, the key and value type. This is
required when using 'static persistence if the compiler cannot infer the types.
```rust
#![allow(unused)]
fn main() {
    #[allow(unused_imports)]
    use hydroflow::{var_args, var_expr};
    #[allow(unused_imports)]
    use hydroflow::pusherator::Pusherator;

    let __rt = hydroflow::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    __rt.block_on(async {
        let mut __hf = hydroflow::hydroflow_syntax! {
            source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
                -> keyed_reduce(|old: &mut u32, val: u32| *old += val)
                -> for_each(|(k, v)| println!("Total for group {} is {}", k, v));
        };
        for _ in 0..100 {
            hydroflow::tokio::task::yield_now().await;
            if !__hf.run_tick() {
                // No work done.
                break;
            }
        }
    })
}
```
Example using 'tick persistence:
```rust
#![allow(unused)]
fn main() {
    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>();
    let mut flow = hydroflow::hydroflow_syntax! {
        source_stream(input_recv)
            -> keyed_reduce::<'tick, &str>(|old: &mut _, val| *old = std::cmp::max(*old, val))
            -> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
    };

    input_send.send(("hello", "oakland")).unwrap();
    input_send.send(("hello", "berkeley")).unwrap();
    input_send.send(("hello", "san francisco")).unwrap();
    flow.run_available();
    // ("hello", "san francisco"), the lexicographic maximum of the three values

    input_send.send(("hello", "palo alto")).unwrap();
    flow.run_available();
    // ("hello", "palo alto")
}
```
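Unlike keyed_fold, keyed_reduce has no initial-value closure, so the first value seen for a key has to seed the accumulator. The single-tick sketch below models that with the `HashMap` entry API; `keyed_reduce_tick` is a hypothetical name and persistence across ticks is not modeled.

```rust
use std::collections::hash_map::{Entry, HashMap};
use std::hash::Hash;

/// Groups `(key, value)` pairs by key. The first value for a key becomes the
/// accumulator; later values are merged into it by `reduce`.
fn keyed_reduce_tick<K, V>(items: Vec<(K, V)>, mut reduce: impl FnMut(&mut V, V)) -> HashMap<K, V>
where
    K: Eq + Hash,
{
    let mut groups: HashMap<K, V> = HashMap::new();
    for (k, v) in items {
        match groups.entry(k) {
            Entry::Occupied(mut slot) => reduce(slot.get_mut(), v),
            Entry::Vacant(slot) => {
                slot.insert(v);
            }
        }
    }
    groups
}
```

The same helper covers both summing, as in the first example, and keeping a maximum, as in the 'tick example.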
lattice_batch
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> lattice_batch(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: The one and only argument is the receive end of a tokio channel that signals when to release the batch downstream.
Given a Stream created in Rust code, lattice_batch is passed the receive end of the channel.
When it receives any element on that channel, it passes all of the inputs it has received so
far through to the output unchanged.
```rust
#![allow(unused)]
fn main() {
    let (tx, rx) = hydroflow::util::unbounded_channel::<()>();

    // Will print 0, 1, 2, 3, 4 each on a new line just once.
    let mut df = hydroflow::hydroflow_syntax! {
        repeat_iter(0..5)
            -> map(|x| hydroflow::lattices::Max::new(x))
            -> lattice_batch::<hydroflow::lattices::Max<usize>>(rx)
            -> for_each(|x| { println!("{x:?}"); });
    };

    tx.send(()).unwrap();

    df.run_available();
}
```
lattice_join
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 2 | -> [<input_port>]lattice_join() -> | exactly 1 | Streaming |
Input port names:
0(streaming),1(streaming)
2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
Performs a group-by with lattice-merge aggregate function on LHS and RHS inputs and then forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.
You must specify the LHS and RHS lattice types; they cannot be inferred.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print `(key, (2, 1))` my_join = lattice_join::<hydroflow::lattices::Max<usize>, hydroflow::lattices::Max<usize>>(); source_iter(vec![("key", hydroflow::lattices::Max::new(0)), ("key", hydroflow::lattices::Max::new(2))]) -> [0]my_join; source_iter(vec![("key", hydroflow::lattices::Max::new(1))]) -> [1]my_join; my_join -> for_each(|(k, (v1, v2))| println!("({}, ({:?}, {:?}))", k, v1, v2)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
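The two steps described above (merge per key on each side, then equijoin on the key) can be sketched with the standard library alone. This is an illustration under simplifying assumptions, not Hydroflow's implementation: plain usize values with max stand in for the Max lattice, and the helper names merge_by_key and lattice_join_sketch are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical helper: merges all values of each key with `max`,
/// standing in for the lattice merge of a `Max` lattice.
fn merge_by_key(items: &[(&'static str, usize)]) -> HashMap<&'static str, usize> {
    let mut merged = HashMap::new();
    for &(k, v) in items {
        let slot = merged.entry(k).or_insert(v);
        *slot = (*slot).max(v);
    }
    merged
}

/// Hypothetical helper: merge each side per key, then equijoin on the key.
fn lattice_join_sketch(
    lhs: &[(&'static str, usize)],
    rhs: &[(&'static str, usize)],
) -> Vec<(&'static str, (usize, usize))> {
    let left = merge_by_key(lhs);
    let right = merge_by_key(rhs);
    let mut out: Vec<_> = left
        .iter()
        .filter_map(|(k, v1)| right.get(k).map(|v2| (*k, (*v1, *v2))))
        .collect();
    // HashMap iteration order is unspecified; sort for a stable result.
    out.sort();
    out
}
```

With the inputs from the example above, the left side merges 0 and 2 into 2, the right side holds 1, and the join yields a single tuple for "key".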
lattice_join can also be provided with one or two generic lifetime persistence arguments, either
'tick or 'static, to specify how join data persists. With 'tick, pairs will only be
joined with corresponding pairs within the same tick. With 'static, pairs will be remembered
across ticks and will be joined with pairs arriving in later ticks. When not explicitly
specified, persistence defaults to 'static.
When two persistence arguments are supplied the first maps to port 0 and the second maps to
port 1.
When a single persistence argument is supplied, it is applied to both input ports.
When no persistence arguments are applied it defaults to 'static for both.
The syntax is as follows:
lattice_join::<Max<usize>, Max<usize>>(); // Or
lattice_join::<'static, Max<usize>, Max<usize>>();
lattice_join::<'tick, Max<usize>, Max<usize>>();
lattice_join::<'static, 'tick, Max<usize>, Max<usize>>();
lattice_join::<'tick, 'static, Max<usize>, Max<usize>>();
// etc.
Examples
#![allow(unused)] fn main() { use hydroflow::lattices::Max; let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>(); let (out_tx, mut out_rx) = hydroflow::util::unbounded_channel::<(usize, (Max<usize>, Max<usize>))>(); let mut df = hydroflow::hydroflow_syntax! { my_join = lattice_join::<'tick, Max<usize>, Max<usize>>(); source_iter([(7, Max::new(2)), (7, Max::new(1))]) -> [0]my_join; source_stream(input_recv) -> [1]my_join; my_join -> for_each(|v| out_tx.send(v).unwrap()); }; input_send.send((7, Max::new(5))).unwrap(); df.run_tick(); let out: Vec<_> = hydroflow::util::collect_ready(&mut out_rx); assert_eq!(out, vec![(7, (Max::new(2), Max::new(5)))]); }
lattice_merge
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> lattice_merge() -> | exactly 1 | Blocking |
1 input stream, 1 output stream
Generic parameters: A LatticeRepr type.
A specialized operator for merging lattices together into an accumulated value. Like fold()
but specialized for lattice types. lattice_merge::<MyLattice>() is equivalent to fold(Default::default, hydroflow::lattices::Merge::merge_owned).
lattice_merge can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. With 'tick, values will only be collected
within the same tick. With 'static, values will be remembered across ticks and will be
aggregated with values arriving in later ticks. When not explicitly specified, persistence
defaults to 'static.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_iter([1,2,3,4,5]) -> map(hydroflow::lattices::Max::new) -> lattice_merge::<'static, hydroflow::lattices::Max<usize>>() -> for_each(|x: hydroflow::lattices::Max<usize>| println!("Least upper bound: {}", x.0)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
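The "fold from the default value with a merge function" reading of lattice_merge can be sketched without Hydroflow. The MaxSketch type below is a hypothetical stand-in for a max lattice and is not the hydroflow::lattices::Max type; it only assumes that merging two values keeps the larger one.

```rust
/// Hypothetical stand-in for a max lattice: merging keeps the larger value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct MaxSketch(usize);

impl MaxSketch {
    /// Lattice merge (least upper bound) of two values.
    fn merge_owned(self, other: MaxSketch) -> MaxSketch {
        MaxSketch(self.0.max(other.0))
    }
}

/// Folds a stream of lattice values into one accumulated value,
/// starting from the default (bottom) element.
fn lattice_merge_sketch(items: impl IntoIterator<Item = MaxSketch>) -> MaxSketch {
    items
        .into_iter()
        .fold(MaxSketch::default(), MaxSketch::merge_owned)
}
```

Because the fold starts from the default element, an empty input yields that default rather than no value at all.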
map
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: A Rust closure. For each item passed in, applies the closure to generate an item to emit.
If you do not want to modify the item stream and only want to view
each item, use the inspect operator instead.
Note: The closure has access to the context object.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_iter(vec!["hello", "world"]) -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
merge
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| at least 2 | -> [<input_port>]merge() -> | exactly 1 | Streaming |
n input streams of the same type, 1 output stream of the same type
Merges an arbitrary number of input streams into a single stream. Each input sequence is a subsequence of the output, but no guarantee is given on how the inputs are interleaved.
Since merge has multiple input streams, it needs to be assigned to
a variable to reference its multiple input ports across statements.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_iter(vec!["hello", "world"]) -> my_merge; source_iter(vec!["stay", "gold"]) -> my_merge; source_iter(vec!["don\'t", "give", "up"]) -> my_merge; my_merge = merge() -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
next_stratum
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> next_stratum() -> | exactly 1 | Blocking |
Delays all elements which pass through to the next stratum (in the same tick).
next_tick
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> next_tick() -> | exactly 1 | Blocking |
Delays all elements which pass through to the next tick. In short, execution of a hydroflow graph runs as a sequence of distinct "ticks". Non-monotonic operators compute their output in terms of each tick so execution doesn't have to block, and it is up to the user to coordinate data between tick executions to achieve the desired result.
A tick may be divided into multiple strata; see the next_stratum()
operator.
In the example below next_tick() is used alongside difference() to
ignore any items in the current tick that already appeared in the previous
tick.
#![allow(unused)] fn main() { // Outputs 1 2 3 4 5 6 (on separate lines). let (input_send, input_recv) = hydroflow::util::unbounded_channel::<usize>(); let mut flow = hydroflow::hydroflow_syntax! { inp = source_stream(input_recv) -> tee(); diff = difference() -> for_each(|x| println!("{}", x)); inp -> [pos]diff; inp -> next_tick() -> [neg]diff; }; for x in [1, 2, 3, 4] { input_send.send(x).unwrap(); } flow.run_tick(); for x in [3, 4, 5, 6] { input_send.send(x).unwrap(); } flow.run_tick(); }
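The tick-by-tick behavior of the example above can be simulated in plain Rust. This sketch only models the idea (the current tick's items minus the previous tick's items) with the standard library; the function name new_since_previous_tick is hypothetical and nothing here is Hydroflow API.

```rust
use std::collections::HashSet;

/// Hypothetical simulation: for each tick's batch, emit the items that
/// did not appear in the previous tick's batch.
fn new_since_previous_tick(ticks: &[Vec<usize>]) -> Vec<Vec<usize>> {
    let mut previous: HashSet<usize> = HashSet::new();
    let mut outputs = Vec::new();
    for batch in ticks {
        // `pos` side: this tick's items; `neg` side: last tick's items.
        let emitted: Vec<usize> = batch
            .iter()
            .copied()
            .filter(|x| !previous.contains(x))
            .collect();
        outputs.push(emitted);
        // What `next_tick()` delays: this batch becomes `neg` in the next tick.
        previous = batch.iter().copied().collect();
    }
    outputs
}
```

Feeding it the two batches from the example gives all four items in the first tick and only the two new items in the second.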
null
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| at least 0 and at most 1 | null() | at least 0 and at most 1 | Streaming |
At most 1 input stream of any type, at most 1 output stream of any type.
As a source, generates nothing. As a sink, absorbs anything with no effect.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print `1, 2, 3, 4, 5, 6` across 6 lines null() -> for_each(|_: ()| panic!()); source_iter([1,2,3]) -> map(|i| println!("{}", i)) -> null(); null_src = null(); null_sink = null(); null_src[0] -> for_each(|_: ()| panic!()); // note: use `for_each()` (or `inspect()`) instead of this: source_iter([4,5,6]) -> map(|i| println!("{}", i)) -> [0]null_sink; }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
persist
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> persist() -> | exactly 1 | Streaming |
Stores each item as it passes through, and replays all items every tick.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // Normally `source_iter(...)` only emits once, but with `persist()` will replay the `"hello"` // on every tick. This is equivalent to `repeat_iter(["hello"])`. source_iter(["hello"]) -> persist() -> for_each(|item| println!("{}: {}", context.current_tick(), item)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
persist() can be used to introduce statefulness into stateless pipelines. For example, the
join below only stores data for a single tick ('tick persistence); the persist() operator
introduces statefulness across ticks. This can be useful for optimization transformations
within the Hydroflow compiler.
#![allow(unused)] fn main() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>(); let mut flow = hydroflow::hydroflow_syntax! { repeat_iter([("hello", "world")]) -> [0]my_join; source_stream(input_recv) -> persist() -> [1]my_join; my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2)); }; input_send.send(("hello", "oakland")).unwrap(); flow.run_tick(); input_send.send(("hello", "san francisco")).unwrap(); flow.run_tick(); // (hello, (world, oakland)) // (hello, (world, oakland)) // (hello, (world, san francisco)) }
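The output listed in the comments above follows from replaying everything stored so far on every tick. A rough standard-library simulation of that behavior is sketched below; it is an illustration only, and the function name persist_join_sketch is hypothetical.

```rust
/// Hypothetical simulation of `persist()` feeding a `'tick` join:
/// every tick, all values seen so far are replayed and joined with `left`.
fn persist_join_sketch(
    left: (&'static str, &'static str),
    ticks: &[Vec<(&'static str, &'static str)>],
) -> Vec<Vec<(&'static str, (&'static str, &'static str))>> {
    // State kept by `persist()` across ticks.
    let mut stored: Vec<(&'static str, &'static str)> = Vec::new();
    let mut outputs = Vec::new();
    for batch in ticks {
        stored.extend(batch.iter().copied());
        // The join itself only sees what arrives in this tick,
        // which is the full replay of `stored`.
        let joined: Vec<_> = stored
            .iter()
            .filter(|(k, _)| *k == left.0)
            .map(|(k, v)| (*k, (left.1, *v)))
            .collect();
        outputs.push(joined);
    }
    outputs
}
```

The first tick produces one joined tuple and the second produces two, because the value from the first tick is replayed alongside the new one.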
reduce
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> reduce(A) -> | exactly 1 | Blocking |
1 input stream, 1 output stream
Arguments: a closure which itself takes two arguments: an ‘accumulator’, and an element. The closure returns the value that the accumulator should have for the next iteration.
Akin to Rust's built-in reduce operator. Folds every element into an accumulator by applying a closure, returning the final result.
Note: The closure has access to the context object.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print 120 (i.e., 1*2*3*4*5) source_iter([1,2,3,4,5]) -> reduce(|mut accum, elem| { accum *= elem; accum }) -> for_each(|e| println!("{}", e)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
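Since reduce is described as akin to Rust's built-in reduce, the same closure can be tried on an ordinary iterator with Iterator::reduce. The wrapper function name product_via_reduce is hypothetical; the sketch says nothing about how Hydroflow itself handles an empty input.

```rust
/// Multiplies all items together using the same closure shape as the
/// example above. Returns `None` for an empty input, as `Iterator::reduce` does.
fn product_via_reduce(items: Vec<u64>) -> Option<u64> {
    items.into_iter().reduce(|mut accum, elem| {
        accum *= elem;
        accum
    })
}
```

The first element seeds the accumulator, so no separate initial value is needed; that is the main difference from a fold.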
repeat_fn
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | repeat_fn(A, B) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: A batch size per tick, and a zero-argument closure to produce each item in the stream. Similar to repeat_iter, but generates the items by calling the supplied closure instead of cloning them from an input iterator.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { repeat_fn(10, || 7) -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
repeat_iter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | repeat_iter(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: An iterable Rust object. Similar to source_iter, but delivers all elements from the iterable object on every tick (where source_iter only delivers on the first tick).
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { repeat_iter(vec!["Hello", "World"]) -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
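The difference between repeat_iter and source_iter is easiest to see as a per-tick trace. The sketch below is a plain-Rust simulation under that reading of the description, not Hydroflow code; the function name source_sketch and its repeat flag are hypothetical.

```rust
/// Hypothetical simulation: what a source delivers on each of `n_ticks` ticks.
/// With `repeat` the items are cloned out on every tick (like `repeat_iter`);
/// without it they are delivered only on the first tick (like `source_iter`).
fn source_sketch(
    items: &[&'static str],
    n_ticks: usize,
    repeat: bool,
) -> Vec<Vec<&'static str>> {
    (0..n_ticks)
        .map(|tick| {
            if repeat || tick == 0 {
                items.to_vec()
            } else {
                Vec::new()
            }
        })
        .collect()
}
```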
repeat_iter_external
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | repeat_iter_external(A) -> | exactly 1 | Streaming |
Same as repeat_iter but treats the iter as external, meaning this operator
will trigger the start of new ticks in order to repeat, causing spinning-like behavior.
sort
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> sort() -> | exactly 1 | Blocking |
Takes a stream as input and produces a sorted version of the stream as output.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print 1, 2, 3 (in order) source_iter(vec![2, 3, 1]) -> sort() -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
sort can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. The default is 'tick. With 'tick, only
the values collected within a single tick will be sorted and emitted. With
'static, values will be remembered across ticks and will be repeatedly emitted each tick (in
order).
#![allow(unused)] fn main() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<usize>(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_recv) -> sort::<'static>() -> for_each(|n| println!("{}", n)); }; input_send.send(6).unwrap(); input_send.send(3).unwrap(); input_send.send(4).unwrap(); flow.run_available(); // 3, 4, 6 input_send.send(1).unwrap(); input_send.send(7).unwrap(); flow.run_available(); // 1, 3, 4, 6, 7 }
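The two persistence modes of sort can be contrasted with a small standard-library simulation. This is a sketch of the described behavior only; the function names sort_static_sketch and sort_tick_sketch are hypothetical.

```rust
/// Hypothetical simulation of `sort::<'static>()`: values accumulate
/// across ticks and the whole sorted set is emitted every tick.
fn sort_static_sketch(ticks: &[Vec<usize>]) -> Vec<Vec<usize>> {
    let mut remembered: Vec<usize> = Vec::new();
    let mut outputs = Vec::new();
    for batch in ticks {
        remembered.extend_from_slice(batch);
        remembered.sort();
        outputs.push(remembered.clone());
    }
    outputs
}

/// Hypothetical simulation of `sort::<'tick>()`: each tick is sorted on its own.
fn sort_tick_sketch(ticks: &[Vec<usize>]) -> Vec<Vec<usize>> {
    ticks
        .iter()
        .map(|batch| {
            let mut sorted = batch.clone();
            sorted.sort();
            sorted
        })
        .collect()
}
```

With the batches from the example above, the 'static variant emits 3, 4, 6 and then 1, 3, 4, 6, 7, while the 'tick variant would emit 3, 4, 6 and then only 1, 7.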
sort_by
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> sort_by(A) -> | exactly 1 | Blocking |
Takes a stream as input and produces a version of the stream as output sorted according to the key extracted by the closure.
Note: The closure has access to the context object.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print (1, 'z'), (2, 'y'), (3, 'x') (in order) source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')]) -> sort_by(|(k, _v)| k) -> for_each(|x| println!("{:?}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
source_file
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_file(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Reads the referenced file one line at a time. The line will NOT include the line ending.
Will panic if the file could not be read, or if the file contains bytes that are not valid UTF-8.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_file("Cargo.toml") -> for_each(|line| println!("{}", line)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
source_interval
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_interval(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: A Duration for this interval.
Emits Tokio time Instants on a repeated interval. The first tick completes immediately. Missed ticks will be scheduled as soon
as possible, and the Instant will be the missed time, not the late time.
Note that this requires the hydroflow instance be run within a Tokio Runtime.
The easiest way to do this is with a #[tokio::main]
annotation on async fn main() { ... }.
use std::time::Duration; use hydroflow::hydroflow_syntax; #[tokio::main] async fn main() { let mut hf = hydroflow_syntax! { source_interval(Duration::from_secs(1)) -> for_each(|time| println!("This runs every second: {:?}", time)); }; // Will print 4 times (fencepost counting). tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async()) .await .expect_err("Expected time out"); // Example output: // This runs every second: Instant { t: 27471.704813s } // This runs every second: Instant { t: 27472.704813s } // This runs every second: Instant { t: 27473.704813s } // This runs every second: Instant { t: 27474.704813s } }
source_iter
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_iter(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: An iterable Rust object. Takes the iterable object and delivers its elements downstream one by one.
Note that all elements are emitted during the first tick.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_iter(vec!["Hello", "World"]) -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
source_json
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_json(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: An AsRef<Path> for a file to read as JSON. This will emit the parsed value one time.
source_json may take one generic type argument, the type of the value to be parsed, which must implement Deserialize.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { source_json("example.json") -> for_each(|json: hydroflow::serde_json::Value| println!("{:#?}", json)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
source_stdin
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stdin() -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: none.
source_stdin receives a Stream of lines from stdin
and emits each of the elements it receives downstream.
#![allow(unused)] fn main() { let mut flow = hydroflow::hydroflow_syntax! { source_stdin() -> map(|x| x.unwrap().to_uppercase()) -> for_each(|x| println!("{}", x)); }; flow.run_async(); }
source_stream
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stream(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: The receive end of a tokio channel
Given a Stream created in Rust code, source_stream is passed the receive endpoint of the channel and emits each of the elements it receives downstream.
#![allow(unused)] fn main() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<&str>(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_recv) -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x)); }; input_send.send("Hello").unwrap(); input_send.send("World").unwrap(); flow.run_available(); }
source_stream_serde
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 0 | source_stream_serde(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: A Stream.
Given a Stream of (serialized payload, addr) pairs, deserializes the payload and emits each of the elements it receives downstream.
#![allow(unused)] fn main() { async fn serde_in() { let addr = hydroflow::util::ipv4_resolve("localhost:9000".into()).unwrap(); let (outbound, inbound, _) = hydroflow::util::bind_udp_bytes(addr).await; let mut flow = hydroflow::hydroflow_syntax! { source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(x, a): (String, std::net::SocketAddr)| x.to_uppercase()) -> for_each(|x| println!("{}", x)); }; flow.run_available(); } }
tee
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> tee()[<output_port>] -> | at least 2 | Streaming |
1 input stream, n output streams
Takes the input stream and delivers a copy of each item to each output.
Note: Downstream operators may need explicit type annotations.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { my_tee = source_iter(vec!["Hello", "World"]) -> tee(); my_tee -> map(|x: &str| x.to_uppercase()) -> for_each(|x| println!("{}", x)); my_tee -> map(|x: &str| x.to_lowercase()) -> for_each(|x| println!("{}", x)); my_tee -> for_each(|x: &str| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
unique
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> unique() -> | exactly 1 | Streaming |
Takes one stream as input and filters out any duplicate occurrences. The output contains all unique values from the input.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { // should print 1, 2, 3 (in any order) source_iter(vec![1, 1, 2, 3, 2, 1, 3]) -> unique() -> for_each(|x| println!("{}", x)); }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
unique can also be provided with one generic lifetime persistence argument, either
'tick or 'static, to specify how data persists. The default is 'static.
With 'tick, uniqueness is only considered within the current tick, so across multiple ticks
duplicate values may be emitted.
With 'static, values will be remembered across ticks and no duplicates will ever be emitted.
#![allow(unused)] fn main() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<usize>(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_recv) -> unique::<'tick>() -> for_each(|n| println!("{}", n)); }; input_send.send(3).unwrap(); input_send.send(3).unwrap(); input_send.send(4).unwrap(); input_send.send(3).unwrap(); flow.run_available(); // 3, 4 input_send.send(3).unwrap(); input_send.send(5).unwrap(); flow.run_available(); // 3, 5 // Note: 3 is emitted again. }
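The difference between 'tick and 'static uniqueness comes down to whether the set of already-seen values is cleared between ticks. A minimal standard-library simulation of that idea follows; it is not Hydroflow code, and the function name unique_sketch and its across_ticks flag are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical simulation of `unique()`. When `across_ticks` is true the
/// seen-set survives between ticks (like `'static`); otherwise it is
/// cleared at the start of every tick (like `'tick`).
fn unique_sketch(ticks: &[Vec<usize>], across_ticks: bool) -> Vec<Vec<usize>> {
    let mut seen: HashSet<usize> = HashSet::new();
    let mut outputs = Vec::new();
    for batch in ticks {
        if !across_ticks {
            seen.clear();
        }
        // `insert` returns true only the first time a value is added.
        let emitted: Vec<usize> = batch
            .iter()
            .copied()
            .filter(|x| seen.insert(*x))
            .collect();
        outputs.push(emitted);
    }
    outputs
}
```

With the batches from the example above, clearing per tick lets 3 through again in the second tick, while keeping the set across ticks suppresses it.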
unzip
| Inputs | Syntax | Outputs | Flow |
|---|---|---|---|
| exactly 1 | -> unzip()[<output_port>] -> | exactly 2 | Streaming |
Output port names:
0,1
1 input stream of pair tuples (A, B), 2 output streams
Takes the input stream of pairs and unzips each one, delivering each item to its corresponding side.
#![allow(unused)] fn main() { #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); __rt.block_on(async { let mut __hf = hydroflow::hydroflow_syntax! { my_unzip = source_iter(vec![("Hello", "Foo"), ("World", "Bar")]) -> unzip(); my_unzip[0] -> for_each(|x| println!("0: {}", x)); // Hello World my_unzip[1] -> for_each(|x| println!("1: {}", x)); // Foo Bar }; for _ in 0..100 { hydroflow::tokio::task::yield_now().await; if !__hf.run_tick() { // No work done. break; } } }) }
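For comparison, the standard library's Iterator::unzip performs the same split on an ordinary iterator of pairs. The wrapper name unzip_sketch is hypothetical, and the sketch collects into two Vecs rather than two output streams.

```rust
/// Splits a sequence of pairs into its left and right components,
/// the same shape of result that ports 0 and 1 of `unzip()` receive.
fn unzip_sketch(
    pairs: Vec<(&'static str, &'static str)>,
) -> (Vec<&'static str>, Vec<&'static str>) {
    pairs.into_iter().unzip()
}
```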