MR简单总结

中台 数据  收藏
0 / 245

一、Hive 执行顺序

执行顺序:

from … where … select … group by … having … order by …limit

其实总结hive的执行顺序也是总结mapreduce的执行顺序:

MR程序的执行顺序:

map阶段:

1.执行from加载,进行表的查找与加载

2.执行where过滤,进行条件过滤与筛选

3.执行select查询:进行输出项的筛选

4.执行group by分组:描述了分组后需要计算的函数

5.map端文件合并:map端本地溢出写文件的合并操作,每个map最终形成一个临时文件。 然后按列映射到对应的reduceReduce阶段:

Reduce阶段:

1.group by:对map端发送过来的数据进行分组并进行计算。

2.select:最后过滤列用于输出结果

3.limit排序后进行结果输出到HDFS文件

二、MapReduce过程详解

过程图.png

2.1Map 阶段:

1.获取待处理的文件和目录->

2.split(切片):AM(applicationmaster) 会计算job 和 maptask的数量



1)JOB数计算

a.一般情况下,HQL中的每一个表或者子查询都会生成一个job,这是逻辑执行计划中生成的,如果一个子查询join另外一个子查询,会产生4个JOB,但后面Hive还会优化,比如:使用MapJoin,最终一条HQL语句生成的job数量很难通过HQL观察出来。 

b.hive里,同一sql里,会涉及到n个job,默认情况下,每个job是顺序执行的。 如果每个job没有前后依赖关系,可以并发执行的话,可以通过设置该参数 set hive.exec.parallel=true,实现job并发执行,该参数默认可以并发执行的job数为8。

2)Maptask数计算,合理设置Map数有利于提升性能

    a. default_num = total_size/block_size,block_size默认256
    b.可以通过set mapred.map.tasks =num,但是这个数量仅在大于default_num 的时候才会生效。
    c. 可以通过set mapred.min.split.size来设置每个task的文件大小,但是这个数量在大于block_size的时候才会生效。
        split_size = max(mapred.min.split.size,block_size);
        split_num = total_size/split_size

d.减少mapper数 ,合并小文件

    -- 每个Map最大输入大小set mapred.max.split.size = 128000000;

    -- 每个Map最小输入大小set mapred.min.split.size = 100000000;

    -- 执行Map前进行小文件合并set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。

e.增加mapper数

--设置reduce的数量 set mapred.reduce.tasks=10; 

--每个reduce处理的数据量,默认1GB set hive.exec.reducers.bytes.per.reducer 

3.Map函数:数据读取出来 以K,V键值对方式存在Map函数中

4.partition:对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得(计算需要reduce的个数)

5.kvbuffer(环形缓冲区):通过outputCollector接口写入到环形缓冲区中,k,v映射关系就被序列化了,环形缓冲区中存了两部分内容 数据+索引 set mapreduce.task.io.sort.mb=1024(设置Mapper中的Kvbuffer的大小,默认100M,调大的话,会减少磁盘spill的次数),

       同时会进行分区和区内排序操作,此时还可以设置combiner 参数(mapreduce.map.combine.minspills(default 3)) 用于合并,减少写入磁盘数据(例如将<a,1>,<a,1> 数据合并为 <a,2> 再传递给reduce 计算

       作用:因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,并做一些预排序以提高效率

6.spill: 环形缓冲区空间达到80%的时候就会开始溢写(因为spill的时候还有可能别的线程在往里写数据,因为还预留空间,有可能有正在写到Buffer中的数据),但是这个大小是可以根据job提交时的参数设定来调整的.该参数即为:mapreduce.task.io.sort.mb

7.Merge: splill 一直在不断的溢写,为了将相同区分的数据尽量放在一起运算,需要进行Merge合并操作(此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort))

      mapreduce.task.io.sort.factor(default:10),代表进行merge的时候最多能同时merge多少spill,如果有100个spill个文件,此时就无法一次完成整个merge的过程,这个时候需要调大mapreduce.task.io.sort.factor(default:10)来减少merge的次数,从而减少磁盘的操作;

2.2Map端shuffle

①分区partition

②写入环形内存缓冲区

③执行溢出写:排序sort--->合并combiner--->生成溢出写文件

④归并merge

合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1><“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>

map端相关的属性如下表:

map-cs.png

2.3Reduce 阶段:

1.只要有一个Maptast 完成 AM 会告知一个 reducetask 需要从哪些 maptask 中获取自己分区需要归并处理的数据

     reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5)调整

   1)Reduce 数计算

    a.reducers = min(maxReducers, totalInputFileSize/bytesPerReducer)

    b.maxReducers由参数hive.exec.reducers.max设置,默认50,     bytesPerReducer由参数hive.exec.reducers.bytes.per.reducer 设置,默认1GB

    c.可以通过 set mapreduce.reduce.tasks=10; 设置reducer数量

    d.可以调整hive.exec.reducers.bytes.per.reducer参数的值;set hive.exec.reducers.bytes.per.reducer=500000000;

    e.**reduce端小文件合并设置 **set hive.merge.mapredfiles=true;

     如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务

2.Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量(70%)的时候才spill磁盘,这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置

     一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块

3.Merge 归并:

每一轮合并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factor(default:10)来配置

2.4Reduce端shuffle

①复制copy

②归并merge

③reduce

reduce端相关属性:

reducr-tp.png

三、JOIN 工作原理:

mapjoin

Mapjoin会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要启动Reduce任务也就不需要经过shuffle阶段,从而能在一定程度上节省资源提高JOIN效率。

开启map join: set hive.auto.convert.join = true;

reducejoin

1)map:用连接字段作为key,其余部分和新加的标志作为value,最后进行输出,reduce 根据分区去拉取数据,进行合并操作

2) reduce:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就可以了

 并且这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜

Hive Common Join

四、HiveSql 执行计划查看

1.explain select * from dwb.dwb_trde_order_item_s_d t where t.pt=20191127

image2019-12-10-21_44_39.png

2.explain select * from dwb.dwb_trde_order_item_s_d t join dim.dim_prod_item t1 ON t.iid=t1.item_id where t.pt=20191127 and t1.pt=20191127 and t.plat_form=9

image2019-12-10-21_40_43.png

五、基本调优原则

1.尽量尽早地过滤数据,减少每个阶段的数据量,需要过滤的条件添加在子句中,同时选择需要使用到的字段,不要写select * from

`select` `sum``(num) `

  `from` `dwb.dwb_trde_order_item_s_d   t`

  `join` `dim.dim_prod_item t1  ``ON` `t.iid=t1.item_id `

 `where` `t.pt=20191127`

  `and`  `t1.pt=20191127  `

  `and` `t.plat_form=9`

更改为:

`select` `sum``(num) `

 `from`

    `(``select` `iid`

           `,num  `

      `from` `dwb.dwb_trde_order_item_s_d t `

     `where` `pt=20191127`

       `and` `plat_form=9`

    `)t`

 `join`

    `(``select` `item_id`

       `from` `dim.dim_prod_item t`

      `where`  `pt=20191127`

    `) t1  `

  `on` `t.iid=t1.item_id `

第二种方式产生的map数比第一种少,执行时间短

  1. join操作 小表要注意放在join的左边。否则会引起磁盘和内存的大量消耗

map端join 将小表加载到内存,减少shuffer过程,磁盘和内存的消耗

`select`    `/*+mapjoin(t,t2,t3)*/`

                  `t3.id         ``as` `cate_id                                                   ``-- '类目ID(叶子类目)'`

                 `,t3.``name`       `as` `cate_name                                                 ``-- '类目名称'`

                 `,t2.id         ``as` `parent_id                                                 ``-- '父类目ID'                                          `

          `from` `(``select` `id`

                       `,``name`

                       `,``CASE` `WHEN` `id ``IN` `(1230,449,2,6,1472,562,571,1454,1485,1552,1455)`

                             `THEN` `0`

                             `ELSE` `1`

                         `END` `AS` `is_standard  `

                       `,``CASE` `WHEN` `id ``IN` `(310,2318,2319)  ``THEN` `0  ``ELSE` `1 ``END` `AS` `is_coop`

                `from`  `ods.ods_bb09_category_s_d`

                `where` `pt=``'20191209'`

                  `and` `parent_id=0`

                  `and` `status>0`

                `) t`

         

           `join`

                `(``select`  `id`

                         `,``name`

                         `,parent_id`

                   `from` `ods.ods_bb09_category_s_d`

                  `where` `pt=``'20191209'`

                    `and` `status>0`

                `) t2`

             `on` `t.id=t2.parent_id`

         

          `join`

               `(``select`  `id`

                       `,``name`

                       `,parent_id`

                       `,status`

                       `,gmt_create `

                       `,gmt_modified `

                  `from` `ods.ods_bb09_category_s_d`

                 `where` `pt=``'20191209'`

                   `and` `status>0`

               `) t3`

            `on` `t2.id=t3.parent_id`

只会产生一个map阶段 而没有reduce阶段

3.如果是同一张表的不同条件,使用 union all 代替 多次insert ,因为同一张表只会扫一次,产生一个job,insert 多次会产生多个job,每个job又会有不同的MR计划

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=最大并发job数; 

对于同一个SQL产生的JOB,如果不存在依赖的情况下,将会并行启动JOB

select a,b,c
from
(
select a1 as a,b1 as b,c1 as c from table1
union all
select a2 as a,b2 as b,c2 as c from table2
union all
select a3 as a,b3 as b,c3 as c from table3
...
)t;

dim_prod_cate 商品类目的存在join的合并,可以参考一下

4.使用GROUP BY 代替 COUNT DISTINCT ,数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替

SELECT iid,
       COUNT(DISTINCT uid) AS uv
 FROM dwb.dwb_trde_order_item_s_d t 
where pt=20191127
GROUP BY iid

更改为:

SELECT iid,
       COUNT(uid) AS uv
FROM (select iid
            ,uid
      from dwb.dwb_trde_order_item_s_d t 
     where pt=20191127
      group by iid
              ,uid
     ) a
GROUP BY iid;

5.join 关联条件key 一定要是同一个类型,否则会出现类型的转化

explain select sum(num) 
  from dwb.dwb_trde_order_item_s_d   t
  join dim.dim_prod_item t1  ON t.iid=t1.item_name
 where t.pt=20191127
  and  t1.pt=20191127  
  and t.plat_form=9

查看日志进行了一个double 类型的转化,消耗资源

6.较少逻辑的复杂度,尽量不要多张大表去关联

select  *
  from  a
  join b
   on a.id=b.id
  join c
   on a.id=c.id
  
更改为
临时表A
select *
  from a
  join b
    on a.id=b.id
  
临时表B
select *
  from A
  join c
    on a.id=c.id

7.多个表关联,相同key关联条件放在一起,减少job数

select  *
  from  a
  left join b
    on a.id=b.id
  left join c
    on a.id=c.id
 left join d
    on a.id=d.id
  left join e
    on a.s_id=d.s_id
 
更改为
select *  from
(select a.id,a.s_id
   from  a
  left join b
    on a.id=b.id
  left join c
    on a.id=c.id
 left join d
    on a.id=d.id
)aleft join e
 on a.s_id=d.s_id 减少执行计划中job的数量

8.增加map/reduce数,提高并发度,可以加快计算速度

调节map和reduce内存大小

set mapreduce.map.memory.mb=10240;
set mapreduce.reduce.memory.mb=10240;
set mapred.map.child.java.opts=-server -Xmx9000m -Djava.net.preferIPv4Stack=true;
set mapred.reduce.child.java.opts=-server -Xmx9000m -Djava.net.preferIPv4Stack=true;

9.combiner

参数mapreduce.map.combine.minspills(default 3) Combiner存在的时候,此时会根据Combiner定义的函数对map的结果进行合并,什么时候进行Combiner操作呢???和Map在一个JVM中,是由min.num.spill.for.combine的参数决定的,默认是3,也就是说spill的文件数在默认情况下由三个的时候就要进行combine操作,最终减少磁盘数据 相当于提前进行了reduce操作

10.compress(压缩)

compress(压缩):减少磁盘IO和网络IO还可以进行: 对spill,merge文件都可以进行压缩。中间结果非常的大,IO成为瓶颈的时候压缩就非常有用,可以通过mapreduce.map.output.compress(default:false)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压,在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU,压缩一般可以10倍的减少IO操作,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)参数设置。但这个过程会消耗CPU,适合IO瓶颈比较大。

11.数据倾斜,

1.数据分布原因: 1). 数据频率倾斜——某一个区域的数据量要远远大于其他区域。 2). 数据大小倾斜——部分记录的大小远远大于平均值。

2.带有 DISTICT ,GROUP BY ,JOIN 都有可能出现数据倾斜

JOIN引起的数据倾斜 情形1:其中一个表较小,但是key集中 后果1:分发到某一个或几个Reduce上的数据远高于平均值

情形2:大表与大表,但是分桶的判断字段0值或空值过多 后果2:这些空值都由一个reduce处理,非常慢

情形3:关联条件key值类型不同 后果3:导致一直在计算转化工作

表现:任务进度长时间维持在99%,有少量(1个或几个)reduce子任务未完成。 参数:set hive.optimize.skewjoin=true;

DISTICT引起的数据倾斜 情形:某特殊值过多,使用distinct 函数,数据会shuffer到一个reducer中,导致reducer数据严重倾斜 后果:处理此特殊值的reduce耗时 方法:使用group by 先进行一次分组,在 进行count操作

GROUP BY引起的数据倾斜 情形:group by 维度过小,某值的数量过多 后果:处理某值的reduce非常耗时 参数:set hive.groupby.skewindata=true

解决方案调优参数

set hive.map.aggr=true 在map中会做部分聚集操作,效率更高但需要更多的内存

set hive.groupby.skewindata=true;

能先进行 group 操作的时候先进行 group 操作,把 key 先进行一次 reduce,之后再进行 count 或者 distinct count 操作。

出现大量空等特殊值,设置rank() 等默认值处理或者空值需要单独处理(),再进行关联

增加reduce 的jvm内存:set mapreduce.map.java.opts=-Xmx8196m;set mapreduce.reduce.java.opts=-Xmx8196m;

增加reduce个数:set hive.exec.reducers.max=400;