21 Fudgets as stream processors

Having an implementation of stream processors, we are ready to build fudget combinators and some simple fudgets, based on the stream processor combinators and operations. With Figure 4 in mind, we can view fudgets in two ways:
  1. As plain stream processors which can have I/O effects. This is the abstract view that is presented to the application programmer. With this view, the fudget combinators simply do the same things as the corresponding stream-processor combinators.
  2. As stream processors with explicit high-level and low-level streams. In this chapter, we will take this view in order to implement fudgets.
It should be noted that there are other ways of implementing fudgets. We give two examples suitable for a monadic I/O system in Section 21.5 (references to other work in implementing fudgets are given in Chapter 42).

The fudget implementation used in the library is highly influenced by the synchronised stream I/O system used in version 1.2 of Haskell [HPJWe92].

21.1 Synchronised stream I/O

Synchronised stream I/O can be seen as a variant of the Landin stream I/O in Figure 1, where characters in the output and input streams are replaced by request and response constructors. The program and the I/O system are synchronised, in that for each request that the program produces, one response is produced by the I/O system. Thus, the program and the I/O system can be seen as two parts in a special type of dialogue. The type of a synchronised stream program main is

type Dialogue = [Request] -> [Response]
main :: Dialogue
Each request constructor represents a specific effect, and is defined in the datatype Request in Haskell 1.2:

data Request =  ReadFile String
             |  WriteFile String String
             |  ReachChan Chan
             |  AppendChan Chan String
             ...
The constructor ReadFile f is a request for the I/O system to read the contents of the file with name f. WriteFile f s is a request for writing s to a file with name f. Standard input and output are instances of so called channels: reading the character stream from standard input is requested by ReadChan stdin, and AppendChan stdout s is a request to write s on standard output.

The type of the response that is generated, when a request is carried out, depends on the request constructor. All response types are put in the union type Response:

data Response =  Success
              |  Failure IOError
              |  Str String
              |  IntResp Int
              ...
If the I/O requests are successful, requests for output merely generate a Success response, whereas input requests generate a value tagged with Str, IntResp, or some other constructor depending on its type. If a request fails, an error value tagged Failure is generated.

A program that uses the synchronised stream I/O model can be viewed as an atomic, sequential stream processor, that explicitly uses lists for representing the streams.

21.2 The tagged low-level streams

When adapting the synchronised stream model to Fudgets, we do two modifications. Firstly, we discard the explicit representation of streams as lazy lists, and secondly, we tag the requests and responses, to allow more than one stream processor to do I/O in our program.

We define a fudget of type F hi ho to be a stream processor that can input messages of type hi or tagged responses, and outputs ho messages or tagged requests:

type F hi ho = SP (Message TResponse hi) (Message TRequest ho)

data Message low high = Low low | High high
We could have used the standard type Either, but we prefer using the equivalent type Message for clarity.

The low-level streams carry I/O requests and responses. Fudget combinators like >+< and >==< merge the requests streams from the two argument fudgets. But when a fudget outputs a request we must be able to send the corresponding response back to the same fudget. For this reason, messages in the low-level streams are tagged with pointers indicating which fudget they come from or should be sent to. Since a fudget program can be seen as a tree of fudgets, where the nodes are fudget combinators and the leaves are atomic fudgets, we have chosen to use paths that point to nodes in the tree structure:

type TResponse  = (Path,Response)
type TRequest   = (Path,Request)

type Path = [Turn]
data Turn = L | R     -- left or right
The messages output from atomic fudgets contain an empty path, []. The binary fudget combinators prepend an L or an R onto the path in output messages to indicate whether the message came from the left or the right subfudget. Combinators that combine more than two fudgets (such as listF) uses a binary encoding of subfudget positions. On the input side, the path is inspected to find out to which subfudget the message should be propagated.

As an example, consider the fudget

f = f1 >==< (f2 >+< f3)
When f2 wants to perform an I/O request r, it puts ([],r) in its low-level output stream. The >+< combinator will prepend an L to the path, since f2 is the left subfudget. so ([L],r) will appear in the low-level output of f2 >+< f3. Analogously, the >==< combinator will prepend an R, so the low-level output from f will contain ([R,L],r). When a response later appears in the input stream of f, it will be tagged with the same path, [R,L], which will cause the combinators to propagate it to f2.

