Wednesday, February 17, 2010

Spatial Index RTree

For location-based search, it is very common to search for objects based on their spatial location, e.g. find all restaurants within 5 miles of my current location, or find all schools within zip code 95110, etc.

Every spatial object can be represented by an object id, a minimum bounding rectangle (MBR), as well as other attributes, so the space can be represented by a collection of spatial objects. (Here we use 2 dimensions to illustrate the idea, but the concept can be extended to N dimensions.)

A query can be represented as another rectangle. The query is about locating the spatial objects whose MBRs overlap with the query rectangle.


RTree is a spatial indexing technique such that, given a query rectangle, we can quickly locate the matching spatial objects.

The concept is similar to a BTree. We group spatial objects that are close to each other and form a tree whose intermediate nodes contain "close-by" objects. Since the MBR of a parent node contains the MBRs of all its children, objects are close by when their parent's MBR is minimized.


Search

Starting from the root, we examine each child's MBR to see if it overlaps with the query MBR. We skip the whole subtree if there is no overlap; otherwise, we recurse the search by drilling into each overlapping child.

Notice that unlike other tree algorithms, which traverse down only one path, our search here may need to traverse down multiple paths when overlaps happen. Therefore, we need to structure the tree to minimize overlapping as high up in the tree as possible. This means we want to minimize the sum of MBR areas along each path (from the root to the leaf) as much as possible.
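
To make the search concrete, here is a minimal sketch in Python, using simplified, hypothetical Rect and Node classes (a real R-Tree node would carry more bookkeeping such as a parent pointer and fan-out limits):

# A minimal sketch of R-Tree search. Rect and Node are simplified,
# hypothetical classes used only for illustration.
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Rect:
    # An MBR spanning (xmin, ymin) to (xmax, ymax).
    xmin: float
    ymin: float
    xmax: float
    ymax: float

    def overlaps(self, other: "Rect") -> bool:
        return (self.xmin <= other.xmax and other.xmin <= self.xmax and
                self.ymin <= other.ymax and other.ymin <= self.ymax)

@dataclass
class Node:
    mbr: Rect
    children: List["Node"] = field(default_factory=list)   # empty for leaf entries
    object_id: Optional[str] = None                        # set only on leaf entries

def search(node: Node, query: Rect, results: List[str]) -> None:
    # Collect the ids of all spatial objects whose MBR overlaps the query rectangle.
    for child in node.children:
        if not child.mbr.overlaps(query):
            continue                            # no overlap: skip the whole subtree
        if child.object_id is not None:
            results.append(child.object_id)     # leaf entry: report the object
        else:
            search(child, query, results)       # internal node: drill into it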

Insert

To insert a new spatial object, starting from the root node, pick the child node whose MBR would be enlarged the least if the new spatial object were added, and walk down this path until reaching a leaf node.

If the leaf node has space left, insert the object into the leaf node and update the MBR of the leaf node as well as all of its parents. Otherwise, split the leaf node into two (create a new leaf node and move some of the contents of the original leaf node into it), and then add the newly created leaf node to the parent of the original leaf node. If the parent has no space left, the parent is split as well.

If the split propagates all the way up to the root, the original root is split and a new root is created.
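
Continuing the same hypothetical Rect/Node sketch from the search example, the "least enlargement" rule and a naive overflow split could look like the following. This is only an illustration: real R-Trees use smarter split heuristics (e.g. quadratic or linear split) and push the newly created node up to the parent instead of splitting in place.

from functools import reduce

M = 4   # illustrative maximum number of entries per node

def area(r: Rect) -> float:
    return (r.xmax - r.xmin) * (r.ymax - r.ymin)

def enlarged(r: Rect, other: Rect) -> Rect:
    # The smallest rectangle covering both r and other.
    return Rect(min(r.xmin, other.xmin), min(r.ymin, other.ymin),
                max(r.xmax, other.xmax), max(r.ymax, other.ymax))

def choose_leaf(node: Node, mbr: Rect) -> Node:
    # Walk down, enlarging MBRs along the path and always picking the child
    # whose MBR needs the least enlargement to accommodate the new object.
    node.mbr = enlarged(node.mbr, mbr)
    while node.children and node.children[0].object_id is None:
        node = min(node.children,
                   key=lambda c: area(enlarged(c.mbr, mbr)) - area(c.mbr))
        node.mbr = enlarged(node.mbr, mbr)
    return node

def insert(root: Node, object_id: str, mbr: Rect) -> None:
    leaf = choose_leaf(root, mbr)
    leaf.children.append(Node(mbr=mbr, object_id=object_id))
    if len(leaf.children) > M:
        # Naive split: turn the overflowing leaf into an internal node whose two
        # children each take half of the entries.
        half = len(leaf.children) // 2
        groups = [leaf.children[:half], leaf.children[half:]]
        leaf.children = [Node(mbr=reduce(enlarged, (c.mbr for c in g)),
                              children=list(g))
                         for g in groups]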

Delete

Deleting a spatial object first searches for the containing leaf node. Remove the spatial object from the leaf node's contents and update its MBR and its parents' MBRs all the way up to the root. If the leaf node now has fewer than m entries, we need to condense the tree by marking the leaf node to be deleted, then removing the leaf node from its parent's contents and updating the parent's MBR. If the parent now has fewer than m entries, we mark the parent to be deleted as well and remove it from its own parent. At this point, all the nodes that are marked for deletion are removed from the RTree.

Notice that the contents of these deleted nodes are not all garbage, since they may still contain valid entries (which were removed from the tree along with them). We now need to reinsert all these valid entries back into the tree.

Finally, if the root node contains only one child, we throw away the original root and make its child the new root.

Update

Update happens when an existing spatial object changes its dimensions. One way is to just change the spatial object's MBR without restructuring the RTree. A better way (but more expensive) is to delete the object, modify its MBR and then insert it back into the RTree.

Thursday, February 11, 2010

Cloud MapReduce Tricks

Cloud MapReduce, developed by Huan Liu and Dan Orban, offers some good lessons in how to design applications on top of a cloud environment, which has its own set of characteristics and constraints.

Although it provides the same map/reduce programming model to the application developer, the underlying implementation architecture of Cloud MR is drastically different from Hadoop's. For a description of Hadoop internals, here it is.

Built on top of a Cloud OS (in this case Amazon AWS), Cloud MR enjoys the inherent scalability and resiliency, which greatly simplifies its architecture.
  1. Cloud MR doesn't need a central coordinator component (like the NameNode and JobTracker in the Hadoop environment). It simply stores the job progress status information in the distributed metadata store (SimpleDB).
  2. Cloud MR doesn't need to worry about scalability in the communication path or how data can be moved efficiently between nodes; all of this is taken care of by the underlying Cloud OS.
  3. Cloud MR doesn't need to worry about disk I/O issues because all storage is effectively remote and is taken care of by the Cloud OS.
On the other hand, the Cloud OS has a set of constraints and limitations that the design of Cloud MR has to deal with:
  • Network latency and throughput: 20 - 100 ms for SQS access; SimpleDB domain write throughput is 30 - 40 items/sec.
  • Eventual consistency: two simultaneous requests to dequeue from SQS can both get the same message, and SQS sometimes reports empty when there are still messages in the queue.
Cloud MR use a "parallel path" technique to overcome the network throughput issue. The basic idea is to read/write data across multiple network paths so the effective throughput is the aggregated result of individual path.

Cloud MR use a "double check" technique to overcome the consistency issue. Writer will write status into multiple places and reader will read from multiple places also. If the reader read inconsistent result from different place, that means the eventual consistent state hasn't arrived yet so it needs to retry later. When the state read from different places agrees with each other, eventual consistency has arrived and the state is valid.

The following describes the technical details of Cloud MR ...

Cloud MR Architecture


SimpleDB is used to store job status. Clients submit jobs to SimpleDB, and Map and Reduce workers update and extract job status from SimpleDB. The actual data of each job is stored in SQS (a message can also point to an object stored in S3).

So a job progresses in the following way.

Job Client Processing Cycle
  1. Store data in many S3 file objects
  2. Create a Mapper task request for each file split (each map task request contains a reference to the S3 object and the byte range).
  3. Create an input queue in SQS and enqueue each Mapper task request to it.
  4. Create a master reducer queue, a result output queue, as well as multiple partition queues.
  5. Create one reducer task request for each partition queue. Each reducer task request contains a pointer to the partition queue.
  6. Enqueue the reducer task requests to the master reducer queue
  7. Create a job request that contains the mapper task count S as well as references to all the SQS queues created so far.
  8. Add the job request into SimpleDB
  9. Invoke AWS commands to start the EC2 instances for Mappers and Reducers, passing along queue and SimpleDB locations as "user data" to the EC2 instances.
  10. From this point onwards, poll SimpleDB for the job progress status
  11. When the job is complete, download the results from the output queue and S3
Mapper Processing Cycle
  1. Take a mapper task request from the SQS input queue
  2. Fetch the file split and parse out each record
  3. Invoke the user-defined map() function; for each emitted intermediate key, compute (hash(key) % no_of_partitions) and enqueue the intermediate record to the corresponding partition queue.
  4. When done with the mapper task request, write a commit record containing the worker id, the map request id and the number of records processed per partition (in other words, R[i][j] where i is the map request and j is the partition number).
  5. Remove the map task request from the SQS input queue
Due to the eventual consistency constraint, a Mapper cannot stop processing just because it sees the input queue is empty. Instead it counts the commit records to make sure the number of unique map request ids adds up to the mapper task count S in the job request. When this happens, it enters the reduce phase.

It is possible that a Mapper worker crashes before it finishes a mapper task, so another mapper will re-process the map task request (after the SQS timeout). Or, due to the eventual consistency model, it is possible to have 2 simultaneous mappers working on the same file split. In both cases, some duplicates may end up in the partition queues.

To facilitate duplicate elimination, each intermediate record emitted by the mapper is tagged with [map request id, worker id, a unique number].
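
The mapper cycle above can be condensed into the following sketch. The input_queue, partition_queues, simpledb, fetch_split and user_map objects are hypothetical wrappers introduced only for illustration; the real Cloud MR implementation talks to SQS and SimpleDB directly and keeps polling rather than stopping on an empty queue.

import hashlib
import itertools
import json

def run_mapper(worker_id, input_queue, partition_queues, simpledb, fetch_split, user_map):
    seq = itertools.count()                     # unique number for every emitted record
    while True:
        task = input_queue.dequeue()            # a mapper task request (S3 reference + byte range)
        if task is None:
            break                               # simplification: real workers keep polling until
                                                # commit records for all S map requests exist
        counts = [0] * len(partition_queues)    # R[i][j] for this map request i
        for key, value in fetch_split(task):
            for k, v in user_map(key, value):
                j = int(hashlib.md5(str(k).encode()).hexdigest(), 16) % len(partition_queues)
                record = {"tag": [task["map_request_id"], worker_id, next(seq)],
                          "key": k, "value": v}
                partition_queues[j].enqueue(json.dumps(record))
                counts[j] += 1
        # Commit record: worker id, map request id and per-partition record counts.
        simpledb.put("commit", {"worker_id": worker_id,
                                "map_request_id": task["map_request_id"],
                                "counts": counts})
        input_queue.delete(task)                # only now remove the task from the input queue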

Reducer Processing Cycle
  1. Monitor SimpleDB and wait until commit records from all mappers are seen.
  2. Dequeue a reducer task request from the master reducer queue
  3. Go to the corresponding partition queue, dequeue each intermediate record
  4. Invoke the user-defined reduce() function and write the reducer output to the output queue
  5. When done with the reducer task request, write a commit record in a similar way as the Mapper worker
  6. Remove the reduce task request from the master reducer queue
To eliminate duplicated intermediate messages in the partition queue, each Reducer will first query SimpleDB for all the commit records written by successful mapper workers. If different workers worked on the same mapper request, there may be multiple commit records with the same mapper request id. In this case, the Reducer will arbitrarily pick a winner, and all intermediate records tagged with that mapper request id but not the winner's worker id will be discarded.

Similar to the Mapper, Reducer j will not stop getting messages from the partition queue even when it appears empty; it will keep reading messages until it has read the sum of R[i][j] over all i.

Due to eventual consistency, it is possible that multiple reducers dequeue the same reducer task request from the master reducer queue and then take messages from the same partition queue. Since they are competing on the same partition queue, at least one of them will find the queue empty before it reaches the sum of R[i][j] over i. After a certain timeout period, the reducer will write a "suspect conflict" record (containing its worker id) into SimpleDB. If it finds that another reducer has written such a record, it knows there is another reducer working on the same partition. The reducer keeps reading until it sees another conflict record with a lower worker id than its own; at that point it is the loser, so it drops its existing processing and picks up another reducer task. All the records read by the loser will come back to the queue after the timeout period and will be picked up by the winner.
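
A rough sketch of the reducer-side duplicate elimination follows, using the same hypothetical wrappers as the mapper sketch; the conflict detection between competing reducers described above is left out to keep it short.

import json
from collections import defaultdict

def run_reducer(j, partition_queue, simpledb, user_reduce, output_queue):
    # Pick one winning worker per map request id from the mappers' commit records
    # (ties between duplicate commits are broken arbitrarily).
    winner = {}
    expected = 0
    for c in simpledb.query("commit"):
        if c["map_request_id"] not in winner:
            winner[c["map_request_id"]] = c["worker_id"]
            expected += c["counts"][j]          # sum of R[i][j] over all map requests i

    groups = defaultdict(list)
    seen_tags = set()
    accepted = 0
    while accepted < expected:                  # keep reading even if the queue looks empty
        msg = partition_queue.dequeue()
        if msg is None:
            continue
        record = json.loads(msg)
        map_req, worker_id, unique_no = record["tag"]
        if worker_id != winner[map_req]:        # emitted by a losing mapper: discard
            continue
        tag = (map_req, worker_id, unique_no)
        if tag in seen_tags:                    # duplicate delivery: discard
            continue
        seen_tags.add(tag)
        groups[record["key"]].append(record["value"])
        accepted += 1

    for key, values in groups.items():
        for out in user_reduce(key, values):
            output_queue.enqueue(json.dumps(out))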

Network Latency and Throughput

One SimpleDB-specific implementation constraint is that read and write throughput is very asymmetric. While the read response is very fast, writes are slow. To mitigate this asymmetry, Cloud MR uses multiple domains in SimpleDB. When it writes to SimpleDB, it randomly picks one domain and writes to it; this way, the write workload is spread across multiple domains. When it reads from SimpleDB, it reads every domain and aggregates the results (since one of the domains will have the result).
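
A small sketch of this domain-spreading idea, assuming a hypothetical list of domain handles that each support put and select:

import random

def spread_write(domains, item_name, attributes):
    # Spread the write load: each item goes to one randomly chosen domain.
    random.choice(domains).put(item_name, attributes)

def spread_read(domains, query):
    # Reads fan out to every domain; only one domain holds any given item,
    # so the union of the per-domain results is the full answer.
    results = []
    for d in domains:
        results.extend(d.select(query))
    return results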

To overcome the latency issue of SQS, Cloud MR on the Mapper side uses a buffering technique to batch up intermediate messages destined for the same partition. A message buffer is 8 KB in size (the maximum size of an SQS message). When the buffer is full (or after some timeout period), a designated thread flushes the buffer by writing a composite message (which contains all the buffered intermediate records) into SQS.
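
Below is a sketch of such a mapper-side buffer, assuming a hypothetical queue wrapper with an enqueue method; the 8 KB limit is the SQS message size mentioned above, and the JSON framing is just one possible way to pack a composite message.

import json
import threading
import time

SQS_MAX_BYTES = 8 * 1024   # SQS message size limit mentioned in the text

class PartitionBuffer:
    # Batches small intermediate records into one composite message per flush.
    def __init__(self, queue, flush_interval=1.0):
        self.queue = queue                  # hypothetical wrapper around one partition queue
        self.records = []                   # JSON-encoded records waiting to be flushed
        self.size = 0
        self.lock = threading.Lock()
        self.last_flush = time.time()
        self.flush_interval = flush_interval

    def add(self, record):
        encoded = json.dumps(record)
        with self.lock:
            if self.records and self.size + len(encoded) > SQS_MAX_BYTES:
                self._flush()
            self.records.append(encoded)
            self.size += len(encoded)

    def maybe_flush(self):
        # Called periodically by a designated flusher thread.
        with self.lock:
            if self.records and time.time() - self.last_flush > self.flush_interval:
                self._flush()

    def _flush(self):
        # One composite message carrying all buffered intermediate records.
        self.queue.enqueue("[" + ",".join(self.records) + "]")
        self.records, self.size = [], 0
        self.last_flush = time.time()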


The reducer side works in a similar way: multiple read threads dequeue messages from the partition queue and put them into a read buffer, from which the Reducer reads the intermediate messages. Notice that it is possible for 2 threads to read the same message from the partition queue (remember the eventual consistency scenario described above). To eliminate such potential duplicates, the reducer examines the unique number tagged onto each message and discards any message it has seen before.

Difference with Hadoop

Map/Reduce developers familiar with the Hadoop implementation will find Cloud MR behaves in a similar way, but there are a number of differences that I want to highlight here.
  • Reducer keys are not sorted: Unlike Hadoop, which guarantees that keys arriving at the same partition (or reducer) will be in sorted order, Cloud MR doesn't provide such a feature. Applications need to do their own sorting if they need the sort order.

For a detailed technical description of Cloud MR, as well as how it compares with Hadoop, read the original paper from Huan Liu and Dan Orban

Thursday, February 4, 2010

NoSQL GraphDB

I received some constructive criticism regarding my previous blog on NoSQL patterns: I covered only the key/value store but left out the Graph DB.

The Property Graph Model

A property graph is a collection of Nodes and Directed Arcs. Each node represents an entity and has a unique id as well as a Node Type. The Node Type defines a set of metadata that the node has. Each arc represents a unidirectional relationship between two entities and has an Arc Type. The Arc Type defines a set of metadata that the arc has.
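
As a minimal sketch, the property graph model can be captured with two record types (the field names here are my own, not from any particular graph database):

from dataclasses import dataclass, field
from typing import Dict

@dataclass
class Node:
    node_id: str
    node_type: str                      # determines which metadata the node carries
    properties: Dict[str, object] = field(default_factory=dict)

@dataclass
class Arc:
    arc_id: str
    arc_type: str                       # determines which metadata the arc carries
    from_node: str                      # unidirectional: from_node -> to_node
    to_node: str
    properties: Dict[str, object] = field(default_factory=dict)

# Example: Arc("a1", "write", from_node="person:alice", to_node="book:graph-db")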


General Graph Processing

I found that many graph algorithms follow a general processing pattern. There are multiple rounds of (sequential) processing iterations. Within each iteration, there is a set of active nodes that perform local processing in parallel. The local processing can modify the node's properties, add or remove links to other nodes, as well as send messages across links. All message passing is done after the local processing.


This model is similar to the Google Pregel model.

Notice that this model maps well onto a parallel computing environment, where the processing of the set of active nodes can be spread across multiple processors (or multiple machines in a cluster).

Notice that all messages from all incoming links arrive before the link changes are made within the local processing. On the other hand, all messages are sent to the outgoing links after the links have been changed by the local processing. The term "local processing" means it cannot modify the properties of other nodes or other links.
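
A minimal sketch of this iteration structure is shown below. It runs the rounds sequentially in one process; the local_process callback and the rule that message recipients become active in the next round are assumptions of this sketch, not a definition of Pregel.

from collections import defaultdict

def run_graph_algorithm(active_nodes, local_process, max_rounds=100):
    # local_process(node, incoming_messages) performs the per-node work and returns
    # (stay_active, outgoing) where outgoing is a list of (target_node, message) pairs.
    inbox = defaultdict(list)
    for _ in range(max_rounds):
        if not active_nodes:
            break
        outbox = defaultdict(list)
        next_active = set()
        # Local processing of every active node (conceptually done in parallel).
        for node in active_nodes:
            stay_active, outgoing = local_process(node, inbox[node])
            if stay_active:
                next_active.add(node)
            for target, message in outgoing:
                outbox[target].append(message)
                next_active.add(target)   # assumption: receiving a message activates a node
        # All message passing happens only after the local processing of the round.
        inbox = outbox
        active_nodes = next_active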

Because two nodes can simultaneously modify the link in between them, the following conflicts can happen:
  • One node deletes the link while the other node modifies the link's properties.
  • Both nodes modify the properties of the link in between.
A conflict resolution mechanism needs to be attached to each Arc Type to determine how the conflict should be resolved, either by determining who the winner is or by merging the effects.

Neo4j provides a restricted, single-threaded graph traversal model:
  • At each round, the set of active nodes is always a single node.
  • The set of active nodes of the next round is determined by the traversal policy (breadth- or depth-first), but is still a single node.
  • It offers a callback function to determine whether this node should be included in the result set.
Gremlin, on the other hand, provides an interactive graph traversal model where the user can step through each iteration. It uses an XPath-like syntax to express the navigation.
  • The node is expressed as Node(id, inE, outE, properties)
  • The arc is expressed as Arc(id, type, inV, outV, properties)
As an example, if the graph represents a relationship between 2 types of entities, e.g. Person writes Book, then given a person, her co-authors can be expressed in the following XPath expression:
./outE[@type='write']/inV/inE[@type='write']/outV

Path Algebra

Marko Rodriguez has described a set of matrix operations that apply when the graph is represented as an adjacency matrix. Graph algorithms can then be described in an algebraic form.

The traverse operation can be expressed as matrix multiplication. If A is the adjacency matrix of the graph, then A.A represents connections via paths of length 2.

Similarly, the merge operation can be expressed as matrix addition. For example, (A + A.A + A.A.A) represents connectivity within 3 degrees of reach.

In the special case when the graph represents a relationship between 2 types of entities, e.g. if A represents an authoring relationship (person writes book), then A.(A.transpose()) represents the co-author relationship (person co-authors with another person).
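
These matrix forms are easy to try out, for example with numpy; the small authoring matrix below is made up purely for illustration.

import numpy as np

# Adjacency matrix A for the "person writes book" relationship:
# rows = persons (p0, p1, p2), columns = books (b0, b1).
A = np.array([[1, 0],
              [1, 1],
              [0, 1]])

# A.(A^T) is persons x persons; entry (i, k) counts the books
# co-authored by person i and person k.
coauthor = A @ A.T

# For a square adjacency matrix B over a single node type,
# B.B counts paths of length 2, and B + B.B + B.B.B expresses
# connectivity within 3 degrees of reach.
B = np.array([[0, 1, 0],
              [0, 0, 1],
              [1, 0, 0]])
within_three_degrees = B + B @ B + B @ B @ B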

Marko also introduces a set of filter operations (not filter / clip filter / column filter / row filter / vertex filter).

Map Reduce
Depending on the traversal strategy inherent in the graph algorithm, certain algorithms that have a higher sequential dependency don't fit well into parallel computing. For example, graph algorithms with a breadth-first search nature fit the parallel computing paradigm better than those with a depth-first search nature. Likewise, performing a search from all nodes fits parallel computing better than performing a search from a particular start node.

There are different storage representations of a graph: incidence list, incidence matrix, adjacency list and adjacency matrix. For sparse graphs (the majority of cases), let's assume the adjacency list is used as the storage model for the subsequent discussion.

In other words, the graph is represented as a list of records, each record is [node, connected_nodes].

There are many graph algorithms and it is not my intent to have an exhaustive list. Below are the ones that I have used in my previous projects and that can be translated into Map/Reduce form.

Topological sort is commonly used to work out a schedule based on a dependency tree. It can be done as follows ...

# Topological Sort

# Input records
[node_id, prerequisite_ids]

# Output records
[node_id, prerequisite_ids, dependent_ids]
class BuildDependentsJob {
 map(node_id, prerequisite_ids) {
   # pass the node's own prerequisites through so reduce can re-emit them
   emit(node_id, ['prerequisites', prerequisite_ids])
   for each prerequisite_id in prerequisite_ids {
     emit(prerequisite_id, ['dependent', node_id])
   }
 }

 reduce(node_id, values) {
   prerequisite_ids = values tagged 'prerequisites'
   dependent_ids = values tagged 'dependent'
   emit(node_id, [node_id, prerequisite_ids, dependent_ids])
 }
}

class BuildReadyToRunJob {
 map(node_id, node) {
   if !done?(node) and node.prerequisite_ids.empty? {
     result_set.append(node)
     done(node)
     # notify every dependent that this prerequisite has completed
     for each dependent_id in node.dependent_ids {
       emit(dependent_id, node_id)
     }
   }
 }

 reduce(node, done_prerequisite_ids) {
   remove_prerequisites(node, done_prerequisite_ids)
 }
}

# Topological Sort main program
main() {
 JobClient.submit(BuildDependentsJob.new)
 Result result = []

 result_size_before_job = 0
 result_size_after_job = 1

 while (result_size_before_job < result_size_after_job) {
   result_size_before_job = result.size
   JobClient.submit(BuildReadyToRunJob.new)
   result_size_after_job = result.size
 }

 return result
}

Minimum spanning tree is a pretty common algorithm; Prim's algorithm looks like the following.
# Minimum Spanning Tree (MST) using Prim's algorithm

Adjacency Matrix, W[i][j] represents weights
W[i][j] = infinity if node i, j is disconnected

MST has nodes in array N = [] and arcs A = []
E[i] = minimum weighted edge connecting to the skeleton
D[i] = weight of E[i]

Initially, pick a random node r into N[]
N = [r] and A = []
D[r] = 0; D[i] = W[i][r];

Repeat until N[] contains all nodes
 Pick node k outside N[] where D[k] is minimum
 Add node k to N; Add E[k] to A
 for all node p connected to node k
   if W[p][k] < D[p]
     D[p] = W[p][k]
     E[p] = k
   end
 end
end

We skip the Map/Reduce form here because it is very similar to that of another popular algorithm, single source shortest path. The Map/Reduce form of SSSP based on Dijkstra's algorithm is as follows ...
# Single Source Shortest Path (SSSP) based on Dijkstra

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected

SSSP has nodes in array N = []
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

Initially, put the source node s into N[]
N = [s]
L[s] = 0; L[i] = W[s][i];
Path[i] = arc[s][i] for all nodes directly connected
from source.

Repeat until N[] contains all nodes
 Pick node k outside N[] where L[k] is minimum
 Add node k to N;
 for all node p connected from node k {
   if L[k] + W[k][p] < L[p] {
     L[p] = L[k] + W[k][p]
     Path[p] = Path[k].append(Arc[k][p])
   }
 }
end repeat


# Here is what the map/reduce pseudo code would look like

class FindMinimumJob {
 map(node_id, path_length) {
   if not N.contains(node_id) {
     emit(1, [path_length, node_id])
   }
 }

 reduce(k, v) {
   min_node, min_length = minimum(v)
   # the minimum node joins the settled set N
   N.add(min_node)
   for each node in min_node.connected_nodes {
     emit(node, min_node)
   }
 }
}

class UpdateMinPathJob {
 map(node, min_node) {
   if L[min_node] + W[min_node][node] < L[node] {
     L[node] = L[min_node] + W[min_node][node]
     Path[node] = Path[min_node].append(arc(min_node, node))
   }
 }
}
# Single Source Shortest Path main program
main() {
 init()
 while (not N.contains(V)) {
   JobClient.submit(FindMinimumJob.new)
   JobClient.submit(UpdateMinPathJob.new)
 }

 return Path
}

The same SSSP problem can also be solved using breadth-first search. The intuition is to grow a frontier from the source at each iteration and update the shortest distance from the source.

# Single Source Shortest Path (SSSP) using BFS

Adjacency Matrix, W[i][j] represents weights of arc
  connecting node i to node j
W[i][j] = infinity if node i, j is disconnected

Frontier nodes in array F
L[i] = Length of minimum path so far from the source node
Path[i] = Identified shortest path from source to i

Initially,
F = [s]
L[s] = 0; L[i] = W[s][i];
Path[i] = arc[s][i] for all nodes directly connected
from source.

# input is all nodes in the frontier F
# output is frontier of next round FF

class GrowFrontierJob {
 map(node) {
   for each to_node in node.connected_nodes {
     emit(to_node, [node, L[node] + W[node][to_node]])
   }
 }

 reduce(node, from_list) {
   for each from in from_list {
     from_node = from[0]
     length_via_from_node = from[1]
      if (length_via_from_node < L[node]) {
       L[node] = length_via_from_node
       Path[node] =
         Path[from_node].append(arc(from_node, node))
       FF.add(node)
     }
   }
 }
}

# Single Source Shortest Path BFS main program
main() {
 init()
 while (F is non-empty) {
   JobClient.set_input(F)
   JobClient.submit(GrowFrontierJob.new)
   copy FF to F
   clear FF
 }

 return Path
}