Pipeline Syntax
How Types Flow
Every pipeline starts with PipelineFactory.Start<TStart>(). The TStart type is invariant – it never changes throughout composition. Each step receives the previous step’s output (TOutput) and produces a new output (TNext). See Conventions for the full type parameter naming guide.
PipelineFactory.Start<string>() // TStart=string, TOutput=string
.Pipe((ctx, arg) => int.Parse(arg)) // TStart=string, TOutput=int (Pipe transforms type)
.Call((ctx, arg) => Log(arg)) // TStart=string, TOutput=int (Call preserves type)
.Pipe((ctx, arg) => arg.ToString()) // TStart=string, TOutput=string (Pipe transforms type)
.Build(); // -> FunctionAsync<string, string>
Statements
| Method | Description | Type Effect |
|---|---|---|
| Call | Execute a void statement that does not transform the pipeline output. | Preserves TOutput |
| CallAsync | Asynchronously execute a void statement that does not transform the pipeline output. | Preserves TOutput |
| Pipe | Execute a statement that transforms the pipeline output. | TOutput -> TNext |
| PipeAsync | Asynchronously execute a statement that transforms the pipeline output. | TOutput -> TNext |
Flow Control
| Method | Description | Type Effect |
|---|---|---|
| Pipe | Pipes a child pipeline with optional middlewares. | TOutput -> TNext |
| PipeIf | Conditionally pipes a child pipeline with optional middlewares. | TOutput -> TNext |
| Call | Calls a child pipeline with optional middlewares. | Preserves TOutput |
| CallIf | Conditionally calls a child pipeline with optional middlewares. | Preserves TOutput |
| ForEach | Enumerates a collection pipeline input. | Preserves TOutput |
| Reduce | Transforms an enumerable pipeline input. | IEnumerable<TElement> -> TNext |
| WaitAll | Waits for concurrent pipelines to complete. | TOutput -> TNext (via reducer) |
Cancellation
| Method | Description |
|---|---|
| Cancel | Cancels the pipeline after the current step. |
| CancelWith | Cancels the pipeline with a value, after the current step. |
Reference
Statements
Call- Execute a statement that does not transform the pipeline output.CallAsync- Asynchronously execute a statement that does not transform the pipeline output.Pipe- Execute a statement that transforms the pipeline output.PipeAsync- Asynchronously execute a statement that transforms the pipeline output.
In this example notice that arg + 9 is not returned from the use of Call. The Call block executes its inner pipeline but discards its result – the outer pipeline’s TOutput is preserved.
var callResult = string.Empty;
var command = PipelineFactory
.Start<string>() // TStart=string, TOutput=string
.Pipe( ( ctx, arg ) => arg + "1" ) // TOutput=string (string->string)
.Pipe( ( ctx, arg ) => arg + "2" ) // TOutput=string
.Call( builder => builder // Call: TOutput stays string
.Call( ( ctx, arg ) => callResult = arg + "3" ) // inner side-effect
.Pipe( ( ctx, arg ) => arg + "9" ) // inner result discarded
)
.Pipe( ( ctx, arg ) => arg + "4" ) // TOutput=string (continues from "12")
.Build();
var result = await command( new PipelineContext() );
Assert.AreEqual( "124", result );
Assert.AreEqual( "123", callResult );
If Conditions
PipeIf and PipeIfAsync allow you to conditionally add a step to the pipeline. You can specify a condition function that determines whether the step should be added, a builder function that creates the step, and an optional flag indicating whether middleware should be inherited.### CallIf
CallIf and CallIfAsync allow you to conditionally call a child pipeline with optional middlewares. You can specify a condition function that determines whether the child pipeline should be called, a builder function that creates the child pipeline, and an optional flag indicating whether middleware should be inherited.### ForEach
// Takes a string and returns a number
var question = PipelineFactory
.Start<string>()
.PipeIf((ctx, arg) => arg == "Adams", builder => builder
.Pipe((ctx, arg) => 42)
.Cancel()
)
.Pipe((ctx, arg) => 0)
.Build();
var answer1 = await question(new PipelineContext(), "Adams");
Assert.AreEqual(42, answer1);
var answer2 = await question(new PipelineContext(), "Smith");
Assert.AreEqual(0, answer2);
ForEach
ForEach and ForEachAsync allow you to enumerate a collection pipeline input and apply a pipeline to each element. The ForEach preserves the pipeline’s TOutput – the inner pipeline processes each TElement for its side effects.
var count = 0;
var command = PipelineFactory
.Start<string>() // TStart=string, TOutput=string
.Pipe( ( ctx, arg ) => arg.Split( ' ' ) ) // TOutput=string[] (string->string[])
.ForEach().Type<string>( builder => builder // TElement=string, TOutput stays string[]
.Pipe( ( ctx, arg ) => count += 10 ) // inner pipeline processes each element
)
.Pipe( ( ctx, arg ) => count += 5 ) // TOutput=int (string[]->int via assignment)
.Build();
await command( new PipelineContext(), "e f" );
Assert.AreEqual( count, 25 );
Reduce
Reduce and ReduceAsync allow you to transform an enumerable pipeline input to a single value. You can specify a reducer function that defines how the elements should be combined, and a builder function that creates the pipeline for processing the elements. The .Type<TElement, TNext>() call specifies the element type and the reduced output type.
var command = PipelineFactory
.Start<string>() // TStart=string, TOutput=string
.Pipe( ( ctx, arg ) => arg.Split( ' ' ) ) // TOutput=string[]
.Reduce().Type<string, int>( ( aggregate, value ) => aggregate + value, builder => builder // TElement=string, TNext=int
.Pipe( ( ctx, arg ) => int.Parse( arg ) + 10 ) // each element: string->int
)
.Pipe( ( ctx, arg ) => arg + 5 ) // TOutput=int (int->int)
.Build();
var result = await command( new PipelineContext(), "1 2 3 4 5" );
Assert.AreEqual( result, 70 );
WaitAll
WaitAll allows you to wait for concurrent pipelines to complete before continuing. You can specify a set of builders that create the pipelines to wait for, and a reducer function that combines the results of the pipelines.
var count = 0;
var command = PipelineFactory
.Start<int>()
.WaitAll( builders => builders.Create(
builder => builder.Pipe( ( ctx, arg ) => Interlocked.Increment( ref count ) ),
builder => builder.Pipe( ( ctx, arg ) => Interlocked.Increment( ref count ) )
),
reducer: ( ctx, arg, results ) => { return arg + results.Sum( x => (int) x.Result ); }
)
.Build();
var result = await command( new PipelineContext() );
Assert.AreEqual( 2, count );
Assert.AreEqual( 3, result );
Cancellation
Cancel method allows you to cancel the pipeline after the current step.
CancelWith method allows you to cancel the pipeline with a value after the current step.