当数据集的大小超过一台独立的物理计算机的存储能力时,就有必要对它进行分区并存储到若干台单独的计算机上。HDFS是hadoop的主要分布式存储系统,一个HDFS集群主要包括NameNode用来管理文件系统的metadata,DataNode用来存储实际的数据。下面是HDFS的一些特色java
HDFS采用master/slave架构。一个HDFS集群通常有一个Namenode和必定数目的Datanode组成。Namenode是一个中心服务器,负责管理文件系统的namespace和客户端对文件的访问。Datanode在集群中通常是一个节点,负责管理节点上它们附带的存储。在内部,一个文件其实分红一个或多个block,这些block存储在Datanode集合里。Namenode执行文件系统的namespace操做,例如打开、关闭、重命名文件和目录,同时决定block到具体Datanode节点的映射。Datanode在Namenode的指挥下进行block的建立、删除和复制
node
NameNode
是整个系统的管理节点,它维护这整个文件系统的文件目录树,文件/目录的元信息和每一个文件对应的数据块列表。接收用户的操做请求
维护文件包括:web
镜像
,可是fsimage不会随时与NN内存中的metadata保持一致,而是每隔一段时间经过合并edits文件来更新内容(secondary NN是用来合并fsimage和edits文件来更新NN的metadata)当NameNode启动时,它首先须要从fsimage
读取HDFS的状态,而后从edit log file读取edits
的信息。NameNode再将新的HDFS的状态写入到fsimage
和新建一个空的edits file。在启动阶段,NameNode须要合并fsimage
和edits file
,edits log file在一个忙碌的集群中文件大小会变得很大,这会致使在重启NameNode时候耗费较长的时间。
Secondary NameNode按期的合并fsimage
和edits log
并保持edits log文件在一个适度的大小,它每每运行在不一样的机器上面。在启动Secondary NameNode上checkpoint
过程主要收到两个参数的 影响shell
dfs.namenode.checkpoint.period
:默认的是1 hour,两次连续的checkpoint的最大时间间隔dfs.namenode.checkpoint.period
:默认为1 million 超过这个大小就督促进行checkpointed transactions,即便时间间隔还没到
小程序
5.读取结束。客户端直接从DataNode上读取文件,在此过程当中NN不参与文件的传输slave经过RPC请求master,master的方法会被调用服务器
下图是数据块的分布
上传成功后NN始终在内存中保存metadata,用于处理读请求
,metadata主要存储文件的名称FileName
,副本数量replicas
,分多少block存储block-ids
,分别存储在哪几个节点上id2host
。架构
上面讲了那么多不一样角色之间的交互,这些进程间的交互都是经过RPC(Remote Procdure Call,远程过程调用)来进行的,它容许一个进程去访问另外一个进程的方法,这些对于用户都是透明的,能够说Hadoop的运行是创建在RPC基础之上的,在了解RPC以前咱们须要先了解一项技术动态代理
,动态代理能够提供对另外一个对象的访问,同时隐藏实际对象的具体事实,代理对象对客户隐藏了实际对象。在Hadoop中DataNode端是经过得到NameNode的代理,经过该代理和NameNode进行通讯的,为了更好的分析hadoop的RPC机制我想先分析一下动态代理是怎么实现。
目前Java开发包中提供了对动态代理的支持,但如今只支持对接口的实现,咱们须要定义一个接口。并发
interface DynamicService { public void show(); }
当实现动态代理的时候,须要实现InvocationHandler类,而且覆写其Invoker方法分布式
class ClassA implements DynamicService { @Override public void show() { System.out.println("this is class A"); } } class ClassB implements DynamicService { @Override public void show() { System.out.println("this is class B"); } } class Invoker implements InvocationHandler { DynamicService ds; public Invoker(DynamicService ds) { this.ds = ds; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //add some dynamic methods here method.invoke(ds, args); //after return null; } }
下面是例子的测试ide
public static void main(String[] args) { Invoker inv1 = new Invoker(new ClassA()); DynamicService ds = (DynamicService) Proxy.newProxyInstance(DynamicService.class.getClassLoader(), new Class[] {DynamicService.class}, inv1); ds.show(); //添加另一个 Invoker inv2 = new Invoker(new ClassB()); DynamicService dss = (DynamicService) Proxy.newProxyInstance(DynamicService.class.getClassLoader(), new Class[] {DynamicService.class}, inv2); dss.show(); }
如今咱们就须要去实现Hadoop RPC,主要分为如下几步
4.构造RPC Client 并发送请求:使用RPC.getProxy()构造客户端代理对象,经过代理对象访问远程端的方法
有了上面的几步,咱们如今就能够本身写一个RPC小程序了
public interface RPCService { public static final long versionID = 10010L;//版本号,不一样版本号的RPC Client和Server之间不能相互通讯 public String sayHi(String name); }
public class RPCServer implements RPCService { @Override public String sayHi(String name) { return "server response"+name; } public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Configuration conf = new Configuration(); Server server = new RPC.Builder(conf)// .setProtocol(RPCService.class)// .setBindAddress("10.30.100.11")// 服务器地址 .setPort(1234)//端口 .setInstance(new RPCServer())// 设置托管对象 .build(); server.start(); } }
启动服务器以后,服务器就在指定端口监听客户端的请求。服务器就处于监听状态等待客户端请求到达。
public class RPCClient { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); //接口类型,就能够调用接口中的方法.在客户端获取代理对象,有了代理对象就能够调用目标对象(RPCServer)的方法了 RPCService proxy = RPC.getProxy(RPCService.class, 10010, new InetSocketAddress("10.30.100.11", 1234), conf);//server的代理对象,server必须实现这个接口 String result = proxy.sayHi("boyaa"); System.out.println(result); RPC.stopProxy(proxy); } }