Hadoop Detailed Notes (12): MapReduce Data Analysis Case — Movie Ratings

Date: 2020-07-12

1 Data

Note: the second record for movie 272 below is truncated (malformed JSON). The mappers in this post wrap JSON parsing in a try/catch so dirty lines like this one are simply skipped.

{"movie":"994","rate":"3","timeStamp":"978242080","uid":"5"}
{"movie":"994","rate":"4","timeStamp":"978244540","uid":"3"}
{"movie":"994","rate":"1","timeStamp":"978246576","uid":"2"}
{"movie":"994","rate":"4","timeStamp":"978245568","uid":"2"}
{"movie":"272","rate":"3","timeStamp":"978245487","uid":"1"}
{"movie":"272","rate":"1","timeStamp":"97824}
{"movie":"348","rate":"3","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"4","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"5","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"4","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"1","timeStamp":"978241434","uid":"5"}
{"movie":"348","rate":"2","timeStamp":"978245863","uid":"3"}
{"movie":"348","rate":"2","timeStamp":"978245863","uid":"3"}

2 Requirements

  1. Compute the total rating score of each movie
  2. Compute the average rating score of each movie

Each JSON record is mapped onto a plain MovieBean POJO:

package com._51doit.pojo;


public class MovieBean {
    
    private String movie;
    private  double rate ;
    private long timeStamp ;
    private int uid ;

    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public int getUid() {
        return uid;
    }

    public void setUid(int uid) {
        this.uid = uid;
    }

    @Override
    public String toString() {
        return "MovieBean{" +
                "movie='" + movie + '\'' +
                ", rate=" + rate +
                ", timeStamp=" + timeStamp +
                ", uid=" + uid +
                '}';
    }
}
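As a quick local sanity check, a single record can be parsed with fastjson to confirm that the quoted JSON values map cleanly onto the MovieBean fields. This is a minimal sketch; ParseDemo is just an illustrative name and is not part of the original project.

import com._51doit.pojo.MovieBean;
import com.alibaba.fastjson.JSON;

public class ParseDemo {
    public static void main(String[] args) {
        // one record taken from the sample data above
        String line = "{\"movie\":\"994\",\"rate\":\"3\",\"timeStamp\":\"978242080\",\"uid\":\"5\"}";
        // fastjson converts the quoted numbers into the bean's double/long/int fields
        MovieBean mb = JSON.parseObject(line, MovieBean.class);
        System.out.println(mb);  // MovieBean{movie='994', rate=3.0, timeStamp=978242080, uid=5}
    }
}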

3 Total rating score of each movie

package com._51doit.mr.movie;

import com._51doit.pojo.MovieBean;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.IOException;


public class MovieSumRate {
    static class MovieSumRateMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = null ;
            try {
                // the current input line
                line = value.toString();
                // parse the JSON line into a MovieBean
                MovieBean mb = JSON.parseObject(line, MovieBean.class);
                String movie = mb.getMovie();
                double rate = mb.getRate();
                k.set(movie);
                v.set(rate);
                context.write(k, v);
            } catch (Exception e) {
                // malformed JSON (e.g. the truncated 272 record): print the bad line and skip it
                System.out.println(line);
            }
        }
    }

    static class MovieSumRateReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable v = new DoubleWritable() ;
        
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0d ;
            for (DoubleWritable value : values) {
                double rate = value.get();
                sum+=rate ;
            }
            v.set(sum);
            context.write(key,v);
        }
    }

    public static void main(String[] args) throws Exception {

        Logger.getLogger("org").setLevel(Level.ERROR);
        Configuration conf = new Configuration();
        // second argument: the job name
        Job job = Job.getInstance(conf, MovieSumRate.class.getSimpleName());

        job.setMapperClass(MovieSumRateMapper.class);
        job.setReducerClass(MovieSumRateReducer.class);
        // map output types (can be omitted here because they match the final output types)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(DoubleWritable.class);
        // final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // job.setNumReduceTasks(2);  // run 2 reduce tasks
        // input data path
        FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\res2"));

        job.waitForCompletion(true);
    }

}
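With the sample data above (the truncated 272 record is skipped by the mapper's try/catch), the tab-separated output of this job should look roughly like:

272	3.0
348	21.0
994	12.0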

4 Average rating score of each movie

package com._51doit.mr.movie;

import com._51doit.pojo.MovieBean;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.IOException;


public class MovieAvgRate {

    static class MovieAvgRateMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                MovieBean mb = JSON.parseObject(line, MovieBean.class);
                double rate = mb.getRate();
                String movie = mb.getMovie();
                k.set(movie);
                v.set(rate);
                context.write(k,v);
            } catch (Exception e) {
                // skip malformed lines
            }
        }
    }

    static class MovieAvgRateReducer extends Reducer<Text , DoubleWritable , Text ,DoubleWritable>{
        DoubleWritable v =  new  DoubleWritable() ;
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            try {
                double sum = 0d;   // total score
                int count = 0;     // number of ratings
                for (DoubleWritable value : values) {
                    double rate = value.get();
                    sum+=rate ;
                    count++ ;
                }
                double avgRate = sum/count ;
                v.set(avgRate);
                context.write(key,v);
            } catch (Exception e) {
                // should not normally happen; ignore bad groups
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        Configuration conf = new Configuration();
        // second argument: the job name
        Job job = Job.getInstance(conf, MovieAvgRate.class.getSimpleName());

        job.setMapperClass(MovieAvgRateMapper.class);
        job.setReducerClass(MovieAvgRateReducer.class);
        // map output types (can be omitted here because they match the final output types)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(DoubleWritable.class);
        // final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // job.setNumReduceTasks(2);  // run 2 reduce tasks
        // input data path
        FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\res12"));

        job.waitForCompletion(true);

    }
}
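On the same sample data every movie happens to average out to 3.0 (994: 12/4, 272: 3/1, 348: 21/7), so the expected output is roughly:

272	3.0
348	3.0
994	3.0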

5 Total rating score of each user (key: uid, value: MovieBean)

If a custom class is used in the key or value position of an MR program, it must be serializable.

value position:  serializable (implement Writable)

key position:    serializable and sortable (implement WritableComparable)
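The job below only needs a custom class in the value position, so implementing Writable is enough. If a custom class were used as the key, it would have to implement WritableComparable so the framework can sort it during the shuffle. A minimal sketch (UidKey is a hypothetical class, not used by the code below):

package com._51doit.pojo;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Illustrative only: a key type must be both serializable and sortable
public class UidKey implements WritableComparable<UidKey> {

    private int uid;

    public int getUid() { return uid; }

    public void setUid(int uid) { this.uid = uid; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(uid);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.uid = in.readInt();
    }

    @Override
    public int compareTo(UidKey other) {
        // defines the sort order of the keys in the shuffle phase
        return Integer.compare(this.uid, other.uid);
    }
}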

5.1 MovieWritable


package com._51doit.pojo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class MovieWritable implements Writable {
    
    private String movie;
    private double rate;
    private long timeStamp;
    private int uid;

    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public int getUid() {
        return uid;
    }

    public void setUid(int uid) {
        this.uid = uid;
    }

    @Override
    public String toString() {
        return "Movie{" +
                "movie='" + movie + '\'' +
                ", rate=" + rate +
                ", timeStamp=" + timeStamp +
                ", uid=" + uid +
                '}';
    }
    // Note: readFields() must read the fields in exactly the same order write() writes them

    
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(movie);
        dataOutput.writeDouble(rate);
        dataOutput.writeLong(timeStamp);
        dataOutput.writeInt(uid);

    }

    
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.movie = dataInput.readUTF();
        this.rate = dataInput.readDouble();
        this.timeStamp = dataInput.readLong();
        this.uid = dataInput.readInt();

    }
}
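To convince yourself that write() and readFields() agree on the field order, the class can be round-tripped locally through a byte stream. This is a minimal sketch; MovieWritableTest is just an illustrative name.

import com._51doit.pojo.MovieWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MovieWritableTest {
    public static void main(String[] args) throws IOException {
        MovieWritable in = new MovieWritable();
        in.setMovie("994");
        in.setRate(3.0);
        in.setTimeStamp(978242080L);
        in.setUid(5);

        // serialize to bytes, as Hadoop does between the map and reduce phases
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        // deserialize into a fresh object and inspect the result
        MovieWritable out = new MovieWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(out);  // Movie{movie='994', rate=3.0, timeStamp=978242080, uid=5}
    }
}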
5.2 MovieSumRateUid

package com._51doit.mr.movie;

import com._51doit.pojo.MovieWritable;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.IOException;


public class MovieSumRateUid {

    static class MovieSumRateUidMapper extends Mapper<LongWritable, Text, IntWritable, MovieWritable> {
        IntWritable k = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                MovieWritable mw = JSON.parseObject(line, MovieWritable.class);
                k.set(mw.getUid());
                context.write(k, mw);
            } catch (Exception e) {
                // skip malformed lines
                // e.printStackTrace();
            }

        }
    }

    static class MovieSumRateUidReducer extends Reducer<IntWritable, MovieWritable, IntWritable, DoubleWritable> {
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void reduce(IntWritable key, Iterable<MovieWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0d;
            for (MovieWritable mw : values) {
                sum += mw.getRate();
            }
            v.set(sum);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {

        Logger.getLogger("org").setLevel(Level.ERROR);
        Configuration conf = new Configuration();
        // second argument: the job name
        Job job = Job.getInstance(conf, MovieSumRateUid.class.getSimpleName());

        job.setMapperClass(MovieSumRateUidMapper.class);
        job.setReducerClass(MovieSumRateUidReducer.class);
        // map output types (required here because they differ from the final output types)
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(MovieWritable.class);
        // final output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        // job.setNumReduceTasks(2);  // run 2 reduce tasks
        // input data path
        FileInputFormat.setInputPaths(job, new Path("D:\\data\\movie\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\movie\\uid_sum_rate"));

        job.waitForCompletion(true);
    }


}
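On the sample data (with the malformed line skipped) the per-user totals come out roughly as:

1	3.0
2	5.0
3	16.0
5	12.0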


 

 