从 HDFS 到 HBase 的 RPC 设计思路
两者虽然都依赖了 ProtoBuf,但都暗藏玄机。
总的来说,HDFS 的 RPC 分为两个大类:普通与流式 RPC,普通 RPC 全程传递的都是 ProtoBuf 编码对象,流式 RPC 则分为两段数据,一段 ProtoBuf 编码对象,一段一系列的 Packet 数据包。HBase 的 RPC 主要应用是批量传递 Cell[] 数据单元组,因此 HBase 使用未经 ProtoBuf 编码的 CellBlocks 作为 ProtoBuf 协议的头字段数据,把其他非 Cell[] 数据单元组的内容通过 ProtoBuf 协议的正文部分进行编码传递。
通常 ProtoBuf 都是配 gPRC 框架的,但是它们都是自己实现了一套通讯框架。
HDFS
以 HDFS 从客户端请求 NameNode 为例。
- 客户端采用 Java 动态代理
DFSClient 首先调用 ClientNamenodeProtocolTranslatorPB 翻译算子(算子不仅会在请求时把用户数据转换为 ProtoBuf 编码对象,还会在操作结束返回时再把编码对象转换成用户数据),然后交由实现 ClientNamenodeProtocolPB 接口的代理对象 rpcProxy(插播一则小知识:Java 的动态代理对象只能是接口),因此所有的调用信息都被 ProtobufRpcEngine$Invoker#invoke 动态代理所收集了,最终实现数据传输。
NameNode 端在则是启动时创建 Server 对象,Accept NIO 连接后,Server#processOneRpc 获取出 Call 对象 ,通过 Call 对象中的 ProtoBuf 协议的请求头获取方法名,调用 ClientNamenodeProtocolServerSideTranslatorPB 翻译算子,由翻译算子转换数据并分发处理任务。
可以看出 HDFS 基本上围绕两个翻译算子 ,一个代理接口实现的。(另外其实还有一个门面接口 ClientProtocol,客户端和服务器端均实现,便于用户关联查找代码实现)
HBase
而 HBase 类似,但它把 ProtoBuf 模式使用得更淋漓尽致。
- 客户端采用 Stub 接口
在 ProtoBuf 原本设计中,Service 接口与 Stub 接口分别代表服务层接口与客户端接口。如 HTable 对象就是使用 Stub 接口,在 AbstractRpcClient 对象中直接暴露传输过程。但 HDFS 客户端不使用此设计。
- CellBlocks
首先是构造记录 Cell[] 数据单元组的 HBaseRpcControllerImpl,继承自 com.google.protobuf.RpcController,该对象涉及链路中的上下文,这使得本来只能传递 ProtoBuf 字段的数据调用,通过改写上下文对象,插入新的数据字段。比如:
/**
* A helper to replicate a list of WAL entries using admin protocol.
* @param admin Admin service
* @param entries Array of WAL entries to be replicated
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
* @throws java.io.IOException
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
Path sourceHFileArchiveDir) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
sourceHFileArchiveDir);
HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
try {
// controller 会作为上下文一直传递
admin.replicateWALEntry(controller, p.getFirst());
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
throw ProtobufUtil.getServiceException(e);
}
}
然后在 AbstractRpcClient#callMethod 中把请求封装成 Call 数据类型(同时取出 HBaseRpcControllerImpl 中的 Cell[]),放入发送队列里,另一线程消费,并通过 BlockingRpcConnection#writeRequest 写出数据。在这里,HBase 的 CellBlocks Trick 把戏就显现了:
/**
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
* the Connection thread, but by other threads.
* @see #readResponse()
*/
private void writeRequest(Call call) throws IOException {
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
this.compressor, call.cells); // cells 就是通过上下文传递过来的
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
setupIOstreams();
calls.put(call.id, call);
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
notifyAll();
}
也就是在 ProtoBuf 头部写出 Cell[] 数据单元组,对于已编码的数据就可以避免重复编码。
- 客户端异步 RPC 设计
相比 HDFS 客户端的基于 BIO 的同步设计,HBase 2.x 实现客户端 Netty 非阻塞 IO 的设计。写出数据时候, 用户线程在 NettyRpcConnection#sendRequest 后被暂停,交由 EventLoop 线程处理,并在 NettyRpcDuplexHandler 记录下 Call ID;当 EventLoop 线程接收到服务端返回结果后,通过响应的 Call ID 找到回调的 Future,填充 Future 的 Result 结果字段,唤醒用户线程进行用户处理。