Sorting of Big Data

If you want to sort big data set by keys, there are following ways to do that global sort (sorting on single keys)

  • by mapreduce scripts: get sample of data to find out data ranges by keys; sort data by each map; send them to different reduce according to data ranges. The code in P162@Hadoop In Practice gives a sample of this.
  • by Pig: pig’s “order by” support this perfectly
  • by Hive: you need to set reducer to 1 before using hive’s order by. Hope, it will have such improvement soon.

If you want to sort big data set by keys then by values, there are following ways to do that secondary sort

  • by mapreduce script: Make the key a composite of the natural key and the natural value. The sort comparator should order by the composite key, that is, the natural key and natural value. The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping. The code in P278@Hadoop: The Definite Guide 3ed gives a sample of this.
  • by Pig or hive’s “order by” support this perfectly
  • by Hadoop streaming see the code in P281@Hadoop: The Definite Guide 3ed

  • hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
     -D \
     -D mapred.text.key.partitioner.options=-k1,1 \ 
     -D mapred.output.key.comparator.class=\ 
     org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ 
     -D mapred.text.key.comparator.options="-k1n -k2nr"</strong> \ 
     -input input/ncdc/all \ 
     -output output_secondarysort_streaming \ 
     -mapper ch08/src/main/python/ \ 
     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ 
     -reducer ch08/src/main/python/ \ 
     -file ch08/src/main/python/ \ 
     -file ch08/src/main/python/ 

The specification ‘mapred.text.key.partitioner.options’ configures the partitioner. The value -k1,1 instructs the partitioner to use only the first field of the key, where fields are assumed to be separated by a string defined by the ‘map.output.key.field.separator’ property (a tab character by default).

Hadoop provides KeyFieldBasedComparator, which is ideal for this purpose. The comparison order is defined by a specification that is like the one used for GNU sort. It is set using the ‘mapred.text.key.comparator.options’ property. The value “-k1n -k2nr” used in this example means “sort by the first field in numerical order, then by the second field in reverse numerical order.” Like its partitioner cousin, ‘KeyFieldBasedPartitioner’, it uses the separator defined by the ‘map.output.key.field.separator’ to split a key into fields.

Another example from Apache

hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf \
-jobconf \
-jobconf map.output.key.field.separator=. \
-jobconf num.key.fields.for.partition=2 \
-jobconf mapred.reduce.tasks=12</pre>

Here, ‘-jobconf’ and ‘-jobconf’ are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper.

The map output keys of the above Map/Reduce job normally have four fields separated by ‘“.”‘. However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the ‘-jobconf num.key.fields.for.partition=2’ option. Here, ‘-jobconf map.output.key.field.separator=.’ specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.

This is effectively equivalent to specifying the first two fields as the primary key and the next two fields as the secondary. The primary key is used for partitioning, and the combination of the primary and secondary keys is used for sorting.