16 Stream processors

We have not used stream processors extensively in the examples presented so far, but plain stream processors are interesting for at least these reasons:

Viewed in a more general context, the stream processor can be seen as a simple but practical incarnation of the process concept, and has connections with process algebras such as CCS [Mil80]. An advantage with stream processors is that they admit a simple implementation within a purely functional language. We can define a set of combinators for building networks of stream processors, and the stream processors are first class values, which can be passed around as messages.

We use the following informal definitions:

These definitions allow stream processors to have many input and output streams, but in the following we will only consider stream processors with a single input stream and a single output stream (see Figure 26).

Figure 26. A general stream processor and a stream processor with a single input stream and a single output stream.

The restriction may seem severe, but the chosen set of combinators allows streams to be merged and split, so a stream processor with many input/output streams can be represented as one with a single input stream and a single output stream. The advantage is that we can take a combinator-based approach to building networks of communicating stream processors. The combinators are discussed further in Chapter 17. Below we discuss how to write atomic stream processors, that is, stream processors that do not consist of several concurrently running stream processors. Their behaviour is defined by a linear sequence of I/O actions.

16.1 The stream-processor type

The Fudget library provides an abstract type for stream processors,

data SP input output
where input and output are the types of the elements in the input and output streams, respectively (Figure 27). (The implementation of stream processors in a lazy functional language are discussed in Chapter 20.)

Figure 27. A stream processor of type SP i o.

The library also provides the function

runSP :: SP i o -> [i] -> [o]
which can be used on the top level of a program built with stream processors (see Chapter 19). The function absF discussed in Chapter 12 can be used to combine stream processors with fudgets.

16.2 Atomic stream processors in continuation style

The behaviour of an atomic stream processor is described by a sequential program. There are three basic actions a stream processor can take:The Fudget library provides the following continuation style operations for these actions:

putSP :: output -> SP input output -> SP input output
getSP :: (input -> SP input output) -> SP input output
nullSP :: SP input output
As an example of how to use these in recursive definitions of stream processors, consider the identity stream processor

-- The identity stream processor
idSP :: SP a a
idSP = getSP $ \ x -> putSP x idSP
the busy stream processor

-- A stream processor that is forever busy computing.
busySP :: SP a b
busySP = busySP
and the following stream-processor equivalents of the well known list functions:

mapSP :: (a -> b) -> SP a b
mapSP f = getSP $ \ x -> putSP (f x) $ mapSP f

filterSP :: (a -> Bool) -> SP a a
filterSP p = getSP $ \ x -> if p x
                            then putSP x $ filterSP p
                            else filterSP p
The stream processor nullSP need actually not be considered as a primitive. It can be defined as

nullSP = getSP $ \ x -> nullSP
i.e., it is a stream processor that ignores all input and never produces any output. A practical advantage with an explicitly represented nullSP is that it allows stream processors that terminate to be ``garbage collected''.
Implement concatMapSP :: (i->[o]) -> SP i o.
First we define putListSP that outputs the elements of a list, one at a time:

putListSP :: [o] -> SP i o -> SP i o
putListSP [] = id
putListSP (x:xs) = putSP x . putListSP xs
And concatMapSP itself:

concatMapSP f =
    getSP $ \ x ->
    putListSP (f x) $
    concatMapSP f
Implement mapFilterSP :: (i->Maybe o) -> SP i o.
mapFilterSP f =
        getSP $ \ x ->
        case f x of
          Nothing  -> mapFilterSP f
          Just y   -> putSP y $
                      mapFilterSP f

16.3 Stream processors with encapsulated state

A stream processor can maintain an internal state. In practice, this can be accomplished by using an accumulating argument in a recursively defined stream processor. As a concrete example, consider sumSP, a stream processor that computes the accumulated sum of its input stream:

sumSP :: Int -> SP Int Int
sumSP acc = getSP $ \ n -> putSP (acc+n) $ sumSP (acc+n)
In this case, the internal state is a value of the type Int, which also happens to be the type of the input and output streams. In general, the type of the input and output streams can be different from the type of the internal state, which can then be completely hidden.

The Fudget library provides two general functions for construction of stream processors with internal state:

mapAccumlSP        :: (s -> i -> (s, o)) -> s -> SP i o
concatMapAccumlSP  :: (s -> i -> (s, [o])) -> s -> SP i o
(concatMapAccumlSP is also known as mapstateSP.) The first argument to these functions is a state transition function which given the current state and an input message should produce a new state and an output message (zero or more outputs in the case of concatMapAccumlSP). Using mapAccumlSP we can define sumSP without using explicit recursion:

sumSP :: Int -> SP Int Int
sumSP = mapAccumlSP (\ acc n -> (acc+n,acc+n))
Representing state information as one or more accumulating arguments is useful when the behaviour of the stream processor is uniform with respect to the state. If a stream processor reacts differently to input depending on its current state, it can be more convenient to use a set of mutually recursive stream processors where each stream processor corresponds to a state in a finite state automaton. As a simple example, consider a stream processor that outputs every other element in its input stream:

passOnSP = getSP $ \ x -> putSP x $ skipSP
skipSP = getSP $ \ x -> passOnSP
It has two states: the ``pass on'' state, where the next input is passed on to the output; and the ``skip'' state, where the next input is skipped.

The two ways of representing state illustrated above, can of course be combined.

Implement mapAccumlSP and concatMapAccumlSP using putSP and getSP.
concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o
concatMapAccumlSP f s0 =
    getSP $ \x ->
    let (s, ys) = f s0 x
    in putListSP ys $
       concatMapAccumlSP f s

mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o
mapAccumlSP f s0 =
    getSP $ \x ->
    let (s, y) = f s0 x
    in putSP y $
       mapAccumlSP f s

16.4 Sequential composition of stream processors

Unlike CCS style process algebras [Mil80]--where nontrivial sequential behaviours can be constructed only by prefixing an existing behaviour with an I/O operation--the stream processors can be combined sequentially:

seqSP :: SP a b -> SP a b -> SP a b
The stream processor sp1 `seqSP` sp2 behaves like sp1 until sp1 becomes nullSP, and then behaves like sp2. However, the same can also be achieved by making all procedures end with a call to a continuation stream processor instead of nullSP; so seqSP does not add any new power.

We should also note that if this is to work properly, the operation nullSP must be explicitly represented, and not just defined as a stream processor that ignores all input and never produces any output; contrary to what was suggested in Section 16.2.

16.5 Stream-processor monads

The presentation thus far suggests that atomic stream processors should be programmed in continuation style. This is often natural, but for complex stream processors it can be beneficial to use a monadic style instead [Wad92,Wad95]. The two styles are compatible. The operations of the stream processor monad are shown in Figure 28.
-- The type:
type SPm input output answer

-- Standard monad operations:
unitSPm :: a -> SPm i o a
bindSPm :: SPm i o a -> (a -> SPm i o b) -> SPm i o b

-- Monadic versions of nullSP, putSP and getSP:
nullSPm :: SPm i o ()
putSPm  :: o -> SPm i o ()
getSPm  :: SPm i o i

-- A glue function:
runSPm  :: SPm i o () -> SP i o

Figure 28. The stream-processor monad.

Thanks to runSPm you can use the combinators for ``plain'' stream processors to construct networks of stream-processor monads.

For writing complex stream processors, it is of course possible to combine the stream-processor monad with other monads, e.g., a state monad. The Fudget library defines the type SPms for stream processor-monads with state. A closer presentation and an example of its use can be found as part of Chapter 31.