We have not used stream processors extensively in the examples presented so far, but plain stream processors are interesting for at least these reasons:
We use the following informal definitions:
Figure 26. A general stream processor and a stream processor with a single input stream and a single output stream.
The restriction may seem severe, but the chosen set of combinators allows streams to be merged and split, so a stream processor with many input/output streams can be represented as one with a single input stream and a single output stream. The advantage is that we can take a combinator-based approach to building networks of communicating stream processors. The combinators are discussed further in Chapter 17. Below we discuss how to write atomic stream processors, that is, stream processors that do not consist of several concurrently running stream processors. Their behaviour is defined by a linear sequence of I/O actions.
wheredata SP input output
outputare the types of the elements in the input and output streams, respectively (Figure 27). (The implementation of stream processors in a lazy functional language are discussed in Chapter 20.)
Figure 27. A stream processor of type
The library also provides the function
which can be used on the top level of a program built with stream processors (see Chapter 19). The functionrunSP :: SP i o -> [i] -> [o]
absFdiscussed in Chapter 12 can be used to combine stream processors with fudgets.
As an example of how to use these in recursive definitions of stream processors, consider the identity stream processorputSP :: output -> SP input output -> SP input output getSP :: (input -> SP input output) -> SP input output nullSP :: SP input output
the busy stream processor-- The identity stream processor idSP :: SP a a idSP = getSP $ \ x -> putSP x idSP
and the following stream-processor equivalents of the well known list functions:-- A stream processor that is forever busy computing. busySP :: SP a b busySP = busySP
The stream processormapSP :: (a -> b) -> SP a b mapSP f = getSP $ \ x -> putSP (f x) $ mapSP f filterSP :: (a -> Bool) -> SP a a filterSP p = getSP $ \ x -> if p x then putSP x $ filterSP p else filterSP p
nullSPneed actually not be considered as a primitive. It can be defined as
i.e., it is a stream processor that ignores all input and never produces any output. A practical advantage with an explicitly representednullSP = getSP $ \ x -> nullSP
nullSPis that it allows stream processors that terminate to be ``garbage collected''.
concatMapSP :: (i->[o]) -> SP i o.
putListSPthat outputs the elements of a list, one at a time:
AndputListSP :: [o] -> SP i o -> SP i o putListSP  = id putListSP (x:xs) = putSP x . putListSP xs
concatMapSP f = getSP $ \ x -> putListSP (f x) $ concatMapSP f
mapFilterSP :: (i->Maybe o) -> SP i o.
mapFilterSP f = getSP $ \ x -> case f x of Nothing -> mapFilterSP f Just y -> putSP y $ mapFilterSP f
sumSP, a stream processor that computes the accumulated sum of its input stream:
In this case, the internal state is a value of the typesumSP :: Int -> SP Int Int sumSP acc = getSP $ \ n -> putSP (acc+n) $ sumSP (acc+n)
Int, which also happens to be the type of the input and output streams. In general, the type of the input and output streams can be different from the type of the internal state, which can then be completely hidden.
The Fudget library provides two general functions for construction of stream processors with internal state:
(mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o
concatMapAccumlSPis also known as
mapstateSP.) The first argument to these functions is a state transition function which given the current state and an input message should produce a new state and an output message (zero or more outputs in the case of
mapAccumlSPwe can define
sumSPwithout using explicit recursion:
Representing state information as one or more accumulating arguments is useful when the behaviour of the stream processor is uniform with respect to the state. If a stream processor reacts differently to input depending on its current state, it can be more convenient to use a set of mutually recursive stream processors where each stream processor corresponds to a state in a finite state automaton. As a simple example, consider a stream processor that outputs every other element in its input stream:sumSP :: Int -> SP Int Int sumSP = mapAccumlSP (\ acc n -> (acc+n,acc+n))
It has two states: the ``pass on'' state, where the next input is passed on to the output; and the ``skip'' state, where the next input is skipped.passOnSP = getSP $ \ x -> putSP x $ skipSP skipSP = getSP $ \ x -> passOnSP
The two ways of representing state illustrated above, can of course be combined.
concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o concatMapAccumlSP f s0 = getSP $ \x -> let (s, ys) = f s0 x in putListSP ys $ concatMapAccumlSP f s mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o mapAccumlSP f s0 = getSP $ \x -> let (s, y) = f s0 x in putSP y $ mapAccumlSP f s
The stream processorseqSP :: SP a b -> SP a b -> SP a b
sp1 `seqSP` sp2behaves like sp1 until sp1 becomes
nullSP, and then behaves like sp2. However, the same can also be achieved by making all procedures end with a call to a continuation stream processor instead of
seqSPdoes not add any new power.
We should also note that if this is to work properly, the
nullSP must be explicitly represented, and not
just defined as a stream processor that ignores all input and
never produces any output; contrary to what was suggested in
-- The type: type SPm input output answer -- Standard monad operations: unitSPm :: a -> SPm i o a bindSPm :: SPm i o a -> (a -> SPm i o b) -> SPm i o b -- Monadic versions of nullSP, putSP and getSP: nullSPm :: SPm i o () putSPm :: o -> SPm i o () getSPm :: SPm i o i -- A glue function: runSPm :: SPm i o () -> SP i o
Figure 28. The stream-processor monad.
runSPm you can use the combinators for
``plain'' stream processors to construct networks of
For writing complex stream processors, it is of course possible
to combine the stream-processor monad with other monads, e.g., a
state monad. The Fudget library defines the type
for stream processor-monads with state. A closer presentation
and an example of its use can be found as part of