Friday, December 3, 2010

Graph algorithms: Pregel style using Phoebus and Erlang

Phoebus is an implementation of Pregel that I wrote primarily because I found the original paper really interesting and because I couldn't find any open source implementation of Pregel with which to experiment with parallel/distributed graph algorithms. The Pregel model provides an efficient and elegant way to process large graphs scalably in a distributed manner.

One particularly interesting application of graph algorithms is path finding. Let us consider a simple path-finding problem:
Foobar is a massive archipelago with many thousands of islands. The inhabitants of Foobar move from island to island in named boats. Each island allows only a certain set of boats to anchor on its shore. Given a source and a destination island, let's construct an algorithm (using the Pregel model) that finds the shortest path from the source to the destination island. The input to our algorithm is a newline-separated list of islands and the boats each island has access to. For example:
...
I1<Tab>B3<Tab>B5<NewLine>
I3<Tab>B2<Tab>B6<Tab>B10<NewLine>
...
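To make the input format concrete, here is a minimal sketch of how one such line could be turned into the {VertexName, VertexValue, Edges} tuple the compute function works with. The module island_input and parse_line/1 are hypothetical (Phoebus has its own loader, whose exact output I'm assuming here), and so is the edge value "1".

```erlang
-module(island_input).
-export([parse_line/1]).

%% Hypothetical helper (not part of Phoebus): turns one input line,
%% e.g. "I1\tB3\tB5\n", into {VertexName, VertexValue, Edges}.
parse_line(Line) ->
    %% Split on tabs, dropping the trailing newline / carriage return.
    [Island | Boats] = string:tokens(Line, "\t\r\n"),
    %% The vertex value starts out equal to the vertex name, and every
    %% boat becomes an edge with the (assumed) edge value "1".
    {Island, Island, [{"1", Boat} || Boat <- Boats]}.
```

For example, parse_line("I1\tB3\tB5\n") gives {"I1", "I1", [{"1", "B3"}, {"1", "B5"}]}.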


Of course, there are already many battle-tested algorithms for discovering shortest paths, but they are all meant to run on a single node. The quintessential Breadth First Search can also be implemented in parallel using MapReduce. This is an iterative algorithm: a MapReduce job is fired, the driver waits for it to finish, and then the same job is fired again, over and over. One particular disadvantage of using MapReduce for graph processing is that, since mappers and reducers are stateless, the entire graph has to be sent across the network to be reprocessed in the next iteration. The stateful nature of Pregel (and therefore Phoebus) contains this I/O overhead: since the same worker is responsible for the same set of vertices until the algorithm terminates, only messages destined for non-local workers need to be sent across the network.


Let's try arriving at a solution using the Pregel model and subsequently.. see if it works by encoding it in Erlang and feeding it to Phoebus (Phoebus currently understands only Erlang).

Step 1 : Modeling the Graph..

The input provided to us is a list of islands along with the boats each island has access to. Given an island, we first need to know all the other islands that are directly connected to it via the boats it has access to (we could find that by manually searching the list.. but where's the fun in that :))

Now let's briefly go over one aspect of Pregel's computational model. Vertices in a graph loaded into Phoebus communicate with each other by sending messages. If a message is sent to a non-existent vertex, that vertex will be created in the next step (the default value of a vertex is the vertex name itself) and the message will be delivered to it.

Given this, let us examine a sample input line: island 'I1' has access to boats 'B3' and 'B5'. If we model 'I1' as a vertex and send a message containing the island name 'I1' to 'B3' and 'B5', Phoebus will ensure that in the next step of the algorithm, the vertices 'B3' and 'B5' are created and the message from 'I1' (containing the string 'I1') is delivered to them. The interesting thing to note is that ALL islands that can access 'B3' or 'B5' will have sent a message (with their corresponding names encoded) to it. So in the subsequent step, the vertex for each boat is created, and when Phoebus runs the Pregel 'compute' function on that vertex, it passes in all the messages the vertex received in the previous step, which in our case are the names of ALL the islands that boat plies to.
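The delivery rule described above can be illustrated with a toy, single-process sketch: messages are grouped by target vertex, and any target that does not exist yet is created with its value set to its own name. The module msg_delivery and deliver/2 are hypothetical; they only model the semantics, not how Phoebus actually routes messages between workers.

```erlang
-module(msg_delivery).
-export([deliver/2]).

%% Vertices: [{Name, Value, Edges}]; OutMsgs: [{TargetVertex, Message}].
%% Returns [{VertexInfo, InboundMessages}] for every vertex that will run
%% in the next step because it was sent at least one message.
deliver(Vertices, OutMsgs) ->
    %% Group the messages by target vertex (orddict keeps keys sorted and
    %% appends values in arrival order).
    Inbox = lists:foldl(
              fun({Target, Msg}, Acc) -> orddict:append(Target, Msg, Acc) end,
              orddict:new(), OutMsgs),
    [{lookup_or_create(Target, Vertices), Msgs}
     || {Target, Msgs} <- orddict:to_list(Inbox)].

%% A target that does not exist yet is created with Value = Name, no edges.
lookup_or_create(Name, Vertices) ->
    case lists:keyfind(Name, 1, Vertices) of
        false -> {Name, Name, []};
        Vertex -> Vertex
    end.
```

Feeding it the "discover" messages from I1 (boats B3, B5) and I3 (boat B3) yields a freshly created B3 vertex holding both island names and a B5 vertex holding only I1's.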

Before we code it up.. let me just provide the specification of the compute function Phoebus accepts:
%% compute_fun(VertexInfo, Aggregate, InboundMessages) 
%%   VertexInfo      := {VertexName, VertexValue, Edges}
%%   VertexName      := string()
%%   VertexValue     := string()
%%   Edges           := [EdgeInfo]
%%   EdgeInfo        := {EdgeValue, TargetVertex}
%%   EdgeValue       := string()
%%   TargetVertex    := string()
%%   Aggregate       := string()
%%   InboundMessages := [string()]
%% Returns: 
%% {VertexInfo, OutboundMessages, Aggregate, VertexState}
%%   OutboundMessages := [Message]
%%   Message          := {TargetVertex, string()}
%%   VertexState      := hold | active
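For readers who prefer Erlang's own notation, the same contract can be written as type specs. This is a sketch: the type names and noop_compute/3 (a compute function that keeps its state, sends nothing and goes to hold) are hypothetical and not something Phoebus exports.

```erlang
-module(compute_spec).
-export([noop_compute/3]).

%% The comment-style specification above, written as Erlang types.
-type vertex_name()  :: string().
-type vertex_value() :: string().
-type edge_info()    :: {string(), vertex_name()}.   %% {EdgeValue, TargetVertex}
-type vertex_info()  :: {vertex_name(), vertex_value(), [edge_info()]}.
-type aggregate()    :: string().
-type message()      :: {vertex_name(), string()}.   %% {TargetVertex, Payload}
-type vertex_state() :: hold | active.

%% Hypothetical compute function satisfying the contract: it keeps its
%% state, passes the aggregate through, sends nothing and votes to hold.
-spec noop_compute(vertex_info(), aggregate(), [string()]) ->
          {vertex_info(), [message()], aggregate(), vertex_state()}.
noop_compute(VertexInfo, Agg, _InMsgs) ->
    {VertexInfo, [], Agg, hold}.
```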

Now for the code:
%% Since the EdgeList is [] we know this is a new Vertex
%% Phoebus has created. Also for all new vertices, 
%% Phoebus ensures that the VertexValue = VertexName.
%% Inbound Messages will be of the form 
%%     "discover:IslandName"
%% The newly created 'Boat' Vertices are sent messages 
%% by all Islands that are serviced by the boat. The 
%% 'Boat' vertices will store the 'Islands' as target
%% vertices in its Edge list 
compute_fun({VName, VName, []}, Agg, InMsgs) ->
  NewEdges = 
    lists:map(
      fun(M) -> 
        [_, N] = 
          re:split(M, ":", [{return, list}]), 
        {"1", N} 
      end, InMsgs),
  NewVVal = "from_src=;from_dest=",
  {{VName, NewVVal, NewEdges}, [], Agg, hold};

%% We know this is an 'Island' Vertex since the EdgeList 
%% is non-empty. As discussed, the first step would be 
%% to send "discover:IslandName" messages to all the 
%% Target Vertices it has access to via its edgelist
compute_fun({VName, VName, EList}, Agg, _) ->
  OutMsgs = 
    [{TV, "discover:" ++ VName} || {_, TV} <- EList],
  NewVVal = "from_src=;from_dest=",
  {{VName, NewVVal, EList}, OutMsgs, Agg, active};

Notice that we initialize a new value, "from_src=;from_dest=", for both the Island and Boat vertices. It will hold the shortest paths (from the source and from the destination) that the vertex has been apprised of at any point in the algorithm. More on that in the next step.. Also note that the VertexState returned by the second clause is 'active'. The next sections will make it clear why..
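The vertex value is just a string, so each step has to decode and re-encode it. Here is a minimal sketch of that round trip, assuming a hypothetical vertex_value module that represents a path as a list of vertex names and uses the atom inf for "no path known yet" (the compute function uses the string "inf" for the same purpose).

```erlang
-module(vertex_value).
-export([parse/1, format/2]).

%% "from_src=I1:B3;from_dest=" -> {["I1", "B3"], inf}
parse(VVal) ->
    ["from_src=" ++ Src, "from_dest=" ++ Dest] = string:tokens(VVal, ";"),
    {to_path(Src), to_path(Dest)}.

%% Inverse of parse/1.
format(SrcPath, DestPath) ->
    "from_src=" ++ from_path(SrcPath) ++ ";from_dest=" ++ from_path(DestPath).

%% An empty field means no path is known yet.
to_path("") -> inf;
to_path(Str) -> string:tokens(Str, ":").

from_path(inf) -> "";
from_path(Path) -> string:join(Path, ":").
```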

Step 2 : Deciding how the algorithm stops..

When implemented using the Pregel model, the algorithm takes advantage of the 'Aggregate' mechanism described in the Pregel paper. At each step of the algorithm, the 'Compute' function executed on every vertex of the graph is passed the 'aggregate' value for that step. The compute function can choose to return a different aggregate value than the one it received. Before the next step starts, these values are sent to an 'aggregator' function, which folds over all of them and produces a single value that is used as the aggregate for the next step. Thus the aggregate acts as global state visible to all the vertices in the graph. Aggregates are used in our algorithm for two things:
1) To signal when the algorithm should terminate: if any of the 'Compute' functions returns "done" as its aggregate, the algorithm must terminate.
2) To pass the source and destination islands to the algorithm.

The aggregate function:
%% Basically short circuits when it sees a "done" flag
aggregate_fun("done", _) -> "done";
aggregate_fun(_, "done") -> "done";
%% Else the initial aggregate value is passed along
aggregate_fun(X, _) -> X.
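The aggregator is described as folding over all the values returned in a step. Here is a sketch of such a fold using the aggregate_fun above. The combine/1 function and the left-to-right fold order are assumptions about how the values get reduced, not Phoebus internals. Because "done" absorbs every other value, the result is "done" as soon as any single vertex returns it, whatever the order.

```erlang
-module(agg_fold).
-export([aggregate_fun/2, combine/1]).

%% Same short-circuiting aggregator as above.
aggregate_fun("done", _) -> "done";
aggregate_fun(_, "done") -> "done";
aggregate_fun(X, _) -> X.

%% Hypothetical: reduce the (non-empty) list of aggregates returned by
%% the vertices in one step to the single value for the next step.
combine([First | Rest]) ->
    lists:foldl(fun(A, Acc) -> aggregate_fun(Acc, A) end, First, Rest).
```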

Step 3 : The actual Algorithm ..

Here is how Phoebus works: in each step of the algorithm, the Compute function is executed on all vertices that are either in the 'active' state OR have been sent messages. A single-source shortest path using a simple BFS works like this:
* Initially all vertices are initialized with an 'infinite' shortest path value.
* The user starts the algorithm by providing the Source/Root vertex via the initial 'Aggregate' value.
* Since all vertices are initially active, the compute function is executed on all the vertices of the graph.
* The vertex whose name equals the Source/Root vertex sends a message containing its name to all vertices connected to it via edges.
* In the next step, every vertex that received a message from the source/root vertex compares the shortest path IT is aware of with ALL the messages it has received. If it has found a shorter path than its current vertex value, it updates its value and sends the new shortest path it knows to all vertices it is connected to via its edge list.
* The algorithm terminates when no messages are generated in a step OR when it has exceeded the maximum number of steps specified by the user. At that point, the value of each vertex contains the shortest path to it from the source.
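The steps above can be simulated sequentially to see the superstep structure. This is a toy sketch, not Phoebus code: bfs_supersteps, run/3 and the adjacency list of atoms are hypothetical, the 'infinite' initial value is modeled by a vertex simply being absent from the path table, and each loop iteration plays the role of one superstep.

```erlang
-module(bfs_supersteps).
-export([run/3]).

%% Graph: [{Vertex, [Neighbour]}]. Returns a sorted [{Vertex, Path}],
%% where Path is the shortest known path (list of vertices) from Source.
run(Graph, Source, MaxSteps) ->
    Dist0 = [{Source, [Source]}],
    %% Superstep 0: the source sends its path to its neighbours.
    Msgs0 = [{N, [Source]} || N <- neighbours(Source, Graph)],
    loop(Graph, Dist0, Msgs0, MaxSteps).

%% Stop when a step produced no messages or the step budget is used up.
loop(_Graph, Dist, [], _Steps) -> lists:sort(Dist);
loop(_Graph, Dist, _Msgs, 0) -> lists:sort(Dist);
loop(Graph, Dist, Msgs, Steps) ->
    {NewDist, Out} =
        lists:foldl(
          fun({V, PathToSender}, {D, O}) ->
                  Candidate = PathToSender ++ [V],
                  case lists:keyfind(V, 1, D) of
                      {V, Known} when length(Known) =< length(Candidate) ->
                          {D, O};  %% not shorter (ties keep the first path)
                      _ ->
                          %% Shorter path found: record it and relay it.
                          D2 = lists:keystore(V, 1, D, {V, Candidate}),
                          {D2, [{N, Candidate} || N <- neighbours(V, Graph)] ++ O}
                  end
          end, {Dist, []}, Msgs),
    loop(Graph, NewDist, Out, Steps - 1).

neighbours(V, Graph) ->
    case lists:keyfind(V, 1, Graph) of
        {V, Ns} -> Ns;
        false -> []
    end.
```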

What we need is a single-PAIR shortest path algorithm, which is basically a slightly modified BFS. A single instance of the algorithm fires two simultaneous BFSs, one from the source and one from the destination. The algorithm terminates when some vertex in the middle has received a message from both the source and the destination (or when the source or destination itself hears from the other end), at which point that vertex votes to halt the algorithm by returning "done" as the aggregate value.
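As a sanity check of the idea (not of the Pregel encoding), here is a sequential sketch of a bidirectional BFS: both searches advance one hop per step, and once their visited sets intersect, the meeting vertex with the smallest combined path is used to stitch the two halves together. bidir_bfs and meet/4 are hypothetical names, and the graph is assumed to be an undirected adjacency list.

```erlang
-module(bidir_bfs).
-export([meet/4]).

%% Graph is an undirected adjacency list [{Vertex, [Neighbour]}].
%% Returns {ok, Path} from Src to Dest, or not_found.
meet(Graph, Src, Dest, MaxSteps) ->
    step(Graph, [{Src, [Src]}], [{Dest, [Dest]}], [Src], [Dest], MaxSteps).

step(_Graph, _SPaths, _DPaths, _FrontS, _FrontD, 0) ->
    not_found;
step(Graph, SPaths, DPaths, FrontS, FrontD, Steps) ->
    %% SP runs Src..V and DP runs Dest..V: reverse DP and drop its copy
    %% of V to stitch a full Src..Dest path through the meeting vertex V.
    Joined = [SP ++ tl(lists:reverse(DP))
              || {V, SP} <- SPaths, {W, DP} <- DPaths, V =:= W],
    case Joined of
        [_ | _] ->
            %% Several vertices can meet in the same step; keep the shortest.
            {ok, hd(lists:sort(fun(A, B) -> length(A) =< length(B) end, Joined))};
        [] ->
            {SPaths2, FrontS2} = expand(Graph, SPaths, FrontS),
            {DPaths2, FrontD2} = expand(Graph, DPaths, FrontD),
            case {FrontS2, FrontD2} of
                {[], []} -> not_found;  %% nothing new reached: give up
                _ -> step(Graph, SPaths2, DPaths2, FrontS2, FrontD2, Steps - 1)
            end
    end.

%% Advance one BFS level: each frontier vertex extends its path to every
%% neighbour not seen yet; the newly reached vertices are the next frontier.
expand(Graph, Paths, Frontier) ->
    lists:foldl(
      fun(V, Acc) ->
              {V, PathV} = lists:keyfind(V, 1, Paths),
              lists:foldl(
                fun(N, {P, F} = Unchanged) ->
                        case lists:keymember(N, 1, P) of
                            true -> Unchanged;
                            false -> {[{N, PathV ++ [N]} | P], [N | F]}
                        end
                end, Acc, neighbours(V, Graph))
      end, {Paths, []}, Frontier).

neighbours(V, Graph) ->
    case lists:keyfind(V, 1, Graph) of
        {V, Ns} -> Ns;
        false -> []
    end.
```

Picking the cheapest meeting vertex matters: after the same number of hops from each end, several vertices can be in the intersection, and their combined paths may differ by one hop.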

The compute function continues as follows:
%% If a Vertex receives "done" as Aggregate Value, STOP
compute_fun(CurrState, "done" = Agg, _) ->
  {CurrState, [], Agg, hold};

%% The initial Aggregate passed to the algorithm
%% is of the form "Source:Destination"
%% If the Current Vertex = Source Vertex, send
%% "src_path=VertexName"
%% If the Current Vertex = Dest Vertex, send
%% "dest_path=VertexName"
%% Else.. do nothing
compute_fun({VName, VVal, EList}, Agg, []) ->
  [Src, Dest] = 
    re:split(Agg, ":", [{return, list}]),
  OutMsgs = 
    case (Src =:= VName) orelse (Dest =:= VName) of
      true ->
        case VName of
          Src ->
            [{TV, "src_path=" ++ VName} 
                      || {_, TV} <- EList];
          _ ->
            [{TV, "dest_path=" ++ VName} 
                      || {_, TV} <- EList]
        end;
      _ -> []
    end,
  {{VName, VVal, EList}, OutMsgs, Agg, hold};


compute_fun({VName, VVal, EList}, Agg, InMsgs) ->
  %% START 1: Parse current state and get the current
  %%          shortest source & dest paths the Vertex
  %%          is aware of
  [TempSrcPath, TempDestPath] = 
    re:split(VVal, ";", [{return, list}]),
  SplitFun = 
    fun(P) -> 
        [_, X] = re:split(P, "=", [{return, list}]),
        case X of
          [] -> "inf";
          _ -> re:split(X, ":", [{return, list}])
        end
    end,
  {CurrSrcPath, CurrDestPath} = 
     {SplitFun(TempSrcPath), SplitFun(TempDestPath)},
  %% END 1:  
  %% START 2: Update known shortest paths.. if it has 
  %%          received anything shorter 
  {UpdatedSrcPath, UpdatedDestPath, NumS, NumD} =
    lists:foldl(
      fun([$s, $r, $c, $_, $p, $a, $t, $h, $= | Path], 
          {SSPath, SDPath, NS, ND}) ->
          IPathSplit = 
            re:split(Path, ":", [{return, list}]),
          case (SSPath =:= "inf") orelse 
               (length(IPathSplit) < length(SSPath)) of
            true -> 
              {IPathSplit, SDPath, NS + 1, ND};
            _ -> 
              {SSPath, SDPath, NS + 1, ND}
          end;
         ([$d, $e, $s, $t, $_, 
                           $p, $a, $t, $h, $= | Path], 
          {SSPath, SDPath, NS, ND}) ->
          IPathSplit = 
            re:split(Path, ":", [{return, list}]),
          case (SDPath =:= "inf") orelse 
               (length(IPathSplit) < length(SDPath)) of
            true -> 
              {SSPath, IPathSplit, NS, ND + 1};
            _ -> 
              {SSPath, SDPath, NS, ND + 1}
          end
      end, {CurrSrcPath, CurrDestPath, 0, 0}, InMsgs),
  %% END 2:
  [Src, Dest] = 
    re:split(Agg, ":", [{return, list}]),
  CleanFun = 
    fun("inf") -> []; (P) -> string:join(P, ":") end,
  {USPStr, UDPStr} = 
    {CleanFun(UpdatedSrcPath), 
     CleanFun(UpdatedDestPath)},
  case (Src =:= VName) andalso (NumD > 0) of
    true ->
  %% Send "done" flag if Source Vertex received
  %% "dest_path" msg
      {{VName, "full_path=" ++ 
               VName ++ ":" ++ UDPStr, EList}, 
       [], "done", hold};
    _ ->
      case (Dest =:= VName) andalso (NumS > 0) of
        true ->
  %% Send "done" flag if Dest Vertex received 
  %% "src_path" msg
          {{VName, "full_path=" ++ 
                   USPStr ++ ":" ++ VName, EList}, 
           [], "done", hold};
        _ ->
          case (UDPStr =/= []) andalso (USPStr =/= []) of
            true ->
  %% Send "done" flag if Vertex received both 
  %% "src_path" and "dest_path" msgs
              {{VName, 
                "full_path=" ++ USPStr ++ ":" ++ 
                VName ++ ":" ++ UDPStr, EList}, 
               [], "done", hold};
            _ ->              
  %% START 3: Else relay updated msgs to Vertices 
  %%          via its edges
              NewVVal = 
                "from_src=" ++ USPStr ++
                ";from_dest=" ++ UDPStr, 
              DestMsgs = 
                case UpdatedDestPath of
                  CurrDestPath -> [];
                  _ -> 
                    lists:foldl(
                      fun({_, TV}, Acc) ->
                          [{TV, "dest_path=" ++ 
                                VName ++ ":" ++ 
                                UDPStr}|Acc]
                      end, [], EList)
                end,
              OutMsgs = 
                case UpdatedSrcPath of
                  CurrSrcPath -> DestMsgs;
                  _ -> 
                    lists:foldl(
                      fun({_, TV}, Acc) ->
                          [{TV, "src_path=" ++ 
                                USPStr ++ ":" ++ 
                                VName}|Acc]
                      end, DestMsgs, EList)
                end,
              {{VName, NewVVal, EList}, 
                  OutMsgs, Agg, hold}
  %% END 3:  
          end
      end
  end.
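The fold in START 2 does two things per message: it recognises the message type by its prefix, and it keeps whichever path has fewer hops. Here is a sketch of those two pieces as standalone helpers. path_msgs, classify/1 and shorter/2 are hypothetical names, and inf stands for "no path yet".

```erlang
-module(path_msgs).
-export([classify/1, shorter/2]).

%% "src_path=I1:B3" -> {src, ["I1", "B3"]}
classify("src_path=" ++ Path)  -> {src, string:tokens(Path, ":")};
classify("dest_path=" ++ Path) -> {dest, string:tokens(Path, ":")}.

%% Keep whichever path has fewer hops; inf means no path is known yet.
shorter(New, inf) -> New;
shorter(New, Known) when length(New) < length(Known) -> New;
shorter(_New, Known) -> Known.
```

The guard in shorter/2 compares length/1 with length/1 on purpose: in Erlang's term order every number sorts before every list, so an expression like length(New) < Known is true for any list Known.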

Step 4 : Testing on Phoebus ..

Check out Phoebus:
$ git clone git://github.com/xslogic/phoebus
$ cd phoebus

Compile (make sure you have rebar and the latest Erlang/OTP, R14B, installed):
$ ./generate

Create an output folder:
$ mkdir /tmp/output

Start a phoebus terminal:
$ ./run_phoebus 1
.....
Erlang R14B (erts-5.8.1) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]

Eshell V5.8.1  (abort with ^G)
(phoebus1@my-machine)1> 

All the code I discussed can be found here. The module also contains a function to generate a random input sample.

Generate input:
(phoebus1@my-machine)1> path_find:generate_input("/tmp/input", 4, 2500, 500, {2, 7}).

This will generate the input in /tmp/input and partition it into 4 files.. so Phoebus will spawn 4 workers. The remaining arguments are:
2500 = Number of islands
500 = Number of boats
2 = Min number of boats an island has access to
7 = Max number of boats an island has access to
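If you want to see what such an input looks like without the path_find module at hand, here is a hypothetical generator in the same spirit. It is not the code of path_find:generate_input/5: it returns the lines instead of writing partitioned files, and it uses the rand module from newer Erlang/OTP releases (R14B itself only had the older random module). It assumes MaxBoats =< NumBoats; otherwise it would loop forever.

```erlang
-module(input_gen).
-export([lines/4]).

%% Hypothetical: returns NumIslands lines "I<n>\tB<a>\tB<b>...\n", each
%% island getting between MinBoats and MaxBoats distinct boats drawn
%% from B1..B<NumBoats>. Requires MaxBoats =< NumBoats.
lines(NumIslands, NumBoats, MinBoats, MaxBoats) ->
    [line(I, NumBoats, MinBoats, MaxBoats) || I <- lists:seq(1, NumIslands)].

line(I, NumBoats, Min, Max) ->
    K = Min + rand:uniform(Max - Min + 1) - 1,   %% K in Min..Max
    Boats = pick(K, NumBoats, []),
    string:join(["I" ++ integer_to_list(I) |
                 ["B" ++ integer_to_list(B) || B <- Boats]], "\t") ++ "\n".

%% Draw K distinct boat numbers in 1..NumBoats.
pick(0, _NumBoats, Acc) -> Acc;
pick(K, NumBoats, Acc) ->
    B = rand:uniform(NumBoats),
    case lists:member(B, Acc) of
        true -> pick(K, NumBoats, Acc);
        false -> pick(K - 1, NumBoats, [B | Acc])
    end.
```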

Start the algorithm and wait for it to end (notice that the 'aggregate_val' we pass in is "I1:I2000": we are asking the algorithm to find the shortest path from I1 to I2000):
(phoebus1@my-machine)1> AggFun = fun path_find:aggregate_fun/2.
#Fun
(phoebus1@my-machine)2> AlgoFun = fun path_find:compute_fun/3.
#Fun
(phoebus1@my-machine)3> phoebus_master:start_link(
                          [{name, "path_finder"}, 
                           {max_steps, 100}, 
                           {algo_fun, AlgoFun}, 
                           {aggregate_val, "I1:I2000"}, 
                           {aggregate_fun, AggFun}, 
                           {input_dir, "file:///tmp/input/"}, 
                           {output_dir, "file:///tmp/output/"}]).
### Starting Job : Id["phoebus1@my-machine_1701497547"] : Name[master_path_finder] ###
.....
.....
### Job Ended : Id["phoebus1@my-machine_1701497547"] : Name[master_path_finder] : Aggegate ["done"] : Time [6333] ###

Check the output directory (outputs will vary, since input generation is randomized; verify against the input in /tmp/input/):
$ cat /tmp/output/* | grep 'full_path'
  I1390 full_path=I1:B222:I1390:B115:I2000 1........ 
  I1985 full_path=I1:B362:I1985:B209:I2000 1........ 
  I2019 full_path=I1:B362:I2019:B381:I2000 1........ 
  I1214 full_path=I1:B362:I1214:B209:I2000 1........ 

What is interesting to note is the second column of the output. The algorithm has given 4 possible options; the first one, for example, suggests we go from island I1 to I1390 via boat B222, then from I1390 to I2000 via boat B115.
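Since islands and boats alternate in a full_path value, reading it off hop by hop is mechanical. Here is a small sketch (full_path and hops/1 are hypothetical names) that turns one value into {FromIsland, Boat, ToIsland} triples.

```erlang
-module(full_path).
-export([hops/1]).

%% "full_path=I1:B222:I1390:B115:I2000" ->
%% [{"I1", "B222", "I1390"}, {"I1390", "B115", "I2000"}]
hops("full_path=" ++ Path) ->
    to_hops(string:tokens(Path, ":")).

%% Islands and boats alternate: Island, Boat, Island, Boat, ..., Island.
to_hops([From, Boat, To | Rest]) -> [{From, Boat, To} | to_hops([To | Rest])];
to_hops([_Last]) -> [].
```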

Stuff to try out : Run multiple nodes of Phoebus.. and see it split the graph processing across the nodes.
Next : How to actually get Phoebus to run on a distributed setup using thrift