从 HDFS 到 HBase 的 RPC 设计思路

两者虽然都依赖了 ProtoBuf,但都暗藏玄机。

总的来说,HDFS 的 RPC 分为两个大类:普通与流式 RPC,普通 RPC 全程传递的都是 ProtoBuf 编码对象,流式 RPC 则分为两段数据,一段 ProtoBuf 编码对象,一段一系列的 Packet 数据包。HBase 的 RPC 主要应用是批量传递 Cell[] 数据单元组,因此 HBase 使用未经 ProtoBuf 编码的 CellBlocks 作为 ProtoBuf 协议的头字段数据,把其他非 Cell[] 数据单元组的内容通过 ProtoBuf 协议的正文部分进行编码传递。

通常 ProtoBuf 都是配 gPRC 框架的,但是它们都是自己实现了一套通讯框架。

HDFS

以 HDFS 从客户端请求 NameNode 为例。

  • 客户端采用 Java 动态代理

DFSClient 首先调用 ClientNamenodeProtocolTranslatorPB 翻译算子(算子不仅会在请求时把用户数据转换为 ProtoBuf 编码对象,还会在操作结束返回时再把编码对象转换成用户数据),然后交由实现 ClientNamenodeProtocolPB 接口的代理对象 rpcProxy(插播一则小知识:Java 的动态代理对象只能是接口),因此所有的调用信息都被 ProtobufRpcEngine$Invoker#invoke 动态代理所收集了,最终实现数据传输。

NameNode 端在则是启动时创建 Server 对象,Accept NIO 连接后,Server#processOneRpc 获取出 Call 对象 ,通过 Call 对象中的 ProtoBuf 协议的请求头获取方法名,调用 ClientNamenodeProtocolServerSideTranslatorPB 翻译算子,由翻译算子转换数据并分发处理任务。

可以看出 HDFS 基本上围绕两个翻译算子 ,一个代理接口实现的。(另外其实还有一个门面接口 ClientProtocol,客户端和服务器端均实现,便于用户关联查找代码实现)

HBase

而 HBase 类似,但它把 ProtoBuf 模式使用得更淋漓尽致。

  • 客户端采用 Stub 接口

在 ProtoBuf 原本设计中,Service 接口与 Stub 接口分别代表服务层接口与客户端接口。如 HTable 对象就是使用 Stub 接口,在 AbstractRpcClient 对象中直接暴露传输过程。但 HDFS 客户端不使用此设计。

  • CellBlocks

首先是构造记录 Cell[] 数据单元组的 HBaseRpcControllerImpl,继承自 com.google.protobuf.RpcController,该对象涉及链路中的上下文,这使得本来只能传递 ProtoBuf 字段的数据调用,通过改写上下文对象,插入新的数据字段。比如:

  /**
   * A helper to replicate a list of WAL entries using admin protocol.
   * @param admin Admin service
   * @param entries Array of WAL entries to be replicated
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
   * @throws java.io.IOException
   */
  public static void replicateWALEntry(final AdminService.BlockingInterface admin,
      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
      Path sourceHFileArchiveDir) throws IOException {
    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
          sourceHFileArchiveDir);
    HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
    try {
      // controller 会作为上下文一直传递
      admin.replicateWALEntry(controller, p.getFirst());
    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
      throw ProtobufUtil.getServiceException(e);
    }
  }

然后在 AbstractRpcClient#callMethod 中把请求封装成 Call 数据类型(同时取出 HBaseRpcControllerImpl 中的 Cell[]),放入发送队列里,另一线程消费,并通过 BlockingRpcConnection#writeRequest 写出数据。在这里,HBase 的 CellBlocks Trick 把戏就显现了:

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
      this.compressor, call.cells); // cells 就是通过上下文传递过来的
    CellBlockMeta cellBlockMeta;
    if (cellBlock != null) {
      cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
    } else {
      cellBlockMeta = null;
    }
    RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);

    setupIOstreams();

    calls.put(call.id, call);
    try {
      call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
    } catch (Throwable t) {
      if(LOG.isTraceEnabled()) {
        LOG.trace("Error while writing call, call_id:" + call.id, t);
      }
      IOException e = IPCUtil.toIOE(t);
      closeConn(e);
      return;
    }
    notifyAll();
  }

也就是在 ProtoBuf 头部写出 Cell[] 数据单元组,对于已编码的数据就可以避免重复编码。

  • 客户端异步 RPC 设计

相比 HDFS 客户端的基于 BIO 的同步设计,HBase 2.x 实现客户端 Netty 非阻塞 IO 的设计。写出数据时候, 用户线程在 NettyRpcConnection#sendRequest 后被暂停,交由 EventLoop 线程处理,并在 NettyRpcDuplexHandler 记录下 Call ID;当 EventLoop 线程接收到服务端返回结果后,通过响应的 Call ID 找到回调的 Future,填充 Future 的 Result 结果字段,唤醒用户线程进行用户处理。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注