`
liaobinxu
  • 浏览: 42039 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

【转】Hadoop源代码分析(一)

经济不行啦,只好潜心研究技术。

Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。

GoogleCluster: http://research.google.com/archive/googlecluster.html

Chubby:http://labs.google.com/papers/chubby.html

GFS:http://labs.google.com/papers/gfs.html

BigTable:http://labs.google.com/papers/bigtable.html

MapReduce:http://labs.google.com/papers/mapreduce.html

很快,Apache上就出现了一个类似的解决方案,目前它们都属于Apache的Hadoop项目,对应的分别是:

Chubby-->ZooKeeper

GFS-->HDFS

BigTable-->HBase

MapReduce-->Hadoop

目前,基于类似思想的Open Source项目还很多,如Facebook用于用户分析的Hive。

HDFS作为一个分布式文件系统,是所有这些项目的基础。分析好HDFS,有利于了解其他系统。由于Hadoop的HDFS和MapReduce是同一个项目,我们就把他们放在一块,进行分析。

下图是MapReduce整个项目的顶层包图和他们的依赖关系。Hadoop包之间的依赖关系比较复杂,原因是HDFS提供了一个分布式文件系统,该系统提供API,可以屏蔽本地文件系统和分布式文件系统,甚至象Amazon S3这样的在线存储系统。这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。功能的相互引用,造成了蜘蛛网型的依赖关系。一个典型的例子就是包conf,conf用于读取系统配置,它依赖于fs,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包fs中被抽象了。

Hadoop的关键部分集中于图中蓝色部分,这也是我们考察的重点

[img]http://dl.iteye.com/upload/attachment/376269/160c8e6c-f075-3775-a24c-18bc94eecee6.jpg" alt="[/img]

 

 

Hadoop源代码分析(二)

 

下面给出了Hadoop的包的功能分析。

Package

Dependences

tool

提供一些命令行工具,如DistCparchive

mapreduce

HadoopMap/Reduce实现

filecache

提供HDFS文件的本地缓存,用于加快Map/Reduce的数据访问速度

fs

文件系统的抽象,可以理解为支持多种文件系统实现的统一文件访问接口

hdfs

HDFSHadoop的分布式文件系统实现

ipc

一个简单的IPC的实现,依赖于io提供的编解码功能

参考:http://zhangyu8374.iteye.com/blog/86306

io

表示层。将各种数据编码/解码,方便于在网络上传输

net

封装部分网络功能,如DNSsocket

security

用户和用户组信息

conf

系统的配置参数

metrics

系统统计数据的收集,属于网管范畴

util

工具类

record

根据DDL(数据描述语言)自动生成他们的编解码函数,目前可以提供C++Java

http

基于JettyHTTP Servlet,用户通过浏览器可以观察文件系统的一些状态信息和日志

log

提供HTTP访问日志的HTTP Servlet

 

 

Hadoop源代码分析(三)

 

由于Hadoop的MapReduce和HDFS都有通信的需求,需要对通信的对象进行序列化。Hadoop并没有采用Java的序列化,而是引入了它自己的系统。

org.apache.hadoop.io中定义了大量的可序列化对象,他们都实现了Writable接口。实现了Writable接口的一个典型例子如下:

 

public class MyWritable implements Writable {   
    // Some data        
    private int counter;   
    private long timestamp;   
  
    public void write(DataOutput out) throws IOException {   
        out.writeInt(counter);   
        out.writeLong(timestamp);   
    }   
      
    public void readFields(DataInput in) throws IOException {   
        counter = in.readInt();   
        timestamp = in.readLong();   
    }   
  
    public static MyWritable read(DataInput in) throws IOException {   
        MyWritable w = new MyWritable();   
        w.readFields(in);   
        return w;   
    }   
} 

 

 

 

 

 

 

 

这里,我把ObjectWritable标为红色,是因为相对于其他对象,它有不同的地位。当我们讨论Hadoop的RPC时,我们会提到RPC上交换的信息,必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。ObjectWritable对象保存了一个可以在RPC上传输的对象和对象的类型信息。这样,我们就有了一个万能的,可以用于客户端/服务器间传输的Writable对象。例如,我们要把上面例子中的对象作为RPC请求,需要根据MyWritable创建一个ObjectWritable,ObjectWritable往流里会写如下信息

对象类名长度,对象类名,对象自己的串行化结果

这样,到了对端,ObjectWritable可以根据对象类名创建对应的对象,并解串行。应该注意到,ObjectWritable依赖于WritableFactories,那存储了Writable子类对应的工厂。我们需要把MyWritable的工厂,保存在WritableFactories中(通过WritableFactories.setFactory)。

 

 

 

Hadoop源代码分析(五)

 

 

介绍完org.apache.hadoop.io以后,我们开始来分析org.apache.hadoop.rpc。RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论HDFS的,通信可能发生在:

  • Client-NameNode之间,其中NameNode是服务器
  • Client-DataNode之间,其中DataNode是服务器
  • DataNode-NameNode之间,其中NameNode是服务器
  • DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端

如果我们考虑Hadoop的Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop引入了一个RPC框架。该RPC框架利用的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。但是,该RPC框架要求调用的参数和返回结果必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出IOException异常。(参考自http://zhangyu8374.iteye.com/blog/86306

既然是RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。但是类Server是一个抽象类,类RPC封装了Server,利用反射,把某个对象的方法开放出来,变成RPC中的服务器。

下图是org.apache.hadoop.rpc的类图。

 

 

 

 

 

 

既然是RPC,自然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。在这里我们来仔细考察org.apache.hadoop.rpc.Client。下面的图包含了org.apache.hadoop.rpc.Client中的关键类和关键方法。

由于Client可能和多个Server通信,典型的一次HDFS读,需要和NameNode打交道,也需要和某个/某些DataNode通信。这就意味着某一个Client需要维护多个连接。同时,为了减少不必要的连接,现在Client的做法是拿ConnectionId(图中最右侧)来做为Connection的ID。ConnectionId包括一个InetSocketAddress(IP地址+端口号或主机名+端口号)对象和一个用户信息对象。这就是说,同一个用户到同一个InetSocketAddress的通信将共享同一个连接。

 

 

连接被封装在类Client.Connection中,所有的RPC调用,都是通过Connection,进行通信。一个RPC调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个Connection上的不同调用,每个调用都有唯一的id。调用是否结束也需要一个标记,所有的这些都体现在对象Client.Call中。Connection对象通过一个Hash表,维护在这个连接上的所有Call:

 

Java代码 
  1. private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();  

 

一个RPC调用通过addCall,把请求加到Connection里。为了能够在这个框架上传输Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组,我们一般把Call需要的参数打包成为ObjectWritable对象。

 

Client.Connection会通过socket连接服务器,连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过Writable对象来进行请求的发送/应答了。注意,每个Client.Connection会起一个线程,不断去读取socket,并将收到的结果解包,找出对应的Call,设置Call并通知结果已经获取。

Call使用Obejctwaitnotify,把RPC上的异步消息交互转成同步调用。

还有一点需要注意,一个Client会有多个Client.Connection,这是一个很自然的结果。

 

 

 

 

Hadoop源代码分析(七)

聊完了ClientServer按惯例,先把类图贴出来。

 

 

需要注意的是,这里的Server类是个抽象类,唯一抽象的地方,就是

Java代码 
  1. public abstract Writable call(Writable param, long receiveTime) throws IOException;  

 

 


 

 

这表明,Server提供了一个架子,Server的具体功能,需要具体类来完成。而具体类,当然就是实现call方法。

我们先来分析Server.Call,和Client.Call类似,Server.Call包含了一次请求,其中,id和param的含义和Client.Call是一致的。不同点在后面三个属性,connection是该Call来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,发送给客户端。属性timestamp是请求到达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的response是请求处理的结果,可能是一个Writable的串行化结果,也可能一个异常的串行化结果。

Server.Connection维护了一个来之客户端的socket连接。它处理版本校验,读取请求并把请求发送到请求处理线程,接收处理结果并把结果发送给客户端。

Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是上面图里的Listener。

请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。

对于处理完的请求,需要将结果写回去,同样,利用NIO,只需要一个线程,相关的逻辑在Responder里。

 

 

 

Hadoop源代码分析(八)

 

 

(注:本节需要用到一些Java反射的背景)

有了Client和Server,很自然就能RPC啦。下面轮到RPC.java啦。

一般来说,分布式对象一般都会要求根据接口生成存根和框架。如CORBA,可以通过IDL,生成存根和框架。但是,在org.apache.hadoop.rpc,我们就不需要这样的步骤了。上类图。

为了分析Invoker,我们需要介绍一些Java反射实现Dynamic Proxy的背景。

Dynamic Proxy是由两个class实现的:java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler,后者是一个接口。所谓Dynamic Proxy是这样一种class:它是在运行时生成的class,在生成它时你必须提供一组interface给它,然后该class就宣称它实现了这些interface。

这个Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。

我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的InvocationHandler实现中。

Hadoop的RPC中,Invoker实现了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的Client,通过socket传递到服务器端。就是说,你在proxy类上的任何调用,都通过Client发送到远方的服务器上。

Invoker使用Invocation。Invocation封装了一个远程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和parameters,调用方法参数。注意,它实现了Writable接口,可以串行化。

RPC.Server实现了org.apache.hadoop.ipc.Server,你可以把一个对象,通过RPC,升级成为一个服务器。服务器接收到的请求(通过Invocation),解串行化以后,就变成了方法名,方法参数列表和参数列表。利用Java反射,我们就可以调用对应的对象的方法。调用的结果再通过socket,返回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy的使用者了。

 

 

 

一个典型的HDFS系统包括一个NameNode和多个DataNode。NameNode维护名字空间;而DataNode存储数据块。

DataNode负责存储数据,一个数据块在多个DataNode中有备份;而一个DataNode对于一个块最多只包含一个备份。所以我们可以简单地认为DataNode上存了数据块ID和数据块内容,以及他们的映射关系。

一个HDFS集群可能包含上千DataNode节点,这些DataNode定时和NameNode通信,接受NameNode的指令。为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。

DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回其还也包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。应该注意的是:NameNode不会发起到DataNode的请求,在这个通信过程中,它们是严格的客户端/服务器架构。

DataNode当然也作为服务器接受来自客户端的访问,处理数据块读/写请求。DataNode之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。

下面我们就来具体分析一下DataNode的实现。DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。

安装Hadoop的时候,我们会指定对应的数据块存放目录,当我们检查数据块存放目录目录时,我们回发现下面有个叫dfs的目录,所有的数据就存放在dfs/data里面。

其中有两个文件,storage里存的东西是一些出错信息,貌似是版本不对…云云。in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。

接下来是3个目录,current存的是当前有效的数据块,detach存的是快照(snapshot,目前没有实现),tmp保存的是一些操作需要的临时数据块。

但我们进入current目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0到subdir63,子目录下也有数据块文件和数据块元数据。这是因为HDFS限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。

数据块文件显然保存了HDFS中的数据,数据块最大可以到64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:

blk_3148782637964391313
blk_3148782637964391313_242812.meta

上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的版本号,用于一致性检查。

current目录下还有下面几个文件:

VERSION,保存了一些文件系统的元信息。 

 

dncp_block_verification.log.currdncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。

 

  • 大小: 78.3 KB
  • 大小: 86.6 KB
  • 大小: 130.3 KB
  • 大小: 48.1 KB
  • 大小: 41.4 KB
  • 大小: 53.7 KB
  • 大小: 9.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics