JAF  PL

Pipeline APIs

Pipelines are constructed using a straightfoward JVM API. The examples below are in Scala, but I believe they’ll work from Java as well. And probably any other JVM language.

All of the examples that follow are available in the com.jafpldemo.examples package at https://github.com/ndw/jafpl-demo/.

The simplest possible pipeline is one that does nothing, NOP pipeline.

Example 1. NOP pipeline
  val graph  = new Graph()                               

  val pipeline = graph.addPipeline("pipeline")           
  val nop      = pipeline.addAtomic(new NOP(), "nop")    

  val config = new PrimitiveRuntimeConfiguration()       
  val runtime = new GraphRuntime(graph, config)          

  runtime.run()                                          

Let’s look at this in more detail.

 

A Graph is the representation of a JAFPL pipeline.

 

A pipeline is the top-level container in a graph.

 

Atomic steps have two parts: an implementation and an optional label. The label is used in graph output, if you request it, and in error messages. It’s a good idea to provide unique labels for each of your steps.

The NOP step is an atomic step that does nothing. It has no inputs, it has no outputs. Step implementations are independent of the JAFPL pipeline library. See the steps API.

 

Actually running a pipeline requires a runtime configuration. That’s where things like an expression language and runtime trace events are implemented. The PrimitiveRuntimeConfiguration, as it’s name implies, is a primitive stub implementation. It does the bare minimum required.

 

A GraphRuntime can actually evaluate the pipeline.

 

And run runs it, unsurprisingly.

This pipeline does nothing, so it’s not very interesting. Pipelines are only interesting when data flows through them. In order to have data flow, we either need to connect our pipeline to some external data source, or we have to introduce a step that produces some data. Introducing a step that produces data is simpler, in the short term, than plumbing an external data source, so let’s do that.

The Producer step is constructed with a list of items. When it runs, it produces those items on it’s “result” port. (Port names are arbitrary, but as a matter of practicality, the pipeline and the step implementations have to agree on the names.)

Producer pipeline is a pipeline that uses the producer step.

Example 2. Producer pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline("pipeline")
  val producer = pipeline.addAtomic(new Producer(List("Hello, world")), "producer")

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)

  runtime.run()

Curiously, running this pipeline doesn’t do anything either. Well. It does, but because the output of the producer step isn’t connected to anything, the pipeline just drops the output on the floor.

Data flow involves both producers and consumers. Let’s introduce a consumer step. The Consumer will read an arbitrary number of items on it’s input “source” port. When it runs, it will print them on the console. It produces no output. The Producer/consumer pipeline pipeline uses both the producer and the consumer.

Example 3. Producer/consumer pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline("pipeline")
  val producer = pipeline.addAtomic(new Producer(List("Hello, world")), "producer")
  val consumer = pipeline.addAtomic(new Consumer(), "consumer")

  graph.addEdge(producer, "result", consumer, "source")  

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)

  runtime.run()

Our pipeline now has two steps, but that all by itself wouldn’t be enough. We also need to connect them together to describe the data flow. (The library provides no implicit connections between steps.) The addEdge call at establishes a directed edge from the “result” port of producer to the “source” port of consumer.

The graph library doesn’t care about port names, but step implementations do. In order for this pipeline to work as described, the producer step must be written to send output to a port named “result” and the consumer step must be written to read input from a port named “source”.

If you run this pipeline, you’ll finally get some output: the canonical “Hello, world” message.

The Identity step simply copies its input to its output, so it’s always safe to add them to a flow. Let’s make our pipeline a few steps longer.

Example 4. Longer producer/consumer pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline("pipeline")
  val producer = pipeline.addAtomic(new Producer(List("Hello, world")), "producer")
  val a        = pipeline.addAtomic(new Identity(), "a") 
  val b        = pipeline.addAtomic(new Identity(), "b") 
  val consumer = pipeline.addAtomic(new Consumer(), "consumer")

  graph.addEdge(producer, "result", b,        "source") 
  graph.addEdge(b,        "result", a,        "source") 
  graph.addEdge(a,        "result", consumer, "source")

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)
  runtime.run()

