%% @doc A minimal task farm: a coordinator hands chunks of work to a pool of
%% worker processes and folds their replies into an accumulator.
-module(workers).

-export([start/5, worker/1, pmap/2]).

%% Upper bound (milliseconds) of the random delay a worker adds to each job.
-define(MAX_DELAY_MS, 800).
%% Number of worker processes pmap/2 starts.
-define(PMAP_WORKERS, 6).

%% Coordinator state.
-record(st, {
    tasks,     % work that has not been handed out yet (a list)
    aworkers,  % active workers: pids currently computing a chunk
    pworkers,  % passive workers: idle pids ready to receive a chunk
    get,       % fun(Tasks) -> {Chunk, RemainingTasks}
    combine    % fun(Result, Acc) -> NewAcc
}).

%% @doc Starts a worker process that applies `Compute' to every chunk it is
%% sent and replies to the sender with `{WorkerPid, Result}'.
%%
%% The worker is linked to the caller, so a crash inside `Compute' propagates
%% to the caller instead of leaving the coordinator waiting forever for a
%% reply, and the worker does not outlive a caller that dies.
-spec worker(fun((term()) -> term())) -> pid().
worker(Compute) ->
    spawn_link(fun() -> worker_body(Compute) end).

%% Worker loop. Protocol:
%%   {Pid, Tasks} - compute, wait a random delay, reply `{self(), Result}'
%%   stop         - leave the loop; the process terminates normally
worker_body(Compute) ->
    receive
        {Pid, Tasks} ->
            Result = Compute(Tasks),
            %% Random delay of 1..?MAX_DELAY_MS ms before replying (presumably
            %% there to simulate jobs of uneven duration). rand seeds itself
            %% per process, so workers do not all draw the same sequence.
            timer:sleep(rand:uniform(?MAX_DELAY_MS)),
            Pid ! {self(), Result},
            worker_body(Compute);
        stop ->
            ok
    end.

%% Coordinator loop.
%%
%% Done: nothing left to hand out and nobody is still computing.
work_load(#st{tasks = [], aworkers = []}, Results) ->
    Results;
%% There are tasks left and an idle worker: hand it the next chunk.
work_load(St = #st{tasks = Tasks = [_ | _],
                   pworkers = [PWorker | PWorkers],
                   aworkers = AWorkers,
                   get = Get},
          Results) ->
    {Chunk, TTasks} = Get(Tasks),
    PWorker ! {self(), Chunk},
    work_load(St#st{tasks = TTasks,
                    pworkers = PWorkers,
                    aworkers = [PWorker | AWorkers]},
              Results);
%% Tasks are pending but there is no worker at all: nobody could ever reply,
%% so fail loudly instead of blocking forever in the receive below.
work_load(#st{tasks = [_ | _], pworkers = [], aworkers = []}, _Results) ->
    error(no_workers);
%% No idle worker, or no tasks left while some workers are still busy:
%% wait for the next result. Any `{Worker, Result}' message is accepted and
%% there is no timeout, so this blocks until a worker replies.
work_load(St = #st{pworkers = PWorkers,
                   aworkers = AWorkers,
                   combine = Combine},
          Results) ->
    receive
        {Worker, Result} ->
            work_load(St#st{pworkers = [Worker | PWorkers],
                            aworkers = lists:delete(Worker, AWorkers)},
                      Combine(Result, Results))
    end.

%% @doc Distributes `Tasks' over `Workers' and returns the folded result.
%%
%% `Get' is called with the non-empty list of remaining tasks and must return
%% `{Chunk, Rest}'; `Chunk' is sent to an idle worker. Each reply is folded in
%% with `Combine(Result, Acc)', starting from `InitialResult', in the order the
%% replies arrive. Raises `error(no_workers)' if `Tasks' is non-empty and
%% `Workers' is empty.
-spec start([term()], [pid()],
            fun(([term(), ...]) -> {term(), [term()]}),
            fun((term(), Acc) -> Acc),
            Acc) -> Acc when Acc :: term().
start(Tasks, Workers, Get, Combine, InitialResult) ->
    St = #st{tasks = Tasks,
             pworkers = Workers,
             aworkers = [],
             get = Get,
             combine = Combine},
    work_load(St, InitialResult).

%%% Example: a parallel map built on start/5.

%% Hands out one element at a time.
get_pmap([X | Xs]) -> {X, Xs}.

%% Collects results by prepending them to the accumulator.
combine_pmap(R, Rs) -> [R | Rs].

%% @doc Applies `F' to every element of `Xs' using ?PMAP_WORKERS worker
%% processes and returns the results in the order of `Xs'.
%%
%% Replies arrive in completion order, so every element is tagged with its
%% position and the tagged results are sorted back before the tags are dropped.
%% The workers are told to stop once all results are in.
-spec pmap(fun((A) -> B), [A]) -> [B] when A :: term(), B :: term().
pmap(F, Xs) ->
    Indexed = lists:zip(lists:seq(1, length(Xs)), Xs),
    Compute = fun({Index, X}) -> {Index, F(X)} end,
    Workers = [worker(Compute) || _ <- lists:seq(1, ?PMAP_WORKERS)],
    Tagged = start(Indexed, Workers, fun get_pmap/1, fun combine_pmap/2, []),
    %% start/5 only returns once no worker is busy, so all of them are idle
    %% in their receive and will see `stop' next.
    lists:foreach(fun(Worker) -> Worker ! stop end, Workers),
    [Result || {_Index, Result} <- lists:keysort(1, Tagged)].