使用Map-Reduce求各年销售笔数,各年销售总额

   日期:2020-11-03     浏览:110    评论:0    
核心提示:python+streaming实验:求各年销售笔数,各年销售总额实验环境:Linux下用map\reduce处理数据集实验要求:一、创建map.py文件,注意要点:1、对空白行的处理,可判断 line[0] != ‘’2、时间数据包含年月日,只需截取年份3、可以将每行订单数据都算为1笔4、目标输出格式:年份,数额二、自行编写reduce.py代码,注意要点1、可以将数据的行数当成笔数2、输出时应按年份排好序3、目标输出格式: 年份,笔数-总额(总额要求转化成k且保留两位小数,如13

实验环境:Linux环境、python3环境

实验要求:

一、创建map.py文件,注意要点:

1、对空白行的处理,可判断 line[0] != ‘’

2、时间数据包含年月日,只需截取年份

3、可以将每行订单数据都算为1笔

4、目标输出格式:年份,数额

二、自行编写reduce.py代码,注意要点

1、可以将数据的行数当成笔数

2、输出时应按年份排好序

3、目标输出格式: 年份,笔数-总额

(总额要求转化成k且保留两位小数,如13572468.98 -> 13572.46k)

实验思想:

1、map环节:映射,即选出所需要处理的字段。从这道题的实验要求来看,map最后处理出来的键值对应为:(k1,v1)=(年份,数额)
2、reduce环节:规约,将关键字相同的数据进行运算处理。reduce最后处理出来的键值对为:(k2,v2)=(年份,(笔数—总额))

实现功能环节

map功能的实现

#map功能的实现
import sys
   
for line in sys.stdin:
           if line[0]!="" :
            col = line.strip().split(',')
            col1=col[2].strip().split('-')
            col[2]=col1[0]
            print(col[2],',',col[6])

reduce功能的实现

#reduce功能的实现
#!/bin/env python
# encoding: utf-8
from operator import itemgetter
import itertools
import sys


def read_mapper_output(file, separator = ','):
    for line in file:
       yield line.rstrip().split(separator,1)
stdin_generator=read_mapper_output(sys.stdin, ',')
for year, sals in itertools.groupby(stdin_generator,itemgetter(0) ):
  count=0
  total_sal=0
  for year,cur_sal in sals:
      count = count+1
      total_sal=total_sal + float(cur_sal)
  print(year,'\t',count,'\t','%.2f'%(total_sal/1000))

运行结果

map的运行结果

本地管道测试map代码

cat sales.csv | python map.py 

map-reduce的运行结果

map-reduce本地管道测试:

cat sales.csv | python map.py | sort -k 1 | python reduce.py

在Hadoop集群里实现MapReduce功能

一、打开命令行窗口,启动Hadoop

cd /opt/hadoop/sbin
hadoop namenode -format #格式化名称节点
start-all.sh #启动服务器
jps #查看进程

二、在HDFS上创建目录/001/input,并将数据文件上传至HDFS

hdfs dfs -mkdir -p /001/input #创建文件夹

hdfs dfs -put sales.csv /001/input #上传文件到集群文件夹里
hdfs dfs -ls /001/input #查看是否上传成功

三、新建一个XX.sh文件

$HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-D stream.non.zero.exit.is.failure=false \ #对于错误不报出
-D mapred.job.name="streaming_count" \ #作业的名称的命名,在浏览器输入:localhost:8088/,可以看到作业名。
-D mapred.job.priority=HIGH \ #先执行优先权高的作业 
-files "/home/ubuntu/PycharmProjects/untitled/map.py,/home/ubuntu/PycharmProjects/untitled/reduce.py" \
-input /001/input/sales.csv \
-output /001/input/out001 \
-mapper "python3 map.py" \
-reducer "python3 reduce.py" 

然后在XX.sh文件夹下打开命令窗口输入以下代码即可以运行出结果:

sh XX.sh

用上面指令运行出来的结果如下:

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服