The important point in this example is that although I added “a” then “b” to the pipeline; the edges connect the other way around. The order in which siblings steps run is determined entirely and exclusively by the ways in which they are connected.

In this pipeline, “producer” runs first, then “b” , then “a” , then “consumer”.

Pipelines that do nothing more than copy their inputs around aren’t very interesting. Let’s do something transformative this time. The Uppercase step takes a single string input on its “source” port and returns only the words in the string, shifted to upper case, on its “result” port. (The semantics are a little odd, but we’ll come back to that.)

Example 5. Uppercase pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline("pipeline")
  val producer = pipeline.addAtomic(new Producer(List("Hello, world")), "producer")
  val upper    = pipeline.addAtomic(new Uppercase(), "uppercase")
  val consumer = pipeline.addAtomic(new Consumer(), "consumer")

  graph.addEdge(producer, "result", upper, "source")
  graph.addEdge(upper, "result", consumer, "source")

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)

  runtime.run()

If you run this pipeline, it produces “HELLO WORLD”.

What happens if you change the producer so that it actually produces a sequence? (Feel free to go try it out!)

The answer is: an error. The uppercase step does not accept a sequence, so the pipeline will generate an error as soon as the second string arrives on its source input.

The way to work around that is with a loop.

Example 6. Uppercase loop pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline("pipeline")
  val producer = pipeline.addAtomic(new Producer(List("Hello", "There", "World")), "producer")

  val loop     = pipeline.addForEach("loop")                      
  val upper    = loop.addAtomic(new Uppercase(), "uppercase")     

  val consumer = pipeline.addAtomic(new Consumer(), "consumer")

  graph.addEdge(producer, "result", loop, "source") 
  graph.addEdge(loop, "current", upper, "source")   
  graph.addEdge(upper, "result", loop, "result")    
  graph.addEdge(loop, "result", consumer, "source") 

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)

  runtime.run()
 

A “for each” loop container is created in the pipeline.

 

The upper step is added to the loop container, not directly to the pipeline.

 

The producer output is connected to the loop. A for-each loop has a single input port, source.

 

From inside the loop, reading from the loop’s current port reads the current input string in the loop. This is connected to the source for the upper step.

 

The upper output is connected to the loop result port.

 

The loop result is connected to the consumer.

If you run this pipeline, it produces “HELLO”, “THERE”, and “WORLD”.

As a final example, let’s look at how we could use the upper step to capitalize the words of a sentence without losing the punctuation. JAFPL includes a “viewport” step for this purpose. Viewports expose sub-parts of their input to a pipeline and then stitch the results of processing those sub-parts back into the original.

Example 7. Viewport pipeline
  val graph  = new Graph()

  val pipeline = graph.addPipeline()

  val producer = pipeline.addAtomic(new Producer(List("Hello there, world.")), "producer")
  val viewport = pipeline.addViewport(new StringComposer(), "viewport") 
  val upper    = viewport.addAtomic(new Uppercase(), "uppercase")
  val consumer = pipeline.addAtomic(new Consumer(), "consumer")

  graph.addEdge(producer, "result", viewport, "source")
  graph.addEdge(viewport, "source", upper, "source")
  graph.addEdge(upper, "result", viewport, "result")
  graph.addEdge(viewport, "result", pipeline, "result")
  graph.addEdge(pipeline, "result", consumer, "source")

  val config = new PrimitiveRuntimeConfiguration()
  val runtime = new GraphRuntime(graph, config)

  runtime.run()

The magic here is in the implementation of the StringComposer . The string composer extracts the words “Hello”, “there”, and “world” from its input string. Each of those words is processed by the subpipeline in the viewport (i.e, upper). Then the results are recomposed.

If you run this pipeline, it produces “HELLO THERE, WORLD.”.

The decomposition/recomposition process is independent of the pipeline engine. You can use it to process parts of strings, JSON keys, arrays, XML documents, columns in CSV files, whatever you can implement.