Skip to content

HBase读写流程

1. 读流程

1.1 HFile结构

HFile是存储在HDFS上面每一个store文件夹下实际存储数据的文件。里面存储多种内容。包括数据本身(keyValue 键值对)、元数据记录、文件信息、数据索引、元数据索引和一个固定长度的尾部信息(记录文件的修改情况)。键值对按照块大小(默认64K)保存在文件中,数据索引按照块创建,块越多,索引越大。每一个HFile还会维护一个布隆过滤器(就像是一个很大的地图,文件中每有一种key,就在对应的位置标记,读取时可以大致判断要get的key是否存在HFile中)。KeyValue内容如下:
rowlength -----------→ key的长度
row -----------------→ key的值
columnfamilylength --→ 列族长度
columnfamily --------→ 列族
columnqualifier -----→ 列名
timestamp -----------→ 时间戳(默认系统时间)
keytype -------------→ Put
由于HFile存储经过序列化,所以无法直接查看。可以通过HBase提供的命令来查看存储在HDFS上面的HFile元数据内容。

sh
[jack@hadoop102 hbase]$ bin/hbase hfile -m -f /hbase/data/命名空间/表名/regionID/列族/HFile 名
[jack@hadoop102 hbase-2.6.1]$ bin/hbase hfile -m -f /HBase/data/bigdata/user/23e1ff3d6a440e5cd5903903e1a1d9a2/info/4658458601c74f21889a4214e6b4b051
Block index size as per heapsize: 376
reader=/HBase/data/bigdata/user/23e1ff3d6a440e5cd5903903e1a1d9a2/info/4658458601c74f21889a4214e6b4b051,
    ## 没有压缩
    compression=none,
    ## 因为还没有去读取过该数据,所以缓存里面没有,都是false
    cacheConf=cacheDataOnRead=false,
    cacheDataOnWrite=false,
    cacheIndexesOnWrite=false,
    cacheBloomsOnWrite=false,
    cacheEvictOnClose=false,
    cacheDataCompressed=false,
    prefetchOnOpen=false,
    firstKey=Optional[1001/info:age/1732768745586/Put/seqid=0],
    lastKey=Optional[1001/info:name/1732767868703/Put/seqid=0],
    avgKeyLen=23,
    avgValueLen=5,
    entries=5,
    length=5140
Trailer:
    fileinfoOffset=374,
    loadOnOpenDataOffset=264,
    dataIndexCount=1,
    metaIndexCount=0,
    totalUncomressedBytes=5047,
    entryCount=5,
    compressionCodec=NONE,
    uncompressedDataIndexSize=36,
    numDataIndexLevels=1,
    firstDataBlockOffset=0,
    lastDataBlockOffset=0,
    comparatorClassName=org.apache.hadoop.hbase.InnerStoreCellComparator,
    encryptionKey=NONE,
    ## 记录版本,如果版本没变,直接可以尝试读取缓存
    majorVersion=3,
    minorVersion=3
Fileinfo:
    BLOOM_FILTER_TYPE = ROW
    COMPACTION_EVENT_KEY = PBUF
    DELETE_FAMILY_COUNT = 0
    EARLIEST_PUT_TS = 1732767868703
    HISTORICAL = false
    KEY_VALUE_VERSION = 1
    LAST_BLOOM_KEY = 1001
    MAJOR_COMPACTION_KEY = false
    MAX_MEMSTORE_TS_KEY = 8
    MAX_SEQ_ID_KEY = 9
    TIMERANGE = 1732767868703....1732768745586
    hfile.AVG_KEY_LEN = 23
    hfile.AVG_VALUE_LEN = 5
    hfile.CREATE_TIME_TS = 1732771600467
    hfile.KEY_OF_BIGGEST_CELL = \x00\x041001\x04infoname\x00\x00\x01\x93q\x10\xC8Z\x04
    hfile.LASTKEY = 1001/info:name/1732767868703/Put/vlen=0/mvcc=0
    hfile.LEN_OF_BIGGEST_CELL = \x00\x00\x00\x00\x00\x00\x00+
Mid-key: Optional[1001/info:age/1732768745586/Put/seqid=0]
## 布隆过滤器的信息
Bloom filter:
    BloomSize: 2
    No of Keys in bloom: 1
    Max Keys for bloom: 1
    Percentage filled: 100%
    Number of chunks: 1
    Comparator: ByteArrayComparator
Delete Family Bloom filter:
    Not present

1.2 读流程

Alt text

  1. 创建连接同写流程。
  2. 创建Table对象发送get请求。
  3. 优先访问Block Cache,查找是否之前读取过,并且可以读取HFile的索引信息和布隆过滤器。
  4. 不管读缓存中是否已经有数据了(可能已经过期了),都需要再次读取写缓存和store中的文件(防止数据刚刚写入就开始读取)。
  5. 最终将所有读取到的数据合并版本(高版本的数据覆盖低版本的),按照get的要求返回即可。

1.3 合并读取数据优化

