Total sort means global ordering: all of the data is output in sorted order, which in principle is no different from sorting an ordinary array. The only difference is the volume of data. Here it is at the TB scale, which means the data cannot simply be loaded into memory and sorted; distributed computation is required. This is what Hadoop's total sort provides, and it plays an important role in real applications.
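The walkthrough below gets a globally sorted result the simple way: all records flow through one reduce task. For reference, Hadoop's standard tool for spreading a total sort over many reducers is TotalOrderPartitioner, which routes contiguous key ranges to reducers using split points sampled from the input, so concatenating the part files in order yields a fully sorted result. A minimal wiring sketch (the class name, reducer count, and partition-file path are illustrative, and the sampled input keys must match the map output key type):

```java
package com.Sort.Whole;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSketch {
    // Configure a job so that several reducers still produce a total order:
    // each reducer receives one contiguous key range.
    public static void configure(Job job) throws Exception {
        job.setNumReduceTasks(4);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // Split points are derived by sampling the job's input keys
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/tmp/partitions")); // illustrative path
        InputSampler.writePartitionFile(job,
                new InputSampler.RandomSampler<>(0.1, 10000));
    }
}
```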
1) Prepare the data (the data has already been grouped and aggregated):
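The data sample itself is not reproduced here. Judging from the FlowBean field comments and the mapper code below, each line is comma-separated: the genre first, then the NA_Sales, Other_Sales, and sum_Sales fields. The one row visible in the code comments looks like this:

```
Action,877.83,1751.18,2629.01
```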
2) Wrap the fields of each record:

```java
package com.Sort.Whole;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private Float upFlow;   // the NA_Sales field
private Float downFlow; // the Other_Sales field
private Float sumFlow;  // the sum_Sales field
public FlowBean() {
super();
}
public FlowBean(Float upFlow, Float downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
// Compare: sort by sumFlow in descending order
@Override
public int compareTo(FlowBean flowBean) {
int result;
if (sumFlow > flowBean.getSumFlow()){
result = -1;
}else if(sumFlow < flowBean.getSumFlow()){
result = 1;
}else{
result = 0;
}
return result;
}
// Serialization
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeFloat(upFlow);
dataOutput.writeFloat(downFlow);
dataOutput.writeFloat(sumFlow);
}
// Deserialization: fields must be read in the same order they were written
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readFloat();
downFlow = dataInput.readFloat();
sumFlow = dataInput.readFloat();
}
public Float getUpFlow() {
return upFlow;
}
public void setUpFlow(Float upFlow) {
this.upFlow = upFlow;
}
public Float getDownFlow() {
return downFlow;
}
public void setDownFlow(Float downFlow) {
this.downFlow = downFlow;
}
public Float getSumFlow() {
return sumFlow;
}
public void setSumFlow(Float sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
```
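As a quick local sanity check (not part of the MapReduce job; the class name FlowBeanCheck is made up for illustration), the bean can be round-tripped through write()/readFields() and its descending compareTo order confirmed:

```java
package com.Sort.Whole;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanCheck {
    public static void main(String[] args) throws IOException {
        FlowBean a = new FlowBean(877.83f, 1751.18f);
        // Serialize with write(), then deserialize into a fresh bean
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bytes));
        FlowBean b = new FlowBean();
        b.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(b); // the three fields, tab-separated
        // A bean with a larger sumFlow must sort first (descending order)
        System.out.println(a.compareTo(new FlowBean(1.0f, 2.0f))); // -1
    }
}
```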
3) The Map program. A MapReduce job sorts by key (if no comparison rule is specified, keys sort in dictionary order), so we swap the roles of key and value: the numeric fields we want to sort on become the key, and the genre becomes the value. This key/value pair is the Map program's output and the Reduce program's input:

```java
package com.Sort.Whole;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
// LongWritable is the byte offset of the line and the input Text is the line's content;
// the output key FlowBean wraps the upFlow, downFlow, and sumFlow fields (the second
// through fourth fields), and the output value Text is the line's first field
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Sample input line: Action,877.83,1751.18,2629.01
// 1. Get the line
String line = value.toString();
// 2. Split on commas
String[] fields = line.split(",");
// 3. Populate the output key and value
String Genre = fields[0];
Float upFlow = Float.parseFloat(fields[fields.length-3]);
Float downFlow = Float.parseFloat(fields[fields.length-2]);
Float sumFlow = Float.parseFloat(fields[fields.length-1]);
k.setDownFlow(downFlow);
k.setUpFlow(upFlow);
k.setSumFlow(sumFlow);
v.set(Genre);
// 4. Write out
context.write(k,v);
}
}
```
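One practical caveat: Float.parseFloat throws NumberFormatException on a CSV header row or a malformed line, which fails the whole task. If the input may contain such lines (an assumption about the data, not something the original handles), a small guard method added to the mapper class lets map() skip them:

```java
// Hypothetical helper for the mapper class above: returns true only when
// the line has enough fields and all three numeric fields parse.
private static boolean isParsable(String[] fields) {
    if (fields.length < 4) {
        return false;
    }
    try {
        Float.parseFloat(fields[fields.length - 3]);
        Float.parseFloat(fields[fields.length - 2]);
        Float.parseFloat(fields[fields.length - 1]);
        return true;
    } catch (NumberFormatException e) {
        return false; // header or malformed line: caller should skip it
    }
}
```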
4) The Reduce program. Its input is the Map program's output; its output key should be the first field of the source file, and its value the remaining fields. Because beans with equal sumFlow compare as equal, their genres arrive grouped into a single reduce call, which is why the method loops over values:

```java
package com.Sort.Whole;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Example output record: Action 877.83 1751.18 2629.01 (genre, then the three sales fields)
for (Text value : values) {
context.write(value, key);
}
}
}
```
5) The driver program:
```java
package com.Sort.Whole;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// NOTE: the output4 directory must not already exist under Workerhdfs
args = new String[]{"E:\\谷歌文件\\谷歌数据集\\","F:\\scala\\Workerhdfs\\output4"};
// 1. Get the job object
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the jar path
job.setJarByClass(FlowCountSortDriver.class);
// 3. Wire up the mapper and reducer
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
// 4. Set the mapper output key and value types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5. Set the final output key and value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
The results of the run are as follows:
Contents of part-r-00000: the fourth field (sum_Sales) is sorted from largest to smallest, so this simple total sort succeeded.
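A closing caveat: the output is globally sorted here because the job ran with a single reduce task (the default). On a cluster configured with more reducers, each part file would be sorted only within itself, so to keep the total-order guarantee the driver should pin the reducer count (a one-line addition, not in the original code):

```java
// In FlowCountSortDriver, before waitForCompletion():
job.setNumReduceTasks(1); // one reducer => one globally sorted part file
```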