1 Data
{"movie":"994","rate":"3","timeStamp":"978242080","uid":"5"}
{"movie":"994","rate":"4","timeStamp":"978244540","uid":"3"}
{"movie":"994","rate":"1","timeStamp":"978246576","uid":"2"}
{"movie":"994","rate":"4","timeStamp":"978245568","uid":"2"}
{"movie":"272","rate":"3","timeStamp":"978245487","uid":"1"}
{"movie":"272","rate":"1","timeStamp":"97824}
{"movie":"348","rate":"3","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"4","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"5","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"4","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"1","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"2","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"2","timeStamp":"978245863","uid":"3"}
2 Requirements
- Compute the total rating score for each movie
- Compute the average rating score for each movie
package com._51doit.pojo;
public class MovieBean {
private String movie;
private double rate ;
private long timeStamp ;
private int uid ;
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
@Override
public String toString() {
return "MovieBean{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid=" + uid +
'}';
}
}
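Before wiring the bean into MapReduce, the JSON-to-bean mapping can be sanity-checked locally by parsing one of the sample records. This is a minimal sketch (the ParseDemo class name is made up for illustration); it only assumes the same fastjson dependency used in the jobs below.
package com._51doit.pojo;
import com.alibaba.fastjson.JSON;
public class ParseDemo {
    public static void main(String[] args) {
        // one of the sample records from section 1
        String line = "{\"movie\":\"994\",\"rate\":\"3\",\"timeStamp\":\"978242080\",\"uid\":\"5\"}";
        // fastjson converts the quoted numbers into the bean's double/long/int fields via the setters
        MovieBean mb = JSON.parseObject(line, MovieBean.class);
        System.out.println(mb); // MovieBean{movie='994', rate=3.0, timeStamp=978242080, uid=5}
    }
}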
3 Compute the total rating score for each movie
package com._51doit.mr.movie;
import com._51doit.pojo.MovieBean;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.io.IOException;
public class MovieSumRate {
static class MovieSumRateMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
Text k = new Text();
DoubleWritable v = new DoubleWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = null ;
try {
// the raw input line
line = value.toString();
// parse the JSON line into a MovieBean
MovieBean mb = JSON.parseObject(line, MovieBean.class);
String movie = mb.getMovie();
double rate = mb.getRate();
k.set(movie);
v.set(rate);
context.write(k, v);
} catch (Exception e) {
// log the malformed line (e.g. the truncated 272 record) and skip it
System.out.println(line);
}
}
}
static class MovieSumRateReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
DoubleWritable v = new DoubleWritable() ;
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0d ;
for (DoubleWritable value : values) {
double rate = value.get();
sum+=rate ;
}
v.set(sum);
context.write(key,v);
}
}
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
Configuration conf = new Configuration();
// the second argument is the job name
Job job = Job.getInstance(conf, MovieSumRate.class.getSimpleName());
job.setMapperClass(MovieSumRateMapper.class);
job.setReducerClass(MovieSumRateReducer.class);
// map-phase output types (optional here because they match the final output types below)
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(DoubleWritable.class);
// data types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// job.setNumReduceTasks(2); // run 2 reduce tasks
// paths of the input data and the output directory
FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\res2"));
job.waitForCompletion(true);
}
}
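Run against the sample data in section 1 (the truncated 272 record is swallowed by the mapper's try/catch), the result file should contain roughly:
272	3.0
348	21.0
994	12.0
Because summation is associative, the same reducer could optionally also be registered as a combiner to cut shuffle traffic, e.g. job.setCombinerClass(MovieSumRateReducer.class); this is an optional tweak, not part of the original job.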
4 Compute the average rating score for each movie
package com._51doit.mr.movie;
import com._51doit.pojo.MovieBean;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.io.IOException;
public class MovieAvgRate {
static class MovieAvgRateMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
Text k = new Text();
DoubleWritable v = new DoubleWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
MovieBean mb = JSON.parseObject(line, MovieBean.class);
double rate = mb.getRate();
String movie = mb.getMovie();
k.set(movie);
v.set(rate);
context.write(k,v);
} catch (Exception e) {
// skip malformed records (e.g. the truncated 272 line)
}
}
}
static class MovieAvgRateReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
DoubleWritable v = new DoubleWritable() ;
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
try {
double sum = 0d ;  // total score
int count = 0 ;    // number of ratings
for (DoubleWritable value : values) {
double rate = value.get();
sum+=rate ;
count++ ;
}
double avgRate = sum/count ;
v.set(avgRate);
context.write(key,v);
} catch (Exception e) {
// defensive: ignore unexpected errors for this key
}
}
}
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
Configuration conf = new Configuration();
// the second argument is the job name
Job job = Job.getInstance(conf, MovieAvgRate.class.getSimpleName());
job.setMapperClass(MovieAvgRateMapper.class);
job.setReducerClass(MovieAvgRateReducer.class);
// map-phase output types (optional here because they match the final output types below)
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(DoubleWritable.class);
// data types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// job.setNumReduceTasks(2); // run 2 reduce tasks
// paths of the input data and the output directory
FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\res12"));
job.waitForCompletion(true);
}
}
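With the same sample input, every movie happens to average exactly 3.0 (994: 12/4, 272: 3/1, 348: 21/7), so the output should look roughly like:
272	3.0
348	3.0
994	3.0
Unlike the sum job, this reducer cannot safely be reused as a combiner: an average of per-split averages is not, in general, the overall average. A combiner for this job would have to emit (sum, count) pairs and leave the final division to the reducer.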
5 Compute each user's total rating score (map output: uid -> MovieBean)
If a custom class is placed in the key or value position of an MR program, it must be serializable:
value position: serialization (implement Writable)
key position: serialization plus sorting (implement WritableComparable)
5.1 MovieWritable
package com._51doit.pojo;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MovieWritable implements Writable {
private String movie;
private double rate;
private long timeStamp;
private int uid;
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
@Override
public String toString() {
return "MovieWritable{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid=" + uid +
'}';
}
// note: the field write order here must match the read order in readFields
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeLong(timeStamp);
dataOutput.writeInt(uid);
}
public void readFields(DataInput dataInput) throws IOException {
this.movie = dataInput.readUTF();
this.rate = dataInput.readDouble();
this.timeStamp = dataInput.readLong();
this.uid = dataInput.readInt();
}
}
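As noted above, a type used in the key position must be sortable as well as serializable, which in Hadoop means implementing WritableComparable rather than plain Writable. A minimal hypothetical sketch (MovieKeyWritable is invented for illustration; the jobs in this section only use MovieWritable as a value):
package com._51doit.pojo;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MovieKeyWritable implements WritableComparable<MovieKeyWritable> {
    private int uid;
    private double rate;

    public void write(DataOutput out) throws IOException {
        out.writeInt(uid);
        out.writeDouble(rate);
    }

    public void readFields(DataInput in) throws IOException {
        // same order as write()
        this.uid = in.readInt();
        this.rate = in.readDouble();
    }

    // defines the shuffle sort order: ascending by uid
    public int compareTo(MovieKeyWritable other) {
        return Integer.compare(this.uid, other.uid);
    }
}
The driver below keeps using MovieWritable only as a value type, so plain Writable is sufficient there.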
package com._51doit.mr.movie;
import com._51doit.pojo.MovieWritable;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.io.IOException;
public class MovieSumRateUid {
static class MovieSumRateUidMapper extends Mapper<LongWritable, Text, IntWritable, MovieWritable> {
IntWritable k = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
MovieWritable mw = JSON.parseObject(line, MovieWritable.class);
k.set(mw.getUid());
context.write(k, mw);
} catch (Exception e) {
// skip malformed records
// e.printStackTrace();
}
}
}
static class MovieSumRateUidReducer extends Reducer<IntWritable, MovieWritable, IntWritable, DoubleWritable> {
DoubleWritable v = new DoubleWritable();
@Override
protected void reduce(IntWritable key, Iterable<MovieWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0d;
for (MovieWritable mw : values) {
sum += mw.getRate();
}
v.set(sum);
context.write(key, v);
}
}
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
Configuration conf = new Configuration();
// the second argument is the job name
Job job = Job.getInstance(conf, MovieSumRateUid.class.getSimpleName());
job.setMapperClass(MovieSumRateUidMapper.class);
job.setReducerClass(MovieSumRateUidReducer.class);
// map-phase output types (required here because they differ from the final output types)
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(MovieWritable.class);
// data types of the final output
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(DoubleWritable.class);
// job.setNumReduceTasks(2); // run 2 reduce tasks
// paths of the input data and the output directory
FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\uid_sum_rate"));
job.waitForCompletion(true);
}
}
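Against the sample data (again skipping the malformed record), each user's total should come out as:
1	3.0
2	5.0
3	16.0
5	12.0
Note that, unlike the previous two jobs, job.setMapOutputKeyClass and job.setMapOutputValueClass must be set here, because the map output types (IntWritable, MovieWritable) differ from the final output types (IntWritable, DoubleWritable).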