每次读取数据都需要读取三个位置(block cache\写缓存\hfile),最后进行版本的合并。效率会非常低,所有系统需要对此优化。

  1. HFile带有索引文件,读取对应RowKey数据会比较快。
  2. Block Cache会缓存之前读取的内容和元数据信息,如果HFile没有发生变化(记录在HFile尾信息中),则不需要再次读取。
  3. 不管读缓存中是否已经有数据了(可能已经过期了),都需要再次读取写缓存和store中的文件。
  4. 最终将所有读取到的数据合并版本,按照get的要求返回即可。

2. 写流程

Alt text

  1. 写流程顺序正如API编写顺序,首先创建HBase的重量级连接。
  2. 首先访问zookeeper,获取hbase:meta表位于哪个Region Server;
  3. 访问对应的Region Server,获取hbase:meta表,将其缓存到连接中,作为连接属性MetaCache,由于Meta表格具有一定的数据量,导致了创建连接比较慢;
  4. 之后使用创建的连接获取Table,这是一个轻量级的连接,只有在第一次创建的时候会检查表格是否存在访问RegionServer,之后在获取Table时不会访问RegionServer;
  5. 将数据顺序写入(追加)到 WAL,此处写入是直接落盘的,并设置专门的线程控制WAL预写日志的滚动(类似 Flume);
  6. 根据写入命令的RowKey和ColumnFamily查看具体写入到哪个MemStore,并且在MemStore中排序;
  7. 向客户端发送ack;
  8. 等达到MemStore的刷写时机后,将数据刷写到对应的store中。

3. MemStore Flush

3.1 刷写机制

MemStore刷写由多个线程控制,条件互相独立;主要的刷写规则是控制刷写文件的大小,在每一个刷写线程中都会进行监控。

  1. 当某个memstroe的大小达到了hbase.hregion.memstore.flush.size(默认值128M,和HDFS的块大小一致),其所在region的所有memstore都会刷写。当memstore的大小达到:
    hbase.hregion.memstore.flush.size(默认值128M) * hbase.hregion.memstore.block.multiplier(默认值4)时,会刷写同时阻止继续往该memstore写数据(由于线程监控是周期性的,所有有可能面对数据洪峰,尽管可能性比较小)。
  2. 由HRegionServer中的属性MemStoreFlusher内部线程FlushHandler控制。标准为LOWER_MARK(低水位线)和HIGH_MARK(高水位线),意义在于避免写缓存使用过多的内存造成OOM。当region server中memstore的总大小达到低水位线:
    java_heapsize * hbase.regionserver.global.memstore.size(默认值0.4) * hbase.regionserver.global.memstore.size.lower.limit(默认值0.95)
    region会按照其所有memstore的大小顺序(由大到小)依次进行刷写。直到region server中所有memstore的总大小减小到上述值以下。
    当region server中memstore的总大小达到高水位线hbase.regionserver.global.memstore.size(默认值0.4),会同时阻止继续往所有的memstore写数据。
  3. 为了避免数据过长时间处于内存之中,到达自动刷写的时间,也会触发memstore flush。由HRegionServer的属性PeriodicMemStoreFlusher控制进行,由于重要性比较低,5min才会执行一次。自动刷新的时间间隔由该属性进行配置hbase.regionserver.optionalcacheflushinterval(默认1小时)
  4. 当WAL文件的数量超过hbase.regionserver.max.logs,region会按照时间顺序依次进行刷写,直到WAL文件数量减小到hbase.regionserver.max.log以下(该属性名已经废弃,现无需手动设置,最大值为32)。可以理解为wal文件在数据最终写入到table里面时就会被删除,基本上不会累积达到这个配置阈值,除非写入table很慢(那机器的性能很慢),导致wal文件堆积。

3.2 查看源码

打开org.apache.hadoop.hbase.regionserver.MemStoreFlusher类:
Alt text 进入FlushHandler类,它是一个线程类:

java
private class FlushHandler extends Thread {
    public void run() {
        // ReginServer只要启动,这里就是一直满足条件不断循环
        while (!server.isStopped() && running.get()) {
            FlushQueueEntry fqe = null;
            try {
                wakeupPending.set(false); // allow someone to wake us up again
                fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
                if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
                    // 判断低水位线
                    FlushType type = isAboveLowWaterMark();
                    // 类型有高水位线,低水位线和NORMAL
                    if (type != FlushType.NORMAL) {
                        if (!flushOneForGlobalPressure(type)) {
                            Thread.sleep(1000);
                            wakeUpIfBlocking();
                        }
                        // 唤醒刷写线程
                        wakeupFlushThread();
                    }
                    continue;
                }
                FlushRegionEntry fre = (FlushRegionEntry) fqe;
                // 如果刷完就break,退出while循环
                if (!flushRegion(fre)) {
                    break;
                }
            } catch (InterruptedException ex) {
                //......异常处理
            }
            if (server.isStopped()) {
                synchronized (regionsInQueue) {
                regionsInQueue.clear();
                flushQueue.clear();
            }
            wakeUpIfBlocking();
        }
        LOG.info(getName() + " exiting");
    }
    ......
}

另外ReginServer会周期的执行reclaimMemStoreMemory()方法:
Alt text