浅析HDFS基本原理

随着互联网的发展,数据日益增多,增长超过了单机能够处理的上线,数据如何存储和处理成为了科技公司的难题,随着google的三篇论文的发布,大家终于找到了一个方案-分布式文件系统+MapReduce。Hadoop是参考google论文实现的,集成了分布式文件系统与分布式批处理平台。hadoop的设计目标是用来解决大文件海量存储和批处理的,为了避免单个节点故障导致数据丢失,设计副本冗余机制。 本文将主要分析一下几个方面:

  • HDFS的概念与架构
  • NameNode的HA机制
  • 读写流程分析
  • 使用场景与缺点

HDFS的概念与架构

HDFS采用的master/slave架构。一个HDFS集群通常由一个Active的NameNode和若干DataNode组成,为了避免NameNode单点问题,通常会做一个NameNode的standby作为备份。在整个hdfs涉及到许多的核心概念,下面做一个简单介绍

  • NameNode: NameNode是一个中心服务器,负责管理文件系统的名字空间以及客户端的访问,比如文件的打卡、关闭、重命名文件或者目录。它负责确定数据块到具体的存储节点的映射。在其同意调度下进行数据块的创建、删除、复制。
  • DataNode: DataNode是HDFS的实际存储节点,负责管理它所在节点的存储;客户端的读写请求。并且定期上报心跳和块的存储位置。
  • Block: HDFS上文件,从其内部看,一个文件其实是被分成一个或者多个数据块存储的,这些数据块存储在一组DataNode上。
  • Edits: 在HDFS发起的创建、删除等操作其实是一个事物,事物在NameNode上以Edit对象存储在edits文件中,持久化在NameNode的本地磁盘上。
  • FSimage: FSimage是NameNode的元数据存储快照,持久化在NameNode的本地磁盘上。

当NameNode重启的时候,NameNode从FSImage和Edits文件中读取数据,加载到内存中。

在HDFS体系来看,NameNode主要负责元数据的存储与操作,DataNode负责实际的存储。DataNode通常在一个机器上部署一个进程,这些机器分布式在多个机架上。整体架构如下图所示:

客户端操作HDFS上的文件时,向NameNode获取文件的元数据信息,NameNode返回文件的块存储位置,Client选择块存储位置最近的节点进行块操作。通常优先级是本机>本机柜>其他机柜的节点。数据块的分布式通常是在同一机架的两个节点存储两份,为了避免单个机架的故障导致数据块丢失,会选择在另外一个机架上的节点存储一份。如果把数据存储在三个不同的机架上,由于不同机架之间通过交换机进行数据交换,网络速度会比单机架慢,因此复制数据也会慢,此外,还会增加机架之间的交换机的压力。数据块分布如下图

DataNode除了负责客户端的读写操作外,还需要定期的向NameNode的active和standby做心跳汇报,如果有DataNode的心跳异常,被确定为死的节点,NameNode将会对存储在该节点的数据进行复制,保证数据块的数据块的副本数。DataNode除了心跳还会将本节点的数据块上报给NameNode的active和standby。

NameNode的HA机制

在hadoop 1.x的时候,NameNode存储单点问题,导致集群故障,成为集群的瓶颈。在hadoop 2.x之后,增加了NameNode的HA机制。对于NameNode单点问题,大家首先想到的解决方案是给NameNode做一些备份,但是难点是在于如何保证NameNode和它的备份节点的数据一致性问题。在分布式环境下要保证数据一致性问题,需要考虑的问题很多,比如脑裂,分区容错,一致性协议等等。下面一起看看Hadoop是如何解决的。

HA的架构

上图是Hadoop的2.x版本提供的NameNode的HA机制的架构。在这个架构体系涉及到Zookeeper,NameNode Active和NameNode Standby和共享存储空间,ZKFC。在整个高可用架构中,Active NameNode和Standby NameNode两台NameNode形成互备,一台处于Active状态,作为主节点,另外一台是Standby状态,作为备节点,只有主节点才能对外提供读写服务。

ZKFC(ZKFailoverContoller)作为独立的进程运行,对NameNode的主备切换进行总体控制。ZKFC能够及时加测到NameNode的健康状况,在active的NameNode故障的时候,借助Zookeeper实现自动主备选举和切换。当然,NameNode目前也支持不依赖Zookeeper的手动主备切换。Zookeeper集群主要是为控制器提供主被选举支持。

共享存储系统是NameNode实现高可用的关键部分,共享存储系统保存了NameNode运行过程中的所有产生的HDFS的元数据。active NameNode和standby NameNode通过共享存储系统实现元数据同步。在主备切换的时候,新的active NameNode在确认元数据同步之后才能继续对外提供服务。

