`
liaobinxu
  • 浏览: 42267 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

学习 Hadoop 源代码分析(一零)

阅读更多



学习 Hadoop 源代码分析(一零)


在继续分析DataNode之前,我们有必要看一下系统的工作状态。启动HDFS的

时候,我们可以选择以下启动参数:

启动选项枚举 StartupOption

   FORMAT  ("-format"):格式化系统

   REGULAR ("-regular"):正常启动

   BACKUP  ("-backup"):备份

   CHECKPOINT("-checkpoint"):检查点

   UPGRADE ("-upgrade"):升级

  ROLLBACK("-rollback"):回滚

   FINALIZE("-finalize"):提交

   IMPORT  ("-importCheckpoint"):从Checkpoint恢复




启动参数相关的命名节点NamenodeRole命名节点角色

ACTIVE: NameNode  命名节点(正常启动)

BACKUP: Backup Node 备份节点

CHECKPOINT: Checkpoint Node 检查点 节点

STANDBY: Standby Node: 备份节点



作为一个大型的分布式系统,Hadoop内部实现了一套升级机制(http://wiki.apache.org/hadoop/Hadoop_Upgrade)。upgrade参数就是为了这

个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用rollback进行回滚;如果过了一段时间,系统运行正常,那就可以通过

finalize,正式提交这次升级(跟数据库有点像啊)。importCheckpoint选项用于NameNode发生故障后,从某个检查点恢复。有了上面的描述,我们得到下面左边的状态图:




大家应该注意到,上面的升级/回滚/提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上,甚至

可能出现某些节点已经升级成功,但有些节点可能处于中间状态的情况,所以Hadoop采用类似于数据库事务的升级机制也就不是很奇怪。大家先理解一下上面的状态图,它是下面我们要介绍DataNode存储的基础。


下图是升级时候 cluster / namenode , datanode 的时序图



DataNode收到以后,会将当前的数据块文件目录改名,从current改名为previous.tmp,建立一个snapshot,然后重建current目录。重建包括重建VERSION文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到revious.tmp的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件,current和previous.tmp中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后,会在current里写入新的VERSION文件,并将previous.tmp目录改名为previous,完成升级。




1) NORMAL - > UPGRADE

 

void org.apache.hadoop.hdfs.server.datanode.DataStorage.doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException
 
 


具体实现

 

 void doUpgrade(StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
    LOG.info("Upgrading storage directory " + sd.getRoot()
             + ".\n   old LV = " + this.getLayoutVersion()
             + "; old CTime = " + this.getCTime()
             + ".\n   new LV = " + nsInfo.getLayoutVersion()
             + "; new CTime = " + nsInfo.getCTime());
    File curDir = sd.getCurrentDir();
    File prevDir = sd.getPreviousDir();
    assert curDir.exists() : "Current directory must exist.";
    // Cleanup directory "detach"
    cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
    // delete previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
    // rename current to tmp
    rename(curDir, tmpDir);
    // hard link finalized & rbw blocks
    linkAllBlocks(tmpDir, curDir);
    // create current directory if not exists
    if (!curDir.exists() && !curDir.mkdirs())
      throw new IOException("Cannot create directory " + curDir);
    // write version file
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID() :
      "Data-node and name-node layout versions must be the same.";
    this.cTime = nsInfo.getCTime();
    sd.write();
    // rename tmp to previous
    rename(tmpDir, prevDir);
    LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
  }
 

doUpgrade步骤

1.  清除detach文件夹(备份)

2.  清除previous 

3. current 文件夹改名为tmp(previous.tmp)

4. 重建current(重建的过程就是,把tmp文件的 all finalized and RBW blocks内容复制到current)

5. 保存从namenode获取的namespaceid和创建时间

6. 修改tmp(previous.tmp文件为previous



2). UPGRADE --(ROLLBACK) -- > NORMAL


void org.apache.hadoop.hdfs.server.datanode.DataStorage.doRollback(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException
 

 

 具体

 

void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (!prevDir.exists())
      return;
    DataStorage prevInfo = new DataStorage();
    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
    prevSD.read(prevSD.getPreviousVersionFile());

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
          && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback(previous.tmp
      throw new InconsistentFSStateException(prevSD.getRoot(),
                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                             + " is newer than the namespace state: LV = "
                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
    LOG.info("Rolling back storage directory " + sd.getRoot()
             + ".\n   target LV = " + nsInfo.getLayoutVersion()
             + "; target CTime = " + nsInfo.getCTime());
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  }
 



1. 获取previous version版本

2. 删除tmp目录

3. 把current目录改为tmp目录(remove.tmp)

4. 把previous 目录该为current

5. 删除tmp目录(remove.tmp)



3). UPGRADE --(FINALIZE)-- NORMAL

 

void org.apache.hadoop.hdfs.server.datanode.DataStorage.doFinalize(StorageDirectory sd) throws IOException
 

 此操作是管理员手动的操作(数据节点 通过heartbeat 获得命令)(不用停掉这个集群, 管理员命令)

 

 void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    LOG.info("Finalizing upgrade for storage directory " 
             + dataDirPath 
             + ".\n   cur LV = " + this.getLayoutVersion()
             + "; cur CTime = " + this.getCTime());
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();
    // rename previous to tmp
    rename(prevDir, tmpDir);

    // delete tmp dir in a separate thread
    new Daemon(new Runnable() {
        public void run() {
          try {
            deleteDir(tmpDir);
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
        }
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }
 


1. 把previous 目录改名为 tmp目录(finalize.tmp)

2. 单独启动一个线程用于删除tmp目录(finalize.tmp),原因是删除可能非常耗时,doFinalize是在心跳中完成的,重启一个线程保证心跳的高效。

 


 




 

StorageInfo包含了3个字段,分别是layoutVersion:版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致。namespaceID是Storage的ID,cTime,creation time。

和StorageInfo相比,Storage就是个大家伙了。

Storage可以包含多个根(参考配置项dfs.data.dir的说明),这些根通过Storage的内部类StorageDirectory来表示。

StorageDirectory中最重要的方法是analyzeStorage,它将根据系统启动时的参数和我们上面提到的一些判断条件,返回系统现在的状态。StorageDirectory可能处于以下的某一个状态(与系统的工作状态一定的对应):

 

 

NON_EXISTENT:指定的目录不存在;

NOT_FORMATTED:指定的目录存在但未被格式化;

 

COMPLETE_UPGRADE:previous.tmp存在,current也存在

RECOVER_UPGRADE:previous.tmp存在,current不存在

 

COMPLETE_FINALIZE:finalized.tmp存在,current也存在

 

 

 

COMPLETE_ROLLBACK:removed.tmp存在,current也存在,previous不存在

RECOVER_ROLLBACK:removed.tmp存在,current不存在,previous存在

 

COMPLETE_CHECKPOINT:lastcheckpoint.tmp存在,current也存在

RECOVER_CHECKPOINT:lastcheckpoint.tmp存在,current不存在

 

 

 

NORMAL:普通工作模式。

 

 

StorageDirectory处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的current夹来进行判断。状态改

变需要的工作文件夹包括:

previous:用于升级后保存以前版本的文件

previous.tmp:用于升级过程中保存以前版本的文件

removed.tmp:用于回滚过程中保存文件

finalized.tmp:用于提交过程中保存文件

lastcheckpoint.tmp:应用于从NameNode中,导入一个检查点

previous.checkpoint:应用于从NameNode中,结束导入一个检查点

 

 

 


 

有了这些状态,就可以对系统进行恢复(通过方法doRecover)。恢复的动作如

下(结合上面的状态转移图):

 


 

COMPLETE_UPGRADE:mv previous.tmp -> previous

RECOVER_UPGRADE:mv previous.tmp -> current

COMPLETE_FINALIZE:rm finalized.tmp

COMPLETE_ROLLBACK:rm removed.tmp

RECOVER_ROLLBACK:mv removed.tmp -> current

COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp ->

previous.checkpoint

RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current

 

 

 

 

 

当升级某个集群的Hadoop的时候,正如任何软件的升级一样,可能会引入新的bug或者不兼容的修改导致现有的应用出现过去没有发现的问题。在所有重要的HDFS安装应用中,是不允许出现因丢失任何数据需要从零开始重启

HDFS的情况。HDFS允许管理员恢复到 Hadoop的早期版本,并且将集群的状态回滚到升级前。HDFS的升级细节请参考 upgrade wiki 。HDFS在任何时间只能有一个备份,因此在升级前,管理员需要通过'bin/hadoop dfsadmin

-finalizeUpgrade'命令移除现有的备份。下面简要描述了典型的升级过程:


1)在升级Hadoop前,如果已经存在备份,需要先结束(finalize)它。可以通过'dfsadmin  -upgradeProgress status'命令查询集群是否需要执行finalize

2)停止集群,分发部署新版本的Hadoop

3)执行新版本的hadoop,通过添加 -upgrade 选项,例如/bin/start-dfs.sh -upgrade

4)大多数情况下,集群在升级后可以正常运行。一旦新的HDFS在运行若干天的操作后没有出现问题,那么就可以结束(finalize)这次升级。请注意,在升级前删除的文件并不释放在datanode上的实际磁盘空间,直到集群被结(finalize)升级前。

5)如果有需要回到老版本的Hadoop,那么可以:

a)停止集群,分发部署老版本的Hadoop

b)通过rollback选项启动集群,例如bin/start-dfs.sh -rollback

 

  • 描述: 图片
  • 大小: 14.7 KB
  • 大小: 121.6 KB
  • 大小: 14.5 KB
  • 大小: 286.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics