Distributed Erlang and Map-Reduce

The goal of this lab is to make the naïve map-reduce implementation presented in the lecture a little less naïve. Specifically, we will make it run on multiple Erlang nodes, balance the load between them, and begin to make the code fault-tolerant.

Erlang resources

You will probably need to consult the Erlang documentation during this exercise. You can find the complete documentation here: http://www.erlang.org/doc/. To find documentation of a particular module, use the list of modules here: http://www.erlang.org/doc/man_index.html. Note that the Windows installer also installs the documentation locally, so if you are using Windows then you can just open the documentation via a link in the Start menu.

Connecting multiple Erlang nodes

The first step is to set up a network of connected Erlang nodes to play with. This can be done in two ways:

Running multiple Erlang nodes on one machine

Start several terminal windows (or cmd windows on Windows), and in each one start a named Erlang shell, using a command such as

erl -sname foo

on Linux or the Mac, and

werl -sname foo

on Windows. (The Windows version starts Erlang in its own window, with some useful menus). The prompt displayed by the Erlang shell will show you what each Erlang node you created is called. For example, on my machine the prompt is

(baz@JohnsTablet2012)1>

This tells me that the node I created is called baz@JohnsTablet2012 (an Erlang atom).

Running Erlang nodes on multiple machines

It’s more fun using several machines, if you have access to them. The procedure is the same as above, but first you must ensure that all machines use the same cookie. Edit the file .erlang.cookie in your home directory on each machine, and place the same Erlang atom in each one. Then start Erlang nodes as above; as long as the machines are on the same network, then they should be able to find each other.
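Alternatively, you can set the cookie on the command line when you start each node, for example

erl -sname foo -setcookie mysecret

(where mysecret is just a placeholder; use the same atom on every machine).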

Connecting the nodes together

Your Erlang nodes are not yet connected… calling nodes() on any of them will return the empty list. To connect them, call

net_adm:ping(NodeB).

on NodeA (where NodeA and NodeB are two of your node names). The result should be pong, and calling nodes() afterwards on either node should show you the other. Connect all your nodes in this way. Note that because Erlang builds a fully connected network, you need only connect each node to one other node yourself.
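For example, with two connected shells (the node names here are just illustrative):

(foo@machine1)1> net_adm:ping(bar@machine2).
pong
(foo@machine1)2> nodes().
[bar@machine2]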

Help! It doesn’t work

- On multiple machines, check that the cookie really is the same on all the nodes. Call erlang:get_cookie() on each node to make sure.

- If NodeA can’t connect to NodeB, try connecting NodeB to NodeA. Sometimes that helps!

- Perhaps one or more of your machines requires a login before the network connection can be established. On a Windows network, try visiting the Shared Folder on each machine from the others; this may prompt for a password, and once you have given the password, Erlang will also be able to connect.

Remote Procedure Calls

We’ll start by making remote procedure calls to other nodes. We’ll call io:format, which is Erlang’s version of printf. Try

rpc:call(OtherNode,io,format,["hello"]).

You will find “hello” printed on your own node! Erlang redirects the output of processes spawned on other nodes back to the original spawning node—so io:format really did run on the other node, but its output was returned to the first one. To force output on the node where io:format runs, we also supply an explicit destination for the output. Try

rpc:call(OtherNode,io,format,[user,"hello",[]]).

(where the last argument is the list of values for escapes like ~p in the string; since "hello" contains no escapes, we pass the empty list). Make sure that the output really does appear on the correct node.

Compiling and loading

Loading code on other nodes can actually be done more simply than I thought. Write a simple module containing this function:

-module(foo).
-compile(export_all).

foo() -> io:format(user,"hello",[]).

Now you can compile this module in the shell via

c(foo).

and you can then load it onto all your nodes via the command

nl(foo).

Try using rpc:call to call foo:foo on each node, checking that the output appears on the correct node.
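For example, this shell one-liner (run after nl(foo) has succeeded) should make hello appear once on every other node:

[rpc:call(Node,foo,foo,[]) || Node <- nodes()].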

Naïve Map-Reduce

Below you will find the source code of three of the modules presented in the lecture on map-reduce: a very simple map-reduce implementation on one node (both sequential and parallel), and two clients—a web crawler and a page rank calculator. Compile these modules, and ensure that you can crawl a part of the web.

crawl:crawl("http://www.cse.chalmers.se/",3).

You will need to start Erlang’s http client first, using inets:start().

The page rank calculator uses the information collected by the web crawler, but it assumes that the output of the web crawler has been saved in a dets file—a file that contains a set of key-value pairs. You will need to use dets to do this lab. You can find the documentation here (http://www.erlang.org/doc/man/dets.html); there is quite a lot of it, but you will only need a few functions from this module:

- dets:open_file—see the code at the end of this document for usage

- dets:insert—inserts a list of key-value pairs into the file

- dets:lookup—returns a list of all the key-value pairs with a given key

Save the results from the web crawler in a dets file called web.dat, and check that the page ranking algorithm works. Then copy web.dat onto all your nodes—this will enable you to distribute the page-rank computation across your network. You should collect 40-100MB of web data so that the page-ranking algorithm takes appreciable time to run.
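For example, here is a minimal sketch of saving the crawler’s output in web.dat (save_web/1 is my own helper name, not part of the given code):

save_web(Pages) ->
    %% Pages is the [{Url,Body}] list returned by crawl:crawl/2
    {ok,web} = dets:open_file(web,[{file,"web.dat"}]),
    ok = dets:insert(web,Pages),
    dets:close(web).

You might call it as save_web(crawl:crawl("http://www.cse.chalmers.se/",3)).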

Distributing Map-Reduce

Modify the parallel map-reduce implementation so that it spawns worker processes on all of your nodes. Measure the performance of the page-ranking algorithm with both the original parallel version and your new distributed version.
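The key change is small: spawn_link/2 takes a node name as an extra first argument. Here is a minimal sketch of a mapper spawned on a chosen node (spawn_mapper_on/5 is my own name; it mirrors spawn_mapper from the code at the end of this document, and it assumes the map_reduce module has been loaded on every node, e.g. with nl(map_reduce)):

spawn_mapper_on(Node,Parent,Map,R,Split) ->
    spawn_link(Node,
               fun() ->
                       %% runs on Node; Map and Split travel with the closure
                       Mapped = [{erlang:phash2(K2,R),{K2,V2}}
                                 || {K,V} <- Split,
                                    {K2,V2} <- Map(K,V)],
                       Parent ! {self(),group(lists:sort(Mapped))}
               end).

In map_reduce_par you might then pick nodes round-robin from [node()|nodes()] when spawning the mappers and reducers.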

Load-balancing Map-Reduce

Of course, it is not really sensible to spawn all the worker processes at the same time. Instead, we should start enough workers to keep all the nodes busy, then send each node new work as it completes its previous job. Write a worker pool function which, given a list of 0-ary functions, returns a list of their results, distributing the work across the connected nodes in this way. That is, semantically worker_pool(Funs) -> [Fun() || Fun <- Funs], but the implementation should make use of all the nodes in your network.

A good approach is to start several worker processes on each node, each of which repeatedly requests a function to call, calls it, and returns its result to the master, until no more work remains to be done. Modify the map-reduce implementation again to make use of your worker pool in both the map and the reduce phases. Measure the performance of page ranking with your new distributed map-reduce… is it faster?
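Here is one possible shape for such a pool, as a sketch under my own assumptions (the message protocol, the eight-workers-per-node figure, and the function names are mine; it also assumes the module containing this code is loaded on all nodes):

worker_pool(Funs) ->
    Master = self(),
    %% Tag each function with its position so results can be re-ordered.
    Work = lists:zip(lists:seq(1,length(Funs)),Funs),
    Workers =
        [spawn_link(Node,fun() -> worker(Master) end)
         || Node <- [node()|nodes()],
            _ <- lists:seq(1,8)],          %% e.g. eight workers per node
    Results = master(Work,[],length(Funs),length(Workers)),
    [R || {_,R} <- lists:sort(Results)].

%% The master hands out one job per request, and stops idle workers
%% once the work list is empty.
master(_Work,Results,0,0) ->
    Results;
master(Work,Results,Pending,Workers) ->
    receive
        {request,Pid} when Work =/= [] ->
            [Job|Rest] = Work,
            Pid ! {work,Job},
            master(Rest,Results,Pending,Workers);
        {request,Pid} ->
            Pid ! stop,
            master(Work,Results,Pending,Workers-1);
        {result,I,R} ->
            master(Work,[{I,R}|Results],Pending-1,Workers)
    end.

%% Each worker repeatedly asks the master for a function to call.
worker(Master) ->
    Master ! {request,self()},
    receive
        {work,{I,F}} ->
            Master ! {result,I,F()},
            worker(Master);
        stop ->
            ok
    end.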

Fault-tolerant Map-Reduce

Enhance your worker-pool to monitor the state of the worker processes, so that if a worker should die, then its work is reassigned to a new worker. Test your fault tolerance by killing one of your Erlang nodes (not the master) while the page-ranking algorithm is running. It should complete, with the same results, despite the failure.
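The mechanism you need is erlang:monitor/2: the monitoring process receives a {'DOWN',Ref,process,Pid,Reason} message when the monitored process dies (or has already died). A toy demonstration (demo_monitor/0 is mine, not part of the lab code):

demo_monitor() ->
    Pid = spawn(fun() -> exit(boom) end),
    Ref = erlang:monitor(process,Pid),
    receive
        {'DOWN',Ref,process,Pid,Reason} ->
            io:format("worker ~p died: ~p~n",[Pid,Reason])
    end.

In your pool, the master would monitor each worker when handing it a job, remember the assignment, and put the job back on the work list when a 'DOWN' message arrives. Note that the load-balancing sketch above used spawn_link, which would bring the master down with a crashing worker; spawn/2 plus a monitor is a better fit here.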

Hand-ins

You should submit the code of the three versions of map-reduce described above, together with your performance measurements. Describe your set-up: were you running on one machine or several, and how much web data were you searching? What conclusions would you draw from this exercise?

The deadline is midnight on Friday, 18th May.

More

A full map-reduce implementation does a lot more than this, of course. The next step would be to avoid sending all the data via the master—the results of each mapper should be sent directly to the right reducer… although this introduces a lot more complexity. Something to experiment with later, perhaps?

The Code

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This is a very simple implementation of map-reduce, in both
%% sequential and parallel versions.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(map_reduce).
-compile(export_all).

%% We begin with a simple sequential implementation, just to define
%% the semantics of map-reduce.

%% The input is a collection of key-value pairs. The map function maps
%% each key-value pair to a list of key-value pairs. The reduce
%% function is then applied to each key and list of corresponding
%% values, and generates in turn a list of key-value pairs. These are
%% the result.

map_reduce_seq(Map,Reduce,Input) ->
    Mapped = [{K2,V2}
              || {K,V} <- Input,
                 {K2,V2} <- Map(K,V)],
    reduce_seq(Reduce,Mapped).

reduce_seq(Reduce,KVs) ->
    [KV || {K,Vs} <- group(lists:sort(KVs)),
           KV <- Reduce(K,Vs)].

group([]) ->
    [];
group([{K,V}|Rest]) ->
    group(K,[V],Rest).

group(K,Vs,[{K,V}|Rest]) ->
    group(K,[V|Vs],Rest);
group(K,Vs,Rest) ->
    [{K,lists:reverse(Vs)}|group(Rest)].

map_reduce_par(Map,M,Reduce,R,Input) ->
    Parent = self(),
    Splits = split_into(M,Input),
    Mappers =
        [spawn_mapper(Parent,Map,R,Split)
         || Split <- Splits],
    Mappeds =
        [receive {Pid,L} -> L end || Pid <- Mappers],
    Reducers =
        [spawn_reducer(Parent,Reduce,I,Mappeds)
         || I <- lists:seq(0,R-1)],
    Reduceds =
        [receive {Pid,L} -> L end || Pid <- Reducers],
    lists:sort(lists:flatten(Reduceds)).

spawn_mapper(Parent,Map,R,Split) ->
    spawn_link(fun() ->
                   Mapped = [{erlang:phash2(K2,R),{K2,V2}}
                             || {K,V} <- Split,
                                {K2,V2} <- Map(K,V)],
                   Parent ! {self(),group(lists:sort(Mapped))}
               end).

split_into(N,L) ->
    split_into(N,L,length(L)).

split_into(1,L,_) ->
    [L];
split_into(N,L,Len) ->
    {Pre,Suf} = lists:split(Len div N,L),
    [Pre|split_into(N-1,Suf,Len-(Len div N))].

spawn_reducer(Parent,Reduce,I,Mappeds) ->
    Inputs = [KV
              || Mapped <- Mappeds,
                 {J,KVs} <- Mapped,
                 I==J,
                 KV <- KVs],
    spawn_link(fun() -> Parent ! {self(),reduce_seq(Reduce,Inputs)} end).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This implements a page rank algorithm using map-reduce
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(page_rank).
-compile(export_all).

%% Use map_reduce to count the number of links to each URL.

map(Url,ok) ->
    [{Url,Body}] = dets:lookup(web,Url),
    Urls = crawl:find_urls(Url,Body),
    [{U,1} || U <- Urls].

reduce(Url,Ns) ->
    [{Url,lists:sum(Ns)}].

%% 188 seconds
page_rank() ->
    dets:open_file(web,[{file,"web.dat"}]),
    Urls = dets:foldl(fun({K,_},Keys)->[K|Keys] end,[],web),
    map_reduce:map_reduce_seq(fun map/2, fun reduce/2,
                              [{Url,ok} || Url <- Urls]).

%% 86 seconds
page_rank_par() ->
    dets:open_file(web,[{file,"web.dat"}]),
    Urls = dets:foldl(fun({K,_},Keys)->[K|Keys] end,[],web),
    map_reduce:map_reduce_par(fun map/2, 32, fun reduce/2, 32,
                              [{Url,ok} || Url <- Urls]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This module defines a simple web crawler using map-reduce.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(crawl).
-compile(export_all).

%% Crawl from a URL, following links to depth D.
%% Before calling this function, the inets service must
%% be started using inets:start().
crawl(Url,D) ->
    Pages = follow(D,[{Url,undefined}]),
    [{U,Body} || {U,Body} <- Pages,
                 Body /= undefined].

follow(0,KVs) ->
    KVs;
follow(D,KVs) ->
    follow(D-1,
           map_reduce:map_reduce_par(fun map/2, 20, fun reduce/2, 1, KVs)).

map(Url,undefined) ->
    Body = fetch_url(Url),
    [{Url,Body}] ++
        [{U,undefined} || U <- find_urls(Url,Body)];
map(Url,Body) ->
    [{Url,Body}].

reduce(Url,Bodies) ->
    case [B || B <- Bodies, B/=undefined] of
        [] ->
            [{Url,undefined}];
        [Body] ->
            [{Url,Body}]
    end.

fetch_url(Url) ->
    case httpc:request(Url) of
        {ok,{_,_Headers,Body}} ->
            Body;
        _ ->
            ""
    end.

%% Find all the urls in an Html page with a given Url.
find_urls(Url,Html) ->
    Lower = string:to_lower(Html),
    %% Find all the complete URLs that occur anywhere in the page
    Absolute = case re:run(Lower,"http://.*?(?=\")",[global]) of
                   {match,Locs} ->
                       [lists:sublist(Html,Pos+1,Len)
                        || [{Pos,Len}] <- Locs];
                   _ ->
                       []
               end,
    %% Find links to files in the same directory, which need to be
    %% turned into complete URLs.
    Relative = case re:run(Lower,"href *= *\"(?!http:).*?(?=\")",[global]) of
                   {match,RLocs} ->
                       [lists:sublist(Html,Pos+1,Len)
                        || [{Pos,Len}] <- RLocs];
                   _ ->
                       []
               end,
    Absolute ++ [Url++"/"++
                     lists:dropwhile(
                       fun(Char)->Char==$/ end,
                       tl(lists:dropwhile(fun(Char)->Char/=$" end, R)))
                 || R <- Relative].