Googleの分散処理技術であるMapReduceを、そのオープン実装のHadoopを使って試したいと思っても、なんか設定がめんどくさそうで二の足を踏んじゃう人は多いはず。
そこで、並列計算をせずにサーバーなしで動かせるサンプルを作ってみた。
ただ、設定は不要なんだけど、Windowsの場合はCygwinがやっぱり必要で、PATHにCYGWIN_HOME\binを追加しておく必要がある。残念。
今回は、Javaソース中のimportされたクラスを数えるっていう処理をMapReduceでやってみる。
Hadoopは、こっからダウンロード。0.17.2.1を使った。
http://hadoop.apache.org/core/releases.html
コンパイル・実行には、解凍してできる次のJARをクラスパスに追加しておく必要がある。
HADOOP_HOME/hadoop-0.17.2.1-core.jar
HADOOP_HOME/lib/commons-cli-2.0-SNAPSHOT.jar
HADOOP_HOME/lib/commons-logging-1.0.4.jar
HADOOP_HOME/lib/log4j-1.2.13.jar
HADOOP_HOME/lib/commons-httpclient-3.0.1.jar
ということで、ソース。
setInputPathsやsetOutputPathは適当なディレクトリを指定する。
setInputPathsのディレクトリ中に別のディレクトリがあったら怒られるので、ファイルだけのディレクトリを指定する。
setOutputPathは存在するディレクトリを指定すると起こられる。二回目に実行するときは削除しておく。
内容は、コメントと空気を読んでください。
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MyMapReduce extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>, Reducer<Text, IntWritable, Text, IntWritable> { /** 実行メソッド */ public static void main(String[] args) throws Exception { JobConf runconf = new JobConf(); //入力ディレクトリ FileInputFormat.setInputPaths(runconf, new Path("C:/hadooptest/src")); //出力ディレクトリ FileOutputFormat.setOutputPath(runconf, new Path("C:/hadooptest/temp")); //Mapの設定 runconf.setMapperClass(MyMapReduce.class); runconf.setMapOutputKeyClass(Text.class); //Mapperの3番目の型 runconf.setMapOutputValueClass(IntWritable.class);//Mapperの4番目の型 //Reduceの設定 runconf.setReducerClass(MyMapReduce.class); runconf.setOutputKeyClass(Text.class); //Reducerの3番目の型 runconf.setOutputValueClass(IntWritable.class);//Reducerの4番目の型 //処理開始 JobClient.runJob(runconf); } /** Map処理 引数はMapperインタフェースで指定した型 */ public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().trim(); if(line.startsWith("import")){ //import文のとき、クラス名をキーに値を1にして出力 Text word = new Text(line.substring(7, line.length() - 1)); output.collect(word, new IntWritable(1)); } } /** Reduce処理 引数はMapperインタフェースで指定した型 */ public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { //キーに割り当てられた値を数える int sum = 0; while(values.hasNext()){ sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }
実行すると、だらだらとMapReduce処理が動く。
で、出力先として設定したディレクトリにpart-00000というファイルが作成されてて、こんな内容になってる。
java.io.IOException 3 java.util.Iterator 2 org.apache.hadoop.fs.Path 1 org.apache.hadoop.io.IntWritable 3 org.apache.hadoop.io.LongWritable 2 org.apache.hadoop.io.Text 3
ということで、MapReduceができました。やった!