1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package topn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Top5 { public static final int K = 5;
static class Top5Mapper extends Mapper< LongWritable, Text, IntWritable,Text> { TreeMap<Integer,String> treemap = new TreeMap<Integer, String>(); @Override public void map(LongWritable key,Text values,Context context) { String line = values.toString(); if(line.trim().length()>0&&line.indexOf("\t")!=-1) { String[] s = line.split("\t",2); String keyWord = s[0]; Integer cnt = Integer.parseInt(s[1]); treemap.put(cnt,keyWord); if(treemap.size()>K) { treemap.remove(treemap.firstKey()); } } } @Override protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException { for(Integer num:treemap.keySet()) { context.write(new IntWritable(num), new Text(treemap.get(num))); } } } static class Top5Reducer extends Reducer<IntWritable, Text, IntWritable,Text> { TreeMap<Integer, String> treemap = new TreeMap<Integer, String>(); @Override public void reduce(IntWritable key,Iterable<Text> values,Context context) { treemap.put(key.get(),values.iterator().next().toString()); if(treemap.size()>K) { treemap.remove(treemap.firstKey()); } } @Override protected void cleanup(Reducer<IntWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException { for(Integer num:treemap.keySet()) { context.write(new IntWritable(num),new Text(treemap.get(num))); } } }
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name", "local"); Job job = Job.getInstance(); job.setJarByClass(Top5.class); job.setMapperClass(Top5Mapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(Top5Reducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("d:/mapred/output_topwords/part-r-00000")); FileOutputFormat.setOutputPath(job, new Path("d:/mapred/output_top5")); job.waitForCompletion(true); } }
|