除了通过共享存储系统共享HDFS的元数据信息之外,active NameNode和 standby NameNode还需要共享HDFS的数据块和DataNode之间的映射关系,DataNode会同时向active NameNode和standby NameNode上报数据块位置信息。

基于QJM的共享存储系统

共享存储系统主要是由多个JournalNode进程组成,JournalNode由奇数个组成。当Active NameNode中有事物提交,active NameNode会将editLog发给jouranlNode集群,journalNode集群通过paxos协议保证数据一致性(即:超过一半以上的jounalNode节点确认),这个数据完成了提交到共享存储。standby NameNode定期从journalNode读取editLog,合并到自己的fsimage上。总体的架构如下:

处于 Standby 状态的 NameNode 转换为 Active 状态的时候,有可能上一个 Active NameNode 发生了异常退出,那么 JournalNode 集群中各个 JournalNode 上的 EditLog 就可能会处于不一致的状态,所以首先要做的事情就是让 JournalNode 集群中各个节点上的 EditLog 恢复为一致。另外如前所述,当前处于 Standby 状态的 NameNode 的内存中的文件系统镜像有很大的可能是落后于旧的 Active NameNode 的,所以在 JournalNode 集群中各个节点上的 EditLog 达成一致之后,接下来要做的事情就是从 JournalNode 集群上补齐落后的 EditLog。只有在这两步完成之后,当前新的 Active NameNode 才能安全地对外提供服务。

基于QJM的共享存储系统内部实现

FSEditLog:这个类封装了对 EditLog 的所有操作,是 NameNode 对 EditLog 的所有操作的入口。

JournalSet: 这个类封装了对本地磁盘和 JournalNode 集群上的 EditLog 的操作,内部包含了两类 JournalManager,一类为 FileJournalManager,用于实现对本地磁盘上 EditLog 的操作。一类为 QuorumJournalManager,用于实现对 JournalNode 集群上共享目录的 EditLog 的操作。FSEditLog 只会调用 JournalSet 的相关方法,而不会直接使用 FileJournalManager 和 QuorumJournalManager。

FileJournalManager:封装了对本地磁盘上的 EditLog 文件的操作,不仅 NameNode 在向本地磁盘上写入 EditLog 的时候使用 FileJournalManager,JournalNode 在向本地磁盘写入 EditLog 的时候也复用了 FileJournalManager 的代码和逻辑。

QuorumJournalManager:封装了对 JournalNode 集群上的 EditLog 的操作,它会根据 JournalNode 集群的 URI 创建负责与 JournalNode 集群通信的类 AsyncLoggerSet, QuorumJournalManager 通过 AsyncLoggerSet 来实现对 JournalNode 集群上的 EditLog 的写操作,对于读操作,QuorumJournalManager 则是通过 Http 接口从 JournalNode 上的 JournalNodeHttpServer 读取 EditLog 的数据。

AsyncLoggerSet:内部包含了与 JournalNode 集群进行通信的 AsyncLogger 列表,每一个 AsyncLogger 对应于一个 JournalNode 节点,另外 AsyncLoggerSet 也包含了用于等待大多数 JournalNode 返回结果的工具类方法给 QuorumJournalManager 使用。

AsyncLogger:具体的实现类是 IPCLoggerChannel,IPCLoggerChannel 在执行方法调用的时候,会把调用提交到一个单线程的线程池之中,由线程池线程来负责向对应的 JournalNode 的 JournalNodeRpcServer 发送 RPC 请求。

JournalNodeRpcServer:运行在 JournalNode 节点进程中的 RPC 服务,接收 NameNode 端的 AsyncLogger 的 RPC 请求。

JournalNodeHttpServer:运行在 JournalNode 节点进程中的 Http 服务,用于接收处于 Standby 状态的 NameNode 和其它 JournalNode 的同步 EditLog 文件流的请求。

关于NameNode HA机制更多细节的参考

NameNode的切换流程

NameNode的切换流程分为以下几个步骤:

  1. HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
  2. HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
  3. 如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
  4. ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
  5. ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
  6. ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。

HDFS的读写流程

读流程分析

客户端打开文件,通过rpc的方式向NameNode获取文件快的存储位置信息,NameNode会将文件中的各个块的所有副本DataNode全部返回,这些DataNode会按照与客户端的位置的距离排序。如果客户端就是在DataNode上,客户端可以直接从本地读取文件,跳过网络IO,性能更高。客户端调研read方法,存储了文件的前几个块的地址的DFSInputStream,就会连接存储了第一个块的最近的DataNode。然后通过DFSInputStream就通过重复调用read方法,数据就从DataNode流动到可后端,当改DataNode的最后一个快读取完成了,DFSInputSteam会关闭与DataNode的连接,然后寻找下一个快的最佳节点。这个过程读客户端来说透明的,在客户端那边来看们就像是只读取了一个连续不断的流。

块是按顺序读的,通过 DFSInputStream 在 datanode 上打开新的连接去作为客户端读取的流。他也将会呼叫 namenode 来取得下一批所需要的块所在的 datanode 的位置(注意刚才说的只是从 namenode 获取前几个块的)。当客户端完成了读取,就在 FSDataInputStream 上调用 close() 方法结束整个流程。

在这个设计中一个重要的方面就是客户端直接从 DataNode 上检索数据,并通过 NameNode 指导来得到每一个块的最佳 DataNode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 DataNode 展开的。期间,NameNode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。

写流程分析

  1. 通过Client向远程的NameNode发送RPC请求;
  2. 接收到请求后NameNode会首先判断对应的文件是否存在以及用户是否有对应的权限,成功则会为文件创建一个记录,否则会让客户端抛出异常;
  3. 当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根据在Namenode中对replication的设置而定。
  4. 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把packet以流的方式写入第一个 datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个 datanode, 这种写数据的方式呈流水线的形式。
  5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着 "ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。
  6. 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的 pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的 datanode,保持replicas设定的数量。

DFSOutputStream内部原理

打开一个DFSOutputStream流,Client会写数据到流内部的一个缓冲区中,然后数据被分解成多个Packet,每个Packet大小为64k字节,每个Packet又由一组chunk和这组chunk对应的checksum数据组成,默认chunk大小为512字节,每个checksum是对512字节数据计算的校验和数据。

当Client写入的字节流数据达到一个Packet的长度,这个Packet会被构建出来,然后会被放到队列dataQueue中,接着DataStreamer线程会不断地从dataQueue队列中取出Packet,发送到复制Pipeline中的第一个DataNode上,并将该Packet从dataQueue队列中移到ackQueue队列中。ResponseProcessor线程接收从Datanode发送过来的ack,如果是一个成功的ack,表示复制Pipeline中的所有Datanode都已经接收到这个Packet,ResponseProcessor线程将packet从队列ackQueue中删除。

在发送过程中,如果发生错误,所有未完成的Packet都会从ackQueue队列中移除掉,然后重新创建一个新的Pipeline,排除掉出错的那些DataNode节点,接着DataStreamer线程继续从dataQueue队列中发送Packet。

下面是DFSOutputStream的结构及其原理,如图所示:

从下面3个方面来描述内部流程:

  • 创建Packet

Client写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个Chunk大小(512B)时,便会创建一个Packet对象,然后向该Packet对象中写Chunk Checksum校验和数据,以及实际数据块Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个Chunk大小时,都会向Packet中写上述数据内容,直到达到一个Packet对象大小(64K),就会将该Packet对象放入到dataQueue队列中,等待DataStreamer线程取出并发送到DataNode节点。

  • 发送Packet

DataStreamer线程从dataQueue队列中取出Packet对象,放到ackQueue队列中,然后向DataNode节点发送这个Packet对象所对应的数据。

  • 接收ack

发送一个Packet数据包以后,会有一个用来接收ack的ResponseProcessor线程,如果收到成功的ack,则表示一个Packet发送成功。如果成功,则ResponseProcessor线程会将ackQueue队列中对应的Packet删除。

HDFS的使用场景和缺点

使用场景

  1. hdfs的设计一次写入,多次读取,支持修改。
  2. 大文件,在hdfs中一个块通常是64M、128M、256M,小文件会占用更多的元数据存储,增加文件数据块的寻址时间。
  3. 延时高,批处理。
  4. 高容错,多副本。

缺点

  1. 延迟比较高,不适合低延迟高吞吐率的场景
  2. 不适合小文件,小文件会占用NameNode的大量元数据存储内存,并且增加寻址时间
  3. 支持并发写入,一个文件只能有一个写入者
  4. 不支持随机修改,仅支持append

作者介绍

黄骞,微鲤广告平台负责人,曾作为核心成员参与微鲤大数据平台、广告平台等系统从0到1的开发。

微鲤技术团队

微鲤技术团队承担了中华万年历、Maybe、蘑菇语音、微鲤游戏高达3亿用户的产品研发工作,并构建了完备的大数据平台、基础研发框架、基础运维设施。践行数据驱动理念,相信技术改变世界。