As should be apparent, the length of the paths is determined directly by the nesting depth of fudget combinators in the program. For programs that are structured roughly as balanced trees of fudgets, the length of the paths thus grow logarithmically with the number of atomic fudgets in the program. Hence, the overhead of constructing and analysing the paths also grows logarithmically with the number of fudgets. In practice, the maximal path length we have observed varies from 4 for trivial programs (the "Hello, world!" program in Section 9.1), 16 for small programs (the calculator in Section 9.8) and 30 for large programs (the proof assistant Alfa in Chapter 33).

Constructing and analysing the paths of messages is not the only source of overhead in the low-level message passing. Some fudget combinators, most notably the filters (see Chapter 24) treat some commands or events specially. They thus need to inspect all messages that pass through them. When a large number of messages have to be sent, the overhead may become too high. In Section 27.5.3 we present a situation in which we encountered this problem, and give a solution.

An implementation of tagged parallel composition of fudgets is shown in Figure 47. We have reused tagged parallel composition of stream processors by adding the appropriate tag adjusting pre and post processors. The other fudget combinators can be implemented using similar techniques.

>+< :: F i1 o1 -> F i2 o2 -> F (Either i1 i2) (Either o1 o2)
f1 >+< f2 = mapSP post -==- (f1 -+- f2) -==- mapSP pre
  where
    post msg =
      case msg of
        Left (High ho1)         -> High (Left ho1)
        Right (High ho2)        -> High (Right ho2)
        Left (Low (path,req))   -> Low (L:path,req)
        Right (Low (path,req))  -> Low (R:path,req)
    pre msg =
      case msg of
        High (Left hi1)    -> Left (High hi1)
        High (Right hi2)   -> Right (High hi2)
        Low (L:path,resp)  -> Left (Low (path,resp))
        Low (R:path,resp)  -> Right (Low (path,resp))

Figure 47. Tagged parallel composition of fudgets.

When a request reaches the top level of a fudget program, the path should be detached before the request is output to the I/O system and then attached to the response before it is sent back into the fudget hierarchy. This is taken care of in fudlogue. A simple version of fudlogue is shown in Figure 48.

fudlogue :: F a b -> Dialogue
fudlogue mainF = runSP (loopThroughRightSP routeSP (lowSP mainF))

routeSP =
    getLeftSP $ \ (path,request) ->
    putSP (Right request) $
    getRightSP $ \ response ->
    putSP (Left (path,response)) $
    routeSP

lowSP :: SP (Message li hi) (Message lo ho) -> SP li lo 
lowSP fud = filterLowSP -==- fud -==- mapSP Low

filterLowSP = mapFilterSP stripLow

stripLow (Low  low) = Just low
stripLow (High _  ) = Nothing

Figure 48. A simple version of fudlogue for synchronised stream I/O. It does not handle asynchronous input.

This version will suffice for programs where individual fudgets do not block in their I/O requests. If we want to react on input from many sources which comes in an unknown order (e.g. sockets, standard input, the window system, timeout events), this implementation will not be enough, so what should we do? We will discuss this more in Section 22.2. The short answer is that we can detect when the main fudget has become idle (that is, it has evaluated to getSP, and does not wait for a synchronous response). At this point, we perform a system call (namely select) to wait for input to happen on any of our sources of events.

21.3 Writing synchronous atomic fudgets

With the fudget representation in Section 21.2, an atomic fudget which repeatedly accepts a Haskell I/O request, performs it and outputs the response, can be implemented as follows. The combinators getHighSP and getLowSP waits for high- and low-level messages, respectively. They are defined in terms of waitForSP (Section 18.1).

requestF :: F Request Response
requestF = getHighSP $ \ req ->
           putSP (Low ([],req)) $
           getLowSP $ \(_,response) ->
           putSP (High response) $
           requestF
Some requests should be avoided, since when we evaluate their responses, the program might block. For example, we should not use ReadChan stdin, because its response is a lazy list representing the character streams from the standard input.

Files are usually OK to read, a fudget like readFileF (Section 14.2) can be implemented as follows:

readFileF :: F String (Either IOError String)
readFileF = post >^=< requestF >=^< ReadFile
where post (Str s) = Right s
      post (Failure f) = Left f
On its input, it waits for file names to open. The output is either an error value or the content of the file.

21.4 Fudget kernels

The fudget requestF in the previous section provides an interface to the Haskell stream I/O system. To program a fudget with a particular sequential I/O behaviour, a combinator like

-- Preliminary version
streamIoF :: SP (Either Response i) (Either Request o) -> F i o
streamIoF sp = loopThroughRightF (absF sp) requestF
could be used. The argument stream processor sp can talk to requestF using messages tagged Left and to other fudgets through messages tagged Right. However, it seems more appropriate to tag messages with the type Message introduced above, and let the type of streamIoF be

-- Final version
streamIoF :: K i o -> F i o
where

type K i o = SP (Message Response i) (Message Request o)
We call stream processors of type K i o fudget kernels. Fudget kernels are thus used when defining new atomic fudgets with particular I/O behaviours. We define some combinators for describing I/O behaviours in continuation style:

putHighK :: o -> K i o -> K i o
getHighK :: (i -> K i o) -> K i o
nullK :: K i o
doStreamIOK :: Request -> (Response -> K i o) -> K i o
The first three operations correspond directly to the stream-processor combinators putSP, getSP and nullSP, so fudget kernels can be seen as plain stream processors with access to the I/O system.

The implementations of the combinators introduced in this section are shown in Figure 49.

type K i o = SP (Message Response i) (Message Request o)

streamIoF :: K i o -> F i o
streamIoF kernel = mapSP post -==- kernel -==- mapSP pre
  where
    pre   (High i)        = High i
    pre   (Low (_,resp))  = Low resp
    post  (High o)        = High o
    post  (Low req)       = Low ([],req)

putHighK :: o -> K i o -> K i o
putHighK = putSP . High

getHighK :: (i -> K i o) -> K i o
getHighK = waitForSP high
  where
    high (High i) = Just i
    high _ = Nothing

nullK :: K i o
nullK = nullSP

doStreamIOK :: Request -> (Response -> K i o) -> K i o
doStreamIOK request contK =
    putSP (Low request) $
    waitForSP low contK
  where
    low (Low resp) = Just resp
    low _ = Nothing

Figure 49. Fudget kernel combinators.

21.5 Alternative implementations using monadic I/O

Today, Haskell uses the monadic I/O model, which is briefly explained in Section 41.1.3. A monadic version of fudlogue can be defined as follows:
fudIO1 :: F a b -> IO ()
fudIO1 f = case f of
    NullSP                     -> return ()
    GetSP _                    -> return ()
    PutSP (High _) f'          -> fudIO1 f'
    PutSP (Low (path,req)) f'  -> 
           do resp <- doRequest req
              fudIO1 (startupSP [Low (path,resp)] f')
This version still uses the stream I/O constructors internally to represent effects. It relies on an auxiliary function

doRequest :: Request -> IO Response
that converts requests to corresponding monadic effects.

We can also go one step further by throwing out the request and response datatypes, and use the IO monad directly to represent effects. This can be implemented by adding a constructor to the continuation-based stream-processor type:

data  F' i o =  PutF o (F' i o)
             |  GetF (i -> F' i o)
             |  NullF
             |  DoIoF (IO (F' i o))
The constructor DoIoF is used to express I/O effects. This constructor does not have any explicit argument for the continuation fudget, which instead is returned from the I/O computation. To connect fudgets to the I/O system, we use fudIO2:

fudIO2 :: F' i o -> IO ()
fudIO2 f = case f of
             NullF      -> return ()
             GetF _     -> return ()
             PutF _ f'  -> fudIO2 f'
             DoIoF io   -> io >>= fudIO2
We can provide an operation doIoF for plugging in monadic I/O operations in a fudget:

doIoF :: IO a -> (a -> F' i o) -> F' i o
doIoF io c = DoIoF (map c io)
The fudget combinators are defined just as the corresponding for stream processors, with extra cases for the DoIoF constructor. For example, in the case of parallel composition, these are:

DoIoF io  >*< g         = DoIoF (map (>*< g) io)
f         >*< DoIoF io  = DoIoF (map (f >*<) io)