This article was originally published on my own blog.
This post continues from the previous article, 【Hadoop mapreduce自定義排序WritableComparable】. Following that sequence, this time the topic is how to implement custom grouping. I won't repeat the details of the overall processing order here; if you need them, see my comments on cnblogs (博客園). Let's get started.
First, let's look at the Job class. It has a setGroupingComparatorClass() method, whose source is as follows:
/**
 * Define the comparator that controls which keys are grouped together
 * for a single call to
 * {@link Reducer#reduce(Object, Iterable,
 *                       org.apache.hadoop.mapreduce.Reducer.Context)}
 * @param cls the raw comparator to use
 * @throws IllegalStateException if the job is submitted
 */
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                       ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueGroupingComparator(cls);
}
From the source we can see that this method defines custom key grouping: it controls which keys are grouped together for a single reduce() call. The class you set must be a RawComparator (the parameter type is Class<? extends RawComparator>), so let's look at that interface's source:
/**
 * <p>
 * A {@link Comparator} that operates directly on byte representations of
 * objects.
 * </p>
 * @param <T>
 * @see DeserializerComparator
 */
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
So RawComparator is a generic interface that extends Comparator. With that in mind, let's write our own class that implements RawComparator:
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class MyGrouper implements RawComparator<SortAPI> {

    // Object-level comparison: group only by the first field.
    @Override
    public int compare(SortAPI o1, SortAPI o2) {
        return (int) (o1.first - o2.first);
    }

    // Byte-level comparison: compare only the first 8 bytes of each
    // serialized key, i.e. the long written first by SortAPI.write().
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    }
}
SortAPI here is the key class we defined in the previous post on custom sorting. The first compare() works on deserialized objects and returns a signed int indicating their relative order. The second compare() operates directly on the serialized bytes, without deserializing the keys, so it has to compare byte ranges; since SortAPI.write() writes first and then second, comparing only the first 8 bytes of each key groups records by their first field. A minimal sketch below illustrates this byte-level comparison.
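The following is a small, self-contained sketch of my own (the GrouperDemo class name is illustrative, not from the original post): it serializes two SortAPI keys with Hadoop's DataOutputBuffer and hands the raw bytes to MyGrouper, showing that keys sharing the same first value compare as equal even though their second values differ.

import org.apache.hadoop.io.DataOutputBuffer;

public class GrouperDemo {
    public static void main(String[] args) throws Exception {
        // Serialize two keys that share the same `first` but differ in `second`.
        DataOutputBuffer b1 = new DataOutputBuffer();
        new SortAPI(1, 2).write(b1);
        DataOutputBuffer b2 = new DataOutputBuffer();
        new SortAPI(1, 7).write(b2);

        MyGrouper grouper = new MyGrouper();
        int cmp = grouper.compare(b1.getData(), 0, b1.getLength(),
                                  b2.getData(), 0, b2.getLength());
        // Prints 0: only the first 8 bytes (the serialized `first` field) are
        // compared, so the two keys would fall into the same reduce group.
        System.out.println(cmp);
    }
}

Next, let's look at the custom MyMapper class: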
public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers.
        String[] fields = value.toString().split("\t");
        try {
            long first = Long.parseLong(fields[0]);
            long second = Long.parseLong(fields[1]);
            context.write(new SortAPI(first, second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
The custom MyReduce class:
public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // One call per group; emit the representative key's two fields.
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}
The custom SortAPI class:
public class SortAPI implements WritableComparable<SortAPI> {

    public Long first;
    public Long second;

    public SortAPI() {
    }

    public SortAPI(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(SortAPI o) {
        // Sort ascending by the first field only.
        return this.first.compareTo(o.first);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortAPI) {
            SortAPI o = (SortAPI) obj;
            // Compare the boxed Long values with equals(), not ==.
            return this.first.equals(o.first) && this.second.equals(o.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return "output: " + this.first + ";" + this.second;
    }
}
Next, prepare the test data (two tab-separated columns per line):
1	2
1	1
3	0
3	2
2	2
1	2
Upload it to hdfs://hadoop-master:9000/grouper/input/test.txt. The driver (main) code is as follows:
public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        job.setJarByClass(Test.class);
        deleteOutputFile(OUTPUT_DIR);
        // 1. Input path
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        // 2. Input format
        job.setInputFormatClass(TextInputFormat.class);
        // 3. Custom Mapper and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. Partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 5. Grouping comparator
        job.setGroupingComparatorClass(MyGrouper.class);
        // 6. Custom Reducer and its output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. Output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        // 8. Submit the job and wait for completion
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR), conf);
        if (fs.exists(new Path(path))) {
            fs.delete(new Path(path), true);
        }
    }
}
Run the job, then on a cluster node view the result with: hadoop fs -text /grouper/output/part-r-00000
1	2
2	2
3	0
Now let's modify the compareTo() method of the SortAPI class:
@Override
public int compareTo(SortAPI o) {
    // Sort descending by first; break ties ascending by second.
    int cmp = o.first.compareTo(this.first);
    if (cmp != 0) {
        return cmp;
    }
    return this.second.compareTo(o.second);
}
Run it again and look at /grouper/output/part-r-00000:
3	0
2	2
1	1
From this we can conclude that, for the same data, the grouping result is affected by the sort order: if the sort is descending, the groups are formed over the descending data and emitted in that order. We can also print records inside the map and reduce functions (details omitted here) and compare the logs. That comparison shows what happens in the grouping phase: key-value pairs whose keys compare as equal (i.e. the compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method returns 0) form one group, and within that group the first pair in sort order is the one chosen as the representative that gets written out (possibly spilled to disk along the way). The other key-value pairs with an equal key produce no further output of their own.

I also found an article through a Baidu search in which grouping means that all of the values the map function emitted for equal keys are iterated under a single key, i.e. something like {key, values}: {1, {2, 1, 2}} for the data above; that result is the same as at the very beginning, before we defined any custom grouping. You can check this by printing the Iterable<LongWritable> values inside the reduce function; a sketch of such a debugging reducer follows. Personally, I think that is what grouping really means, much like grouping in a data query.
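Here is a minimal sketch of such a debugging reducer (my own illustration; the DebugReduce class name and log format are made up): it iterates every value in a group and logs them, which makes it easy to see how many map outputs the grouping comparator merged under one representative key.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DebugReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Collect and log every value merged into this group.
        StringBuilder sb = new StringBuilder();
        long count = 0;
        for (LongWritable v : values) {
            sb.append(v.get()).append(' ');
            count++;
        }
        System.out.println("group key=" + key + " values=[" + sb.toString().trim()
                + "] size=" + count);
        // Emit the representative key's first field and the group size.
        context.write(new LongWritable(key.first), new LongWritable(count));
    }
}

Swapping it in with job.setReducerClass(DebugReduce.class) and rerunning would print one log line per group, with as many values as there were records sharing the same first field.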
At this point we should also be clear about the difference between grouping and partitioning. Partitioning splits the output into separate result files so they are easier to inspect; for example, if one output file contained HTTP requests of every status, partitioning by status would split them into several result files for easier viewing. Grouping takes key-value pairs whose keys are considered equal and computes over them together, which reduces the output. After partitioning, all of the data still flows to the reduce side unchanged, whereas with grouping the amount of output shrinks; and of course the two steps run at different stages of the job. A small sketch of a custom partitioner follows for contrast.
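To make the contrast concrete, here is a minimal sketch of a custom partitioner (a hypothetical illustration; FirstFieldPartitioner is not part of the original job) that routes keys to reducers by their first field:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstFieldPartitioner extends Partitioner<SortAPI, LongWritable> {

    @Override
    public int getPartition(SortAPI key, LongWritable value, int numPartitions) {
        // Records that share the same `first` value always go to the same
        // reduce task, and therefore to the same output file; nothing is
        // merged here, unlike with the grouping comparator.
        return (int) (key.first % numPartitions);
    }
}

In the driver you would set job.setPartitionerClass(FirstFieldPartitioner.class) together with job.setNumReduceTasks(2) (or more); each partition then lands in its own part-r-0000N file, while the grouping comparator still decides which keys share a single reduce() call within each partition.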
That's all for this time. Keep recording things bit by bit!