Hadoop简介
Hadoop Distributed File System,分布式文件系统
Hadoop应用场景
适合
- 大规模数据
- 流式数据(写一次,读多次)
- 商用硬件(一般硬件)
不适合
- 低延时的数据访问
- 大量的小文件
- 频繁修改文件(基本就是写1次)
Hadoop操作模式
- 本地/独立模式 :在系统中下载Hadoop之后,默认情况下,它以独立模式配置,并且可以作为单个Java进程运行。
- 伪分布式模式 :它是单机上的分布式仿真。每个Hadoop守护进程(如hdfs,yarn,MapReduce等)都将作为单独的java进程运行。此模式对开发有用。
- 完全分布式的模式 :此模式是完全分布式的,至少有两台或多台机器作为集群。
Hadoop架构
- HDFS: 分布式文件存储
- YARN: 分布式资源管理
- MapReduce: 分布式计算
- Others: 利用YARN的资源管理功能实现其他的数据处理方式,如spark
内部各个节点基本都是采用Master-Woker架构
Hadoop HDFS
Hadoop - HDFS
详细架构
Block
- 基本存储单位,一般大小为64M 配置大的块主要是因为: 1) 减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间; 2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录; 3)对数据块进行读写,减少建立网络的连接成本.
- 一个大文件会被拆分成一个个块,存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小
- 每次读写一个块
- 每个块都会被复制到多台机器,默认复制3份
NameNode
- 存储文件的元数据信息metadata,如命名空间信息,块信息等,运行时所有数据都保存到内存(也可以持久化到磁盘上),整个HDFS可存储的文件数受限于NameNode的内存大小
- 一个Block在NameNode中对应一条记录(一般一个block占用NameNode150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件
- 数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)
- NameNode失效则整个HDFS都失效
Secondary NameNode
定时与NameNode进行同步(定期合并文件系统镜像和编辑日志,然后把合并后的传给NameNode,替换其镜像,并清空编辑日志,类似于CheckPoint机制),但NameNode失效后仍需要手工将其设置成主机
fsimage : 它是在NameNode启动时对整个文件系统的快照 edit logs : 它是在NameNode启动后,对文件系统的改动序列.当往DataNode写文件时,DataNode会跟NameNode通信,告诉NameNode什么文件的第几个block放在它那里,NameNode这个时候会将这些元数据信息写到edit logs文件中- Secondary NameNode定时到NameNode去获取edit logs,并更新到Secondary NameNode自己的fsimage上
- 一旦Secondary NameNode有了新的fsimage文件,它将其拷贝回NameNode中。
- NameNode在下次重启时会使用这个新的fsimage文件,从而减少重启的时间。
所以namenode和Secondary namenode需要同样大内存,并且大集群中namenode和Secondary namenode需要是各自独立的两个节点
fs.checkpoint.period
表示多长时间记录一次hdfs的镜像。默认是1小时。fs.checkpoint.size
表示一次记录多大的size,默认64M
如果不使用Secondary NameNode,那么edit logs会保存所有NameNode启动后的改动序列,文件太大,并且fsimage只是NameNode启动时的快照,会很久,Secondary NameNode让备份定期执行,保持更新
Secondary NameNode只是NameNode的一个助手节点,它不是要取代掉NameNode也不是NameNode的备份,见
Checkpoint Node
与Secondary NameNode配置和功能一样,启动命令不同, 启动checkpoint node的命令:bin/hdfs namenode -checkpoint
可以配置多个Checkpoint Node
Backup Node
Backup Node是时刻与NameNode同步的,不需要像Checkpoint Node那样周期性的下载namespace(fsimage, edit logs)来进行合并
Backup Node 会在内存和本地文件系统中对namespace进行备份,所以backup node的内存配置要不低于 NameNode的配置。
启动backup node的命令:bin/hdfs namenode -backup
DataNode
- 保存具体的block数据
- 负责数据的读写操作和复制操作
- DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息
- DataNode之间进行通信,复制数据块,保证数据的冗余性
Hadoop HDFS
HDFS - 写文件
-
客户端将文件写入本地磁盘的临时文件中
-
当临时文件大小达到一个block大小时,HDFS client通知NameNode,申请写入文件
-
NameNode在HDFS的文件系统中创建一个文件,并把该block id和要写入的DataNode的列表(表明不是写入到所有DataNodes中)返回给客户端
-
客户端收到这些信息后,将临时文件写入DataNodes
4.1 客户端将文件内容写入第一个DataNode(一般以4kb为单位进行传输) 4.2 第一个DataNode接收后,将数据写入本地磁盘,同时也传输给第二个DataNode 4.3 依此类推到最后一个DataNode,数据在DataNode之间是通过pipeline的方式进行复制的 4.4 后面的DataNode接收完数据后,都会发送一个确认给前一个DataNode,最终第一个DataNode返回确认给客户端 4.5 当客户端接收到整个block的确认后,会向NameNode发送一个最终的确认信息 4.6 如果写入某个DataNode失败,数据会继续写入其他的DataNode。然后NameNode会找另外一个好的DataNode继续复制,以保证冗余性 4.7 每个block都会有一个校验码,并存放到独立的文件中,以便读的时候来验证其完整性 -
文件写完后(客户端关闭),NameNode提交文件(这时文件才可见,如果提交前,NameNode垮掉,那文件也就丢失了。这里只保证数据的信息写到NameNode上,但并不保证数据已经被写到DataNode中)
HDFS读文件
- 客户端向NameNode发送读取请求
- NameNode返回文件的所有block和这些block所在的DataNodes(包括复制节点)
- 客户端直接从DataNode中读取数据,如果该DataNode读取失败(DataNode失效或校验码不对),则从复制节点中读取
HDFS可靠性
- DataNode可以失效 NameNode就会将该节点的数据(从该节点的复制节点中获取)复制到另外的DataNode中
- 数据可以毁坏 若数据有问题(读取时通过校验码来检测),都可以通过其他的复制节点读取,同时还会再复制一份到健康的节点中
- NameNode不可靠 通过zookeeper实现HA,NameNode的状态大致可分为Active和Standby两种,NameNode竞争在ZooKeeper指定路径上注册临时节点,将自己的host、port、nameserviceId、namenodeId等数据写入节点,哪个NameNode争先写入成功,哪个就成为Active NameNode
HDFS命令工具
fsck: 检查文件完整性
start-blancer.sh:重新平衡HDFS hdfs dfs -copyFromLocal:从本地磁盘复制文件到HDFSHadoop YARN
旧的MapReduce架构存在单点问题和资源利用问题- JobTracker: 负责资源管理,跟踪资源消耗和可用性,作业生命周期管理(调度作业任务,跟踪进度,为任务提供容错)
- TaskTracker: 加载或关闭任务,定时报告任务状态
- JobTracker是MapReduce的集中处理点,存在单点故障
- JobTracker完成了太多的任务,造成了过多的资源消耗,当MapReduce job 非常多的时候,会造成很大的内存开销。
- 在TaskTracker端,以map/reduce task的数目作为资源的表示过于简单,没有考虑到cpu/ 内存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现OOM
- 在TaskTracker端,把资源强制划分为map task slot和reduce task slot, 如果当系统中只有map task或者只有reduce task的时候,会造成资源的浪费,即集群资源利用的问题
YARN架构
YARN就是将JobTracker的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理(ResourceManager)和一个作业的管理(ApplicationMaster)- ResourceManager: 全局资源管理和任务调度
- 调度器(Scheduler) 调度器是一个可插拔的插件,可以根据容量、队列等限制条件,将系统中的资源分配给各个正在运行的应用程序。调度器是一个“纯调度器”,仅根据各个应用程序的资源需求进行资源分配,不再从事任何与具体应用程序相关的工作,这些均交由应用程序相关的ApplicationMaster完成
- 应用程序管理器(Applications Manager,ASM) 应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等
- NodeManager: 单个节点的资源管理和监控
- 定时地向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态
- 处理来自ApplicationMaster的Container启动/停止等各种请求
- ApplicationMaster: 单个作业的资源管理和任务监控 用户提交的每个应用程序均包含1个AM,主要功能包括:
- 与ResourceManager调度器Scheduler协商以获取资源(用Container表示)
- 将得到的任务进一步分配给内部的任务
- 与NodeManager通信以启动/停止任务
- 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务
- Container: 资源申请的单位和任务运行的容器 是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等. 当ApplicationMaster向ResourceManager申请资源时,ResourceManager调度器为ApplicationMaster返回的资源便是用Container表示的,任务只能使用该Container中描述的资源 YARN架构下形成了一个通用的资源管理平台和一个通用的应用计算平台,避免了旧架构的单点问题和资源利用率问题,同时也让在其上运行的应用不再局限于MapReduce形式
YARN基本流程
- 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
- ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
- ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复4~7。
- ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
- 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
- NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
- 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。 在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
- 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。
并行角度理解YARN
可将YARN看做一个云操作系统,它负责为应用程序启动ApplicationMaster(相当于主线程),然后再由ApplicationMaster负责数据切分、任务分配、启动和监控等工作,而由ApplicationMaster启动的各个Task(相当于子线程)仅负责自己的计算任务。当所有任务计算完成后,ApplicationMaster认为应用程序运行完成,然后退出。
Hadoop ResourceManager
负责全局的资源管理和任务调度,把整个集群当做计算资源池,只关注分配,不管应用,且不负责容错(容错是ApplicationMaster负责)
资源管理
- 动态分配Container资源
- 用户提交作业到ResourceManager,然后在某个NodeManager上分配一个Container来运行ApplicationMaster,ApplicationMaster再根据自身程序需要向ResourceManager申请资源
- YARN有一套Container的生命周期管理机制,而ApplicationMaster和其Container之间的管理是应用程序自己定义的
任务调度
- 只关注资源的使用情况,根据需求合理分配资源
- Scheluer可以根据申请的需要,在特定的机器上申请特定的资源(ApplicationMaster负责申请资源时的数据本地化的考虑,ResourceManager将尽量满足其申请需求,在指定的机器上分配Container,从而减少数据移动)
内部结构
- Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)
- Adaminstration Service: 队列、节点、Client权限管理
- ApplicationMasterService: 注册、终止ApplicationMaster, 获取ApplicationMaster的资源申请或取消的请求,并将其异步地传给Scheduler, 单线程处理
- ApplicationMaster Liveliness Monitor: 接收ApplicationMaster的心跳消息,如果某个ApplicationMaster在一定时间内没有发送心跳,则任务失效,其资源将会被回收,然后ResourceManager会重新分配一个ApplicationMaster运行该应用(默认尝试2次)
ApplicationMaster为节点内部进程,和ApplicationMaster Liveliness Monitor不一样
- Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息
- NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的Container都标记成无效,也不会调度任务到该节点运行
和Resource Tracker Service区别?
- ApplicationManager: 管理应用程序,记录和管理已完成的应用
- ApplicationMaster Launcher: 一个应用提交后,负责与NodeManager交互,分配Container并加载ApplicationMaster,也负责终止或销毁
- YarnScheduler: 资源调度分配, 有FIFO(with Priority),Fair,Capacity方式
- ContainerAllocationExpirer: 管理已分配但没有启用的Container,超过一定时间则将其回收
Hadoop NodeManager
Node节点下的Container管理
- 启动时向ResourceManager注册并定时发送心跳消息,等待ResourceManager的指令
- 监控Container的运行,维护Container的生命周期,监控Container的资源使用情况
- 启动或停止Container,管理任务运行时的依赖包(根据ApplicationMaster的需要,启动Container之前将需要的程序及其依赖包、配置文件等拷贝到本地)
内部结构
- NodeStatusUpdater: 启动向ResourceManager注册,报告该节点的可用资源情况,通信的端口和后续状态的维护
- ContainerManager: 接收RPC请求(启动、停止),资源本地化(下载应用需要的资源到本地,根据需要共享这些资源) PUBLIC: /filecache PRIVATE: /usercache//filecache APPLICATION: /usercache//appcache//(在程序完成后会被删除)
- ContainersLauncher: 加载或终止Container
- ContainerMonitor: 监控Container的运行和资源使用情况
- ContainerExecutor: 和底层操作系统交互,加载要运行的程序
Hadoop ApplicationMaster
单个作业的资源管理和任务监控
- 计算应用的资源需求,资源可以是静态或动态计算的,静态的一般是Client申请时就指定了,动态则需要ApplicationMaster根据应用的运行状态来决定
- 根据数据来申请对应位置的资源(Data Locality)
- 向ResourceManager申请资源,与NodeManager交互进行程序的运行和监控,监控申请的资源的使用情况,监控作业进度
- 跟踪任务状态和进度,定时向ResourceManager发送心跳消息,报告资源的使用情况和应用的进度信息
- 负责本作业内的任务的容错
计算资源需求方式
一般的MapReduce是根据block数量来定Map和Reduce的计算数量,一般的Map或Reduce就占用一个Container
Hadoop Container
- 基本的资源单位(CPU、内存等)
- Container可以加载任意程序,而且不限于Java
- 一Node可以包含多个Container,也可以是一个大的Container
- ApplicationMaster可以根据需要,动态申请和释放Container
Hadoop Failover
失败类型
- 程序问题
- 进程崩溃
- 硬件问题
失败处理方式
-
任务失败
- 运行时异常或者JVM退出都会报告给ApplicationMaster
- 通过心跳来检查挂住的任务(timeout),会检查多次(可配置)才判断该任务是否失效
- 一个作业的任务失败率超过配置,则认为该作业失败
- 失败的任务或作业都会由ApplicationMaster重新运行
-
ApplicationMaster失败
- ApplicationMaster定时发送心跳信号到ResourceManager,通常一旦ApplicationMaster失败,则认为失败,但也可以通过配置多次后才失败
- 一旦ApplicationMaster失败,ResourceManager会启动一个新的ApplicationMaster
- 新的ApplicationMaster负责恢复之前错误的ApplicationMaster的状态(
yarn.app.mapreduce.am.job.recovery.enable=true
),这一步是通过将应用运行状态保存到共享的存储上来实现的,ResourceManager不会负责任务状态的保存和恢复 - Client也会定时向ApplicationMaster查询进度和状态,一旦发现其失败,则向ResouceManager询问新的ApplicationMaster
-
NodeManager失败
- NodeManager定时发送心跳到ResourceManager,如果超过一段时间没有收到心跳消息,ResourceManager就会将其移除
- 任何运行在该NodeManager上的任务和ApplicationMaster都会在其他NodeManager上进行恢复
- 如果某个NodeManager失败的次数太多,ApplicationMaster会将其加入黑名单,任务调度时不在其上运行任务
只是针对这个作业加入黑名单(ApplicationMaster只负责一个作业,ResouceManager没有做记录)
-
ResourceManager失败
- 通过checkpoint机制(Secondary NameNode/backup node ),定时将其状态保存到磁盘,然后失败的时候,重新运行
- 通过zookeeper同步状态和实现透明的HA(High Available高可用性集群)
一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和zookeeper来实现HA
Hadoop MapReduce
MapReduce - 读取数据
通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,RecordReader读取InputSplit的内容给Map,每个InputSplit对应一个Map处理
InputFormat
决定读取数据的格式,可以是文件或数据库等
功能
- 验证作业输入的正确性,如格式等
- 将输入文件切割成逻辑分片(InputSplit)
- 提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用
InputSplit
代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法,一个InputSplit给一个单独的Map处理
public abstract class InputSplit { /** * 获取Split的大小,支持根据size对InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取存储该分片的数据所在的节点位置. */ public abstract String[] getLocations() throws IOException, InterruptedException;}复制代码
tips
- 处理大量小文件 合并小文件,CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)
- 计算InputSplit方式
- 通常一个InputSplit就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度
- 通过
mapred.min.split.size, mapred.max.split.size, block.size
来控制拆分的大小- 若
mapred.min.split.size > block size
,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取 - 若
mapred.max.split.size < block size
,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算;且上报结果,关闭当前计算打开新的split均需要耗费资源)
- 若
- 先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分
splitSize = computeSplitSize(blockSize, minSize, maxSize)
,默认splitSize 就等于blockSize的默认值(64m)
- 处理分片间的数据 split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split 解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽掉第一条记录
Hadoop Mapper
主要是读取InputSplit的每一个Key,Value对并进行处理
Hadoop Shuffle
Shuffle对Map的结果进行排序并传输到Reduce进行处理
Map的结果并不直接存放到硬盘,而是利用缓存做一些预排序处理, Map会调用Combiner,压缩,按key进行分区、排序等,尽量减少结果的大小,每个Map完成后都会通知Task,然后Reduce就可以进行处理map端
- 不直接写文件, 利用缓存做预处理
- 每个map任务默认缓存区100M,80%时后台线程开始写入文件, 若缓冲区已满,则map任务需要等待
- 每次缓存过阈值时都会创建文件写入,可能会创建大量文件,最后需要合并文件排序
- 可以配置写入压缩文件,减少数据传输
- map完成则通知任务管理器,此时reduce开始复制结果数据
reduce端
- 如果Map的结果很少,则直接放到内存,否则写入本地硬盘文件中
- 当所有的Map结果都被复制和合并后,就会调用Reduce方法
- Reduce结果会写入到HDFS中
调优
- 保证Map,Reduce任务有足够的内存
- 给shuffle分配尽可能多的内存
- 对Map,避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值
- 对Reduce,把Map的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘
默认情况下,所有的内存都是分配给Reduce方法的,如果Reduce方法不怎么消耗内存,可以mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0
- 在任务监控中可通过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的
- 对于IO方面,Map的结果可以使用压缩,同时增大buffer size(
io.file.buffer.size
,默认4kb)
主要就是增大内存,减少IO
配置
属性 | 默认值 | 描述 |
---|---|---|
io.sort.mb | 100 | map输出分类时所使用缓冲区的大小 |
io.sort.spill.percent | 0.80 | 针对map输出内存缓冲和记录索引的阈值使用比例 |
io.sort.factor | 10 | 文件分类时合并流的最大数量。此属性也用于reduce。通常把数字设为100. |
mapred.compress.map.output | false | 压缩映射输出 |
mapred.map.output.compression.codec | DefaultCodec | map输出所需的压缩解编码器. |
mapred.reduce.parallel.copies | 5 | 用于向reducer传送map输出的线程数目 |
mapred.reduce.copy.backoff | 300 | 时间的最大数量,以秒为单位,这段时间内若reducer失败则会反复尝试传输 |
io.sort.factor | 10 | 组合运行所需最大溢出文件数目 |
min.num.spills.for.combine | 3 | 组合运行所需最小溢出文件数目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 随机复制阶段映射输出缓冲器的堆栈大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用于启动合并输出进程和磁盘传输的映射输出缓冲器的阀值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用于启动合并输出和磁盘传输进程的映射输出的阀值数目。小于等于0意味着没有门槛,而溢出行为由 mapred.job.shuffle.merge.percent单独管理 |
mapred.job.reduce.input.buffer.percent | 0.0 | 用于减少内存映射输出的堆栈大小比例,内存中映射大小不得超出此值。 |
Hadoop IO
Hadoop - IO
压缩
压缩和拆分一般是冲突的,压缩后的文件的block是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时Map任务就无法处理,所以对于这些压缩,Hadoop往往是直接使用一个Map任务处理整个文件的分析
Map的输出结果也可以进行压缩,这样可以减少Map结果到Reduce的传输的数据量,加快传输速率完整性
- 写数据验证并保存校验码
- 读数据验证校验码
- dataNode定时检查每个block完整性,当发现block数据有问题时,先去NameNode找备份数据进行恢复,恢复失败才报错
Hadoop 配置
配置文件
文件分类
- xxx-default.xml:只读,默认的配置
- xxx-site.xml:替换default中的配置
- core-site.xml 配置公共属性
- hdfs-site.xml 配置HDFS
- yarn-site.xml 配置YARN
- mapred-site.xml 配置MapReduce
配置文件优先级
- 代码中/运行时指定
- 客户机xxx-site.xml
- slave节点上的xxx-site.xml
- xxx-default.xml 若某个属性不想被覆盖,可以设置成final
复制代码 {PROPERTY_NAME} {PROPERTY_VALUE} true