So a stream with elements of type a can be represented as a list with elements of type a. A stream processor can be represented as a function from the input stream to the output stream:

    type Stream a = [a]
    type SP i o = Stream i -> Stream o

We call this the list-based representation. An obvious advantage of this approach is that the list type is a standard type, so all operations provided for lists can be reused when defining stream processors.
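As a small illustration of reusing list operations, the sketch below treats map and filter directly as stream processors. The names scale, evens, firstScaled and firstEvens are hypothetical and only serve the example.

```haskell
-- List-based representation, as defined in the text.
type Stream a = [a]
type SP i o = Stream i -> Stream o

-- Standard list functions are already stream processors.
-- (scale and evens are hypothetical names used only in this sketch.)
scale :: SP Int Int
scale = map (*100)

evens :: SP Int Int
evens = filter even

-- Laziness lets a stream processor run on an infinite input stream:
-- only as much input is consumed as the requested output needs.
firstScaled :: [Int]
firstScaled = take 3 (scale [1 ..])

firstEvens :: [Int]
firstEvens = take 3 (evens [1 ..])
```

Here firstScaled is [100,200,300] and firstEvens is [2,4,6].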
Another advantage of this representation is that it clearly shows the close relationship between functions and stream processors. For example, serial composition is simply function composition:

    (-==-) :: SP m o -> SP i m -> SP i o
    sp1 -==- sp2 = sp1 . sp2

The basic stream processor actions also have very simple definitions:

    nullSP = \ xs -> []
    x `putSP` sp = \ xs -> x : sp xs
    getSP isp = \ xs -> case xs of
                          [] -> []
                          x:xs' -> isp x xs'

A problem with this representation, however, is that parallel composition is impossible to implement. A reasonable definition would have to look something like this:

    sp1 -*- sp2 = \ xs -> merge (sp1 xs) (sp2 xs)
      where merge ys zs = ???

But what should ??? be replaced with, so that the first output from the composition is the first output to become available from one of the components? For example, suppose that

    sp1 _|_ = _|_
    sp2 _|_ = 1:_|_

that is, sp1 needs some input before it can produce some output, but sp2 can output 1 immediately. Then, the composition should immediately output 1:

    (sp1 -*- sp2) _|_ = 1:_|_

But (sp2 -*- sp1) _|_ should also be 1:_|_, so ??? must be an expression that chooses whichever of ys and zs happens to be non-bottom. This clearly cannot be done in an ordinary purely functional language.
As a more concrete example, consider what should happen if we apply the stream processor

    map (*100) -*- filter even

to [1, 2, 3, 4, ...]. If the input elements appear at a slower rate than they can be processed by either map or filter, the desired output stream would be something like [100, 200, 2, 300, 4, 400, ...], i.e., in this particular case there should be two elements from the left stream processor for every element from the right stream processor.
The elements in the two output streams should be merged in the order they become computable as more elements of the input stream become available. However, there is no way of telling in a sequential language which of the two stream processors will be the first one to be able to produce an output. It seems that the two streams need to be evaluated in parallel, and then elements must be chosen in the order they become available.
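To make the difficulty concrete, here is a sketch of what happens if ??? is replaced by one particular deterministic choice, strict alternation. The helpers interleave and alternating are hypothetical and are not definitions from the text.

```haskell
-- A strictly alternating merge (hypothetical helper): take one element
-- from the first stream, then swap the roles of the two streams.
interleave :: [a] -> [a] -> [a]
interleave (y:ys) zs = y : interleave zs ys
interleave []     zs = zs

-- A candidate "parallel composition" for the list-based representation
-- that fixes the merge order in advance.
alternating :: ([i] -> [o]) -> ([i] -> [o]) -> [i] -> [o]
alternating sp1 sp2 xs = interleave (sp1 xs) (sp2 xs)

sample :: [Int]
sample = take 6 (alternating (map (*100)) (filter even) [1 ..])
```

Here sample is [100,2,200,4,300,6] rather than the desired [100,200,2,300,4,400]. The merge order is fixed in advance, so 200 has to wait behind 2, and 300 is followed by 4, which cannot be computed until a further input element has arrived.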
The most natural and general solution to this problem is to use parallel evaluation, and we will take a look at this next. But by changing the representation of stream processors it is possible to obtain solutions that work in an ordinary sequential language. We will look at these solutions in Section 20.4.
The operator suggested above is a variant of
McCarthy's ambiguity operator amb [McC63]. But a
programming language with such an operator is not purely
functional, and thus makes ordinary equational reasoning
unsound. Although such a language may still be useful
[Mor94], there are solutions that allow you to make
indeterministic choices in a purely functional way.
In the following section, we will introduce a variant of
amb which is purely functional.
We call our operator for indeterministic choice choose:

    choose :: Oracle -> a -> b -> Bool

Operationally, the expression choose o a b is evaluated by starting the evaluation of a and b in parallel and then returning True if a reaches head normal form first, and False if b does. Denotationally, choose o a b returns True or False depending only on the value of the oracle o (which magically happens to have the "right" value). An oracle should only be used once, since it must always give the same answer. We therefore distribute an infinite tree of oracles to all stream processors, as an additional argument:

    data OracleTree = OracleNode OracleTree Oracle OracleTree
    type SP i o = OracleTree -> Stream i -> Stream o

Using the oracle tree, we can now easily implement parallel composition of stream processors (see also Figure 42):

    sp1 -*- sp2 = \(OracleNode (OracleNode ot _ ot1) _ ot2) xs ->
        merge ot (sp1 ot1 xs) (sp2 ot2 xs)
      where
        merge (OracleNode ot o _) ys zs =
          if choose o ys zs
            then merge' ot ys zs
            else merge' ot zs ys
        merge' ot (y:ys) zs = y:merge ot ys zs
        merge' ot [] zs = zs

In this implementation, the oracle tree is split into three: two subtrees are fed to the composed stream processors, and one is given to the function merge, together with the output streams from the composed stream processors. The function merge extracts fresh oracles from the tree and uses choose to see which stream is first to reach head normal form. It then calls merge', with the second argument being the stream which has been evaluated.
Figure 42. Parallel composition of stream processors using oracles.
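The denotational reading of choose can be explored in an ordinary sequential Haskell system by supplying the oracles by hand. The sketch below does this under a loudly stated assumption: Oracle is modelled as a plain Bool fixed in advance, so nothing is evaluated in parallel and the answers do not reflect which stream is really available first. The helpers constTree and liftSP are hypothetical.

```haskell
type Stream a = [a]

-- ASSUMPTION: a real oracle would be provided by a parallel run-time
-- system. Here it is just a Bool chosen in advance.
newtype Oracle = Oracle Bool

data OracleTree = OracleNode OracleTree Oracle OracleTree

type SP i o = OracleTree -> Stream i -> Stream o

-- Denotational reading only: the answer depends on the oracle alone.
choose :: Oracle -> a -> b -> Bool
choose (Oracle b) _ _ = b

-- Parallel composition, following the definition in the text.
(-*-) :: SP i o -> SP i o -> SP i o
sp1 -*- sp2 = \(OracleNode (OracleNode ot _ ot1) _ ot2) xs ->
    merge ot (sp1 ot1 xs) (sp2 ot2 xs)
  where
    merge (OracleNode ot o _) ys zs =
      if choose o ys zs then merge' ot ys zs else merge' ot zs ys
    merge' ot (y:ys) zs = y : merge ot ys zs
    merge' _  []     zs = zs

-- Hypothetical helper: an infinite tree in which every oracle says b.
constTree :: Bool -> OracleTree
constTree b = t where t = OracleNode t (Oracle b) t

-- Hypothetical helper: a list function that ignores its oracles.
liftSP :: (Stream i -> Stream o) -> SP i o
liftSP f _ xs = f xs

leftFirst :: [Int]
leftFirst =
  (liftSP (map (*100)) -*- liftSP (filter even)) (constTree True) [1, 2, 3, 4]
```

With every oracle answering True, the left stream is always preferred and leftFirst is [100,200,300,400,2,4]. A different oracle tree gives a different interleaving of the same elements, which is exactly the freedom the oracles are meant to capture.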
If we impose the restriction that sp1 and sp2 must produce output at the same rate, then sp1 -*- sp2 can be defined as:

    (sp1 -*- sp2) xs = merge (sp1 xs) (sp2 xs)
      where merge (y:ys) (z:zs) = y:z:merge ys zs

However, it is awkward to impose such a constraint between the output streams of two different stream processors. Also, this solution does not work well for tagged parallel composition. A more useful constraint relates the input and output streams of a single stream processor: exactly one element is put in the output stream for every element consumed from the input stream (a 1-1 restriction). map is an example that satisfies this constraint, whereas filter is a function that does not.
With this restriction, tagged parallel composition can easily be implemented: the next element in the output stream should be taken from the stream processor that last received an element from the input stream. The following implementation of tagged parallel composition uses this fact by merging the output streams using a stream of synthetic oracles computed from the input stream (see also Figure 43):
    (-+-) :: SP a1 b1 -> SP a2 b2 -> SP (Either a1 a2) (Either b1 b2)
    (sp1 -+- sp2) xs = merge os (sp1 xs1) (sp2 xs2)
      where
        xs1 = [ x | Left x <- xs ]
        xs2 = [ y | Right y <- xs ]
        -- os : a synthetic oracle stream
        os = map isLeft xs
        merge (True:os) (y:ys) zs = Left y:merge os ys zs
        merge (False:os) ys (z:zs) = Right z:merge os ys zs
        isLeft (Left _) = True
        isLeft (Right _) = False
Figure 43. Parallel composition of stream processors using synthetic oracles.
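The definition above can be tried out on a small finite input. The sketch below repeats it with one addition that is our assumption rather than part of the original: a final catch-all clause in merge, so that a finite input stream ends the output instead of causing a pattern-match failure. The name demo is hypothetical.

```haskell
type SP i o = [i] -> [o]

(-+-) :: SP a1 b1 -> SP a2 b2 -> SP (Either a1 a2) (Either b1 b2)
(sp1 -+- sp2) xs = merge os (sp1 xs1) (sp2 xs2)
  where
    xs1 = [x | Left x <- xs]
    xs2 = [y | Right y <- xs]
    -- os: a synthetic oracle stream, True when the input went to the left
    os = map isLeft xs
    merge (True  : os') (y:ys) zs     = Left y  : merge os' ys zs
    merge (False : os') ys     (z:zs) = Right z : merge os' ys zs
    merge _             _      _      = []  -- ASSUMPTION: added for finite input
    isLeft (Left _)  = True
    isLeft (Right _) = False

-- Both components obey the 1-1 restriction (they are maps).
demo :: [Either Int String]
demo = (map (*100) -+- map show) [Left 1, Right (2 :: Int), Left 3]
```

Here demo is [Left 100, Right "2", Left 300]: each output is taken from the component that received the corresponding input.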
This solution has some practical problems, however. As it stands above, there is a potentially serious space-leak problem. Consider the evaluation of an expression like

    (sp1 -+- sp2) [Left n | n<-[1..]]

Here, sp2 will never receive any input. This means that merge will never need to evaluate the argument (sp2 xs2), which holds a reference to the beginning of the input stream via xs2. This would cause all input consumed by the composition to be retained in the heap. However, provided that pattern bindings are implemented properly [Spa93], this problem can be solved by computing xs1 and xs2 with a single recursive definition that returns a pair of lists:

    split :: [Either a b] -> ([a],[b])
    split [] = ([],[])
    split (x:xs) = case x of
                     Left x1 -> (x1:xs1,xs2)
                     Right x2 -> (xs1,x2:xs2)
      where (xs1,xs2) = split xs

Another problem is that the 1-1 restriction is rather severe. What should a stream processor do if it does not want to put a value in the output stream after consuming an input (like filter)? What if it wants to output more than one value? Obviously, if an implementation with this restriction is given to programmers, they will invent various ways to get around it. It is better to provide a solution from the beginning.
One way to relieve the restriction is to change the representation of stream processors to

    type SP a b = [a] -> [[b]]

thus allowing a stream processor to output a list of values to put in the output stream for every element in the input stream. Unfortunately, with this representation the standard list functions, like map and filter, can no longer be used in such a direct way. For example, instead of map f one must use map (\x->[f x]). Serial composition is no longer just function composition; rather, it is something more complicated and less efficient. Also, it is still possible to write stream processors that do not obey the 1-1 restriction, leading to errors that cannot be detected by a compiler. Consequently, it is not a good idea to reveal this representation to the application programmer, but rather to provide the stream-processor type as an abstract type. And while we are using an abstract type, we might as well use a better representation.
    data SP i o = NullSP | PutSP o (SP i o) | GetSP (i -> SP i o)

We call this the continuation-based representation of stream processors. The type has one constructor for each operation a stream processor can perform. The constructors have arguments that are part of the operations (the value to output in PutSP), and arguments that determine how the stream processor continues after the operation has been performed.
The continuation-based representation avoids the problem with
parallel composition that we ran into when using the
list-based representation, since it makes the consumption of
the input stream observable. With list functions, a stream
processor is applied to the entire input stream once and for
all. The rate at which elements are consumed in this list is
not observable from the outside. With the continuation-based
representation, a stream processor must evaluate to
GetSP ... each time it wants to read a value from the input stream. This
is what we need to be able to merge the output stream in the
right order in the definition of parallel composition.
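The following sketch illustrates what observable consumption means in practice. It defines two simple stream processors in the continuation-based representation and a function that reports the next operation a stream processor wants to perform. The definitions of mapSP and filterSP are written for this example, and Action, nextAction and feed are hypothetical helpers.

```haskell
data SP i o = NullSP | PutSP o (SP i o) | GetSP (i -> SP i o)

-- Illustrative definitions: each reads one input with GetSP and then
-- either outputs with PutSP or goes straight back to reading.
mapSP :: (i -> o) -> SP i o
mapSP f = GetSP (\x -> PutSP (f x) (mapSP f))

filterSP :: (a -> Bool) -> SP a a
filterSP p = GetSP (\x -> if p x then PutSP x (filterSP p) else filterSP p)

-- Hypothetical helper type: what a stream processor wants to do next.
data Action = Stops | Outputs | WantsInput deriving (Eq, Show)

nextAction :: SP i o -> Action
nextAction NullSP      = Stops
nextAction (PutSP _ _) = Outputs
nextAction (GetSP _)   = WantsInput

-- Hypothetical helper: hand one input element to a waiting processor.
feed :: i -> SP i o -> SP i o
feed x (GetSP k) = k x
feed _ sp        = sp

afterOdd, afterEven :: Action
afterOdd  = nextAction (feed (1 :: Int) (filterSP even))
afterEven = nextAction (feed (2 :: Int) (filterSP even))
```

After receiving 1, filterSP even asks for more input (afterOdd is WantsInput), whereas after receiving 2 it has an output ready (afterEven is Outputs). A combinator can inspect this from the outside, which is not possible with a function from lists to lists.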
An implementation of broadcasting parallel composition is shown in Figure 44. The implementation of tagged parallel composition is analogous. Note that we arbitrarily choose to inspect the left argument sp1 first. This means that even if sp2 could compute and output a value much faster than sp1, it will not get the chance to do so.
    NullSP -*- sp2 = sp2
    sp1 -*- NullSP = sp1
    PutSP o sp1' -*- sp2 = PutSP o (sp1' -*- sp2)
    sp1 -*- PutSP o sp2' = PutSP o (sp1 -*- sp2')
    GetSP xsp1 -*- GetSP xsp2 = GetSP (\i -> xsp1 i -*- xsp2 i)
Figure 44. Implementation of parallel composition with the continuation-based representation.
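To see the effect of inspecting the left argument first, the definition in Figure 44 can be run on the earlier example. In the sketch below, mapSP and filterSP are illustrative definitions written for this example and run is a hypothetical helper that feeds a finite list to a stream processor.

```haskell
data SP i o = NullSP | PutSP o (SP i o) | GetSP (i -> SP i o)

-- Broadcasting parallel composition, as in Figure 44.
(-*-) :: SP i o -> SP i o -> SP i o
NullSP       -*- sp2          = sp2
sp1          -*- NullSP       = sp1
PutSP o sp1' -*- sp2          = PutSP o (sp1' -*- sp2)
sp1          -*- PutSP o sp2' = PutSP o (sp1 -*- sp2')
GetSP xsp1   -*- GetSP xsp2   = GetSP (\i -> xsp1 i -*- xsp2 i)

mapSP :: (i -> o) -> SP i o
mapSP f = GetSP (\x -> PutSP (f x) (mapSP f))

filterSP :: (a -> Bool) -> SP a a
filterSP p = GetSP (\x -> if p x then PutSP x (filterSP p) else filterSP p)

-- Hypothetical helper: run a stream processor on a finite input list.
run :: SP i o -> [i] -> [o]
run NullSP       _      = []
run (PutSP y sp) xs     = y : run sp xs
run (GetSP k)    (x:xs) = run (k x) xs
run (GetSP _)    []     = []

merged :: [Int]
merged = run (mapSP (*100) -*- filterSP even) [1, 2, 3, 4]
```

Here merged is [100,200,2,300,400,4]. After each input element, the output of the left component is emitted before that of the right component, so 400 precedes 4.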
With the continuation-based representation, serial composition can be implemented as shown in Figure 45.
    NullSP -==- sp2 = NullSP
    PutSP o sp1' -==- sp2 = PutSP o (sp1' -==- sp2)
    GetSP xsp1 -==- NullSP = NullSP
    GetSP xsp1 -==- PutSP m sp2' = xsp1 m -==- sp2'
    GetSP xsp1 -==- GetSP xsp2 = GetSP (\i -> GetSP xsp1 -==- xsp2 i)
Figure 45. Implementation of serial composition with the continuation-based representation.
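A short usage sketch for the definition in Figure 45 follows. As before, mapSP and filterSP are illustrative definitions and run is a hypothetical helper for feeding a finite list to a stream processor.

```haskell
data SP i o = NullSP | PutSP o (SP i o) | GetSP (i -> SP i o)

-- Serial composition, as in Figure 45: the right operand sees the input
-- first, and its output becomes the input of the left operand.
(-==-) :: SP m o -> SP i m -> SP i o
NullSP       -==- _            = NullSP
PutSP o sp1' -==- sp2          = PutSP o (sp1' -==- sp2)
GetSP _      -==- NullSP       = NullSP
GetSP xsp1   -==- PutSP m sp2' = xsp1 m -==- sp2'
GetSP xsp1   -==- GetSP xsp2   = GetSP (\i -> GetSP xsp1 -==- xsp2 i)

mapSP :: (i -> o) -> SP i o
mapSP f = GetSP (\x -> PutSP (f x) (mapSP f))

filterSP :: (a -> Bool) -> SP a a
filterSP p = GetSP (\x -> if p x then PutSP x (filterSP p) else filterSP p)

-- Hypothetical helper: run a stream processor on a finite input list.
run :: SP i o -> [i] -> [o]
run NullSP       _      = []
run (PutSP y sp) xs     = y : run sp xs
run (GetSP k)    (x:xs) = run (k x) xs
run (GetSP _)    []     = []

scaledEvens :: [Int]
scaledEvens = run (mapSP (*100) -==- filterSP even) [1, 2, 3, 4]
```

Here scaledEvens is [200,400], the same result as (map (*100) . filter even) [1,2,3,4] in the list-based representation.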
A definition of the loop combinator loopSP is shown in Figure 46.
    loopSP sp = loopSP' empty sp
      where
        loopSP' q NullSP = NullSP
        loopSP' q (PutSP o sp') = PutSP o (loopSP' (enter q o) sp')
        loopSP' q (GetSP xsp) =
          case qremove q of
            Just (i,q') -> loopSP' q' (xsp i)
            Nothing -> GetSP (loopSP . xsp)

    -- Fifo queues
    data Queue a
    empty :: Queue a
    enter :: Queue a -> a -> Queue a
    qremove :: Queue a -> Maybe (a,Queue a)
Figure 46. Implementation of loopSP with the continuation-based representation.
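Figure 46 leaves the queue type abstract. The sketch below fills it in with a simple two-list queue, which is our assumption and not necessarily the implementation used in the library, and runs loopSP on a small example. The stream processor countdown and the helper run are hypothetical.

```haskell
data SP i o = NullSP | PutSP o (SP i o) | GetSP (i -> SP i o)

-- ASSUMPTION: a two-list FIFO queue standing in for the abstract Queue.
data Queue a = Queue [a] [a]

empty :: Queue a
empty = Queue [] []

enter :: Queue a -> a -> Queue a
enter (Queue front back) x = Queue front (x : back)

qremove :: Queue a -> Maybe (a, Queue a)
qremove (Queue [] [])          = Nothing
qremove (Queue [] back)        = qremove (Queue (reverse back) [])
qremove (Queue (x:front) back) = Just (x, Queue front back)

-- loopSP as in Figure 46: every output is also queued as future input.
loopSP :: SP a a -> SP a a
loopSP sp = loopSP' empty sp
  where
    loopSP' _ NullSP        = NullSP
    loopSP' q (PutSP o sp') = PutSP o (loopSP' (enter q o) sp')
    loopSP' q (GetSP xsp)   =
      case qremove q of
        Just (i, q') -> loopSP' q' (xsp i)
        Nothing      -> GetSP (loopSP . xsp)

-- Hypothetical example: output n-1 for every positive input n.
countdown :: SP Int Int
countdown = GetSP (\n -> if n > 0 then PutSP (n - 1) countdown else countdown)

-- Hypothetical helper: run a stream processor on a finite input list.
run :: SP i o -> [i] -> [o]
run NullSP       _      = []
run (PutSP y sp) xs     = y : run sp xs
run (GetSP k)    (x:xs) = run (k x) xs
run (GetSP _)    []     = []

looped :: [Int]
looped = run (loopSP countdown) [3]
```

Here looped is [2,1,0]: the single external input 3 produces 2, which is fed back and produces 1, and so on until 0 is fed back and produces nothing.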
    runSP :: SP a b -> [a] -> [b]
    runSP sp xs = case sp of
        PutSP y sp' -> y : runSP sp' xs
        GetSP xsp -> case xs of
            x : xs' -> runSP (xsp x) xs'
            [] -> []
        NullSP -> []
Using list functions works well for parallel implementations. Demand is propagated from the output to the input stream by the normal evaluation mechanism of the functional language. Since streams are represented as lists, the standard list functions can be used directly as stream processors.
For sequential implementations, we saw that representing stream processors as functions from streams to streams prevented us from implementing parallel composition. Here, the continuation-based representation seems more attractive, and is the one that we currently employ in the Fudget library. The continuation-based representation also allows stream processors to be detached, moved, and plugged in somewhere else in the program--something that is used in Chapter 25.
Since our implementation is based on a sequential programming language, we do not get true concurrency. As long as all stream processors quickly react to input to avoid blocking other stream processors in the program, this is acceptable in practice. The reactiveness property is not enforced by the compiler, however.
It would be nice to have a representation that works well for both parallel and sequential implementations. Is perhaps the continuation-based representation useful also for parallel implementations? Consider the composition
    (sp1 -*- idSP) -==- sp2

The first output from the composition should be either the first output from sp1 or the first output from sp2, whichever happens to be ready first. But the parallel composition must evaluate to either PutSP ..., or GetSP .... In the first case we have prematurely committed ourselves to taking the output from sp1 first. In the second case we will not be able to deliver the first output until after sp2 has delivered its first output. It is not clear to us how the desired behaviour should be achieved.