
Operating HBase Through the API

1. Environment Setup

  1. Create a Maven project named hbase-demo and add the dependency:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.6.1-hadoop3</version>
    </dependency>
</dependencies>
```
  2. Create a log4j.properties file in the resources directory:

```ini
log4j.rootLogger=INFO, stdout, D

# Console Appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= %d{hh:mm:ss,SSS} [%t] %-5p %c %x - %m%n

# Custom tweaks
log4j.logger.org.apache=ERROR
log4j.logger.org.apache.spark=ERROR
log4j.logger.org.sparkproject.jetty=ERROR

# log file
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = log/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = ERROR
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
```

2. Create a Connection

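According to the official API documentation, HBase client connections are created by the ConnectionFactory class, and the user must close the connection manually when finished with it. A connection is heavyweight, so one connection per process is recommended. HBase operations are then carried out through the Admin and Table objects obtained from the connection (via getAdmin() and getTable()).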
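The "one heavyweight connection per process" rule can be sketched without a cluster. The `SharedResource` holder and `FakeConnection` below are hypothetical: the holder is generic over `AutoCloseable`, and in real code the factory would be `() => ConnectionFactory.createConnection()`. It creates the resource lazily at most once, even when several threads ask for it concurrently, and closes it once when the program is done.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder: lazily creates one shared AutoCloseable and closes it once.
// In real code T would be org.apache.hadoop.hbase.client.Connection and the
// factory () => ConnectionFactory.createConnection().
final class SharedResource[T <: AutoCloseable](factory: () => T) {
  private var instance: Option[T] = None

  // synchronized: concurrent callers all receive the same single instance
  def get: T = synchronized {
    instance match {
      case Some(r) => r
      case None =>
        val r = factory()
        instance = Some(r)
        r
    }
  }

  // Close the resource if it was created; calling it again is a no-op
  def close(): Unit = synchronized {
    instance.foreach(_.close())
    instance = None
  }
}

// Stand-in for a heavyweight connection that counts opens and closes
object FakeConnection {
  val opened = new AtomicInteger(0)
  val closed = new AtomicInteger(0)
}

final class FakeConnection extends AutoCloseable {
  FakeConnection.opened.incrementAndGet()
  override def close(): Unit = { FakeConnection.closed.incrementAndGet(); () }
}

object SharedResourceDemo {
  val holder = new SharedResource[FakeConnection](() => new FakeConnection)

  // Eight threads request the connection concurrently, then it is closed once
  def run(): Unit = {
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable { def run(): Unit = { holder.get; () } })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    holder.close()
  }
}
```

Lightweight objects such as Admin and Table are the opposite case: obtain one per task from the shared connection and close it afterwards.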

2.1 Single-Threaded Connection

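```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class App {
    public static void main(String[] args) throws IOException {
        // 1. Create the configuration object
        Configuration conf = new Configuration();
        // 2. Add configuration parameters
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        // 3. Create the connection
        // createConnection() returns a synchronous connection
        Connection connection = ConnectionFactory.createConnection(conf);
        // An asynchronous connection also exists but is not recommended here;
        // if you do create one, it must be closed as well:
        // CompletableFuture<AsyncConnection> asyncConn = ConnectionFactory.createAsyncConnection(conf);
        // 4. Use the connection
        System.out.println(connection);
        // 5. Close the connection
        connection.close();
    }
}
```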

2.2 Multi-Threaded (Shared) Connection

Download the hbase-site.xml file from the server and put it in the project's resources directory, with the following configuration:

```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
    </property>
</configuration>
```

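Because an HBase connection is a heavyweight object, the HBase documentation advises against creating a new connection for every operation. The connection object is thread-safe, so it can be shared across threads.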

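```java
package org.example;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseConnection {
    // One shared, thread-safe connection per process
    public static Connection connection = null;

    static {
        try {
            // With no arguments, the configuration is read from hbase-site.xml on the classpath
            connection = ConnectionFactory.createConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Call once when the main process exits (public so other classes can call it)
    public static void closeConn() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }
}
```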

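Why is a Configuration object no longer needed to create the connection? The source code explains it: the no-argument createConnection() calls HBaseConfiguration.create(), which in turn calls addHbaseResources(). That method loads hbase-default.xml and hbase-site.xml from the classpath, so simply placing a file named hbase-site.xml in the resources directory is enough for the HBase client to read it automatically.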
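To show what "reading hbase-site.xml" amounts to, here is a minimal sketch that parses the `<property>`/`<name>`/`<value>` layout with the JDK's DOM parser. The `SiteXml.parse` helper is hypothetical; Hadoop's real `Configuration` class also merges defaults, expands variable references and honours `final` properties, none of which this sketch does.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper: reads name/value pairs from a Hadoop-style configuration file.
object SiteXml {
  def parse(xml: String): Map[String, String] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength).map { i =>
      val p = props.item(i).asInstanceOf[Element]
      // Text of the first <name> and <value> elements inside this <property>
      val name = p.getElementsByTagName("name").item(0).getTextContent.trim
      val value = p.getElementsByTagName("value").item(0).getTextContent.trim
      name -> value
    }.toMap // if a name repeats, the later property wins
  }
}
```

Parsing the hbase-site.xml shown above yields a map whose `hbase.zookeeper.quorum` entry is `hadoop102,hadoop103,hadoop104`, which is the same setting the single-threaded example passed in with `conf.set`.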

3. Create a Namespace

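```java
package org.example;

import java.io.IOException;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseDML {
    static Logger logger = LoggerFactory.getLogger(HBaseDML.class);
    static Connection conn = HBaseConnection.connection;

    public static void createNamespace(String nameSpace) throws IOException {
        // Get an Admin object. Admin is lightweight but NOT thread-safe,
        // so it must not be cached or pooled: get one per task and close it.
        // Admin is mainly used to manage namespaces and tables.
        Admin admin = conn.getAdmin();
        try {
            admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
        } catch (Exception e) {
            logger.error("Failed to create namespace", e);
        } finally {
            admin.close();
        }
    }

    public static void main(String[] args) throws IOException {
        createNamespace("bigdata");
        HBaseConnection.closeConn();
    }
}
```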

Log in to the server and check the namespaces in the HBase shell:

```sh
hbase:003:0> list_namespace
NAMESPACE
bigdata
default
hbase
ns1
4 row(s)
Took 0.1040 seconds
```

4. Check Whether a Table Exists

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.slf4j.LoggerFactory

object HBaseDML2 {

  val logger = LoggerFactory.getLogger(HBaseDML2.getClass)
  val connection = HBaseConnection.connection

  // Parameter order: namespace first, then table name (same as TableName.valueOf)
  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    var flag: Boolean = false
    try {
      // Checks ns1:person
      flag = isExistTable("ns1", "person")
    } catch {
      case e: Exception =>
        logger.error("Failed to check whether the table exists", e)
    }
    println(flag)
    HBaseConnection.closeConn()
  }
}
```

5. Create a Table

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDML3 {

  val logger = LoggerFactory.getLogger(HBaseDML3.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        println("Table already exists")
      } else {
        // Table does not exist: create it with families info and msg, keeping up to 5 versions
        val result = createTable("bigdata", "user", 5, "info", "msg")
        println(result)
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to create table", e)
    }

    HBaseConnection.closeConn()
  }

  private def createTable(namespace: String, tableName: String, maxVersion: Int, columnNameArray: String*): String = {
    if (columnNameArray.isEmpty) {
      "At least one column family is required"
    } else {
      val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName))
      // Create one column family descriptor per name
      for (columnName <- columnNameArray) {
        val familyDescriptor = ColumnFamilyDescriptorBuilder
          .newBuilder(Bytes.toBytes(columnName))
          .setMaxVersions(maxVersion)
          .build()
        tableBuilder.setColumnFamily(familyDescriptor)
      }
      val admin = connection.getAdmin
      try admin.createTable(tableBuilder.build())
      finally admin.close()
      "Created successfully"
    }
  }
}
```

Verify the result in the HBase shell, for example with `describe 'bigdata:user'`.
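`setMaxVersions(5)` means each cell (row key, family, qualifier) keeps at most five timestamped values. The `VersionedCell` class below is a hypothetical in-memory simulation of that retention rule only, not HBase code; real HBase filters out surplus versions on reads and physically removes them during compactions.

```scala
// Hypothetical simulation of one HBase cell that keeps at most maxVersions values.
final class VersionedCell(maxVersions: Int) {
  require(maxVersions >= 1, "maxVersions must be at least 1")

  // (timestamp, value) pairs, newest first, which is the order HBase returns versions in
  private var versions: List[(Long, String)] = Nil

  def put(timestamp: Long, value: String): Unit = {
    // Writing the same timestamp again replaces that version
    val others = versions.filterNot(_._1 == timestamp)
    // Keep only the newest maxVersions entries
    versions = ((timestamp, value) :: others).sortBy(_._1)(Ordering[Long].reverse).take(maxVersions)
  }

  // Like a Get without readAllVersions(): newest value only
  def latest: Option[String] = versions.headOption.map(_._2)

  // Like a Get with readAllVersions(): every retained value, newest first
  def all: List[String] = versions.map(_._2)
}
```

With `maxVersions = 2`, writing three values at timestamps 1, 2 and 3 leaves only the values from timestamps 3 and 2.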

6. Modify a Table

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDML4 {

  val logger = LoggerFactory.getLogger(HBaseDML4.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        val result = modifyTableVersion("bigdata", "user", "info", 6)
        println(result)
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to modify table", e)
    }
    HBaseConnection.closeConn()
  }

  private def modifyTableVersion(namespace: String, tableName: String, columnNameFamily: String, version: Int): String = {
    if (version < 1) {
      "Version must be at least 1"
    } else {
      val admin = connection.getAdmin
      try {
        // Do not use newBuilder(tableName) to modify a table: it creates an empty builder
        // that knows nothing about the table's existing settings in HBase
        // val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName))
        // Instead, read the current table descriptor and copy it into a builder
        val tableDescriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName))
        val tableBuilder = TableDescriptorBuilder.newBuilder(tableDescriptor)
        // Likewise, newBuilder(bytes) would create a brand-new family descriptor:
        // val familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnNameFamily))
        // Use the existing family descriptor read from HBase instead (null if the family is missing)
        val familyDescriptor = tableDescriptor.getColumnFamily(Bytes.toBytes(columnNameFamily))
        if (familyDescriptor == null) {
          s"Column family $columnNameFamily does not exist"
        } else {
          val newFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(familyDescriptor)
            .setMaxVersions(version)
            .build()
          // setColumnFamily() only adds a new family and fails if it already exists;
          // modifyColumnFamily() replaces the existing family descriptor
          tableBuilder.modifyColumnFamily(newFamilyDescriptor)
          admin.modifyTable(tableBuilder.build())
          "Modified successfully"
        }
      } finally {
        // Close the Admin
        admin.close()
      }
    }
  }
}
```

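The comments in the modify code boil down to one rule: start from the table's current descriptor and change only what you need, because a builder created from the table name alone carries none of the existing column families. The case classes below are a hypothetical, HBase-free model of that rule; `TableDesc`, `FamilyDesc` and `modifyFamily` are illustrative names, not HBase API.

```scala
// Hypothetical, simplified model of immutable table and column-family descriptors.
final case class FamilyDesc(name: String, maxVersions: Int)
final case class TableDesc(name: String, families: Map[String, FamilyDesc])

object DescriptorDemo {
  // Copy-then-modify: keep every existing family, change only the target one
  def modifyFamily(current: TableDesc, family: String, maxVersions: Int): Either[String, TableDesc] =
    current.families.get(family) match {
      case None => Left(s"Column family $family does not exist")
      case Some(f) =>
        val updated = f.copy(maxVersions = maxVersions)
        Right(current.copy(families = current.families.updated(family, updated)))
    }

  // Anti-pattern: a descriptor built from the name only knows about just one family
  def rebuildFromScratch(name: String, family: String, maxVersions: Int): TableDesc =
    TableDesc(name, Map(family -> FamilyDesc(family, maxVersions)))
}
```

Modifying `info` on a table with families `info` and `msg` keeps `msg` untouched in the copy-then-modify version, while the rebuilt descriptor only contains `info`.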

7. Delete a Table

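With the native API, deleting a table takes two steps (disable, then delete); the code below wraps them into a single call. In real projects such a delete-table helper is usually not written, and it is shown here only as a demonstration: in a big-data setting, deleting a table by mistake can lose a large amount of data.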

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.slf4j.LoggerFactory

object HBaseDML5 {

  val logger = LoggerFactory.getLogger(HBaseDML5.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        val result = deleteTable("bigdata", "user")
        println(result)
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to delete table", e)
    }
    HBaseConnection.closeConn()
  }

  private def deleteTable(namespace: String, tableName: String): String = {
    val admin = connection.getAdmin
    val table = TableName.valueOf(namespace, tableName)
    try {
      // A table must be disabled before it can be deleted;
      // disableTable() fails if the table is already disabled, so check first
      if (admin.isTableEnabled(table)) {
        admin.disableTable(table)
      }
      admin.deleteTable(table)
      "Deleted successfully"
    } finally {
      // Close the Admin
      admin.close()
    }
  }
}
```
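The two-step rule can be pictured as a small state machine: a table must be disabled before it can be deleted, and disabling an already-disabled table is itself an error. The `TableState` model below is a hypothetical, simplified simulation of that ordering (it ignores transitional states); in real HBase the failures surface as exceptions such as `TableNotDisabledException` and `TableNotEnabledException`.

```scala
// Hypothetical simulation of the enable/disable/delete ordering enforced by HBase.
sealed trait TableState
case object Enabled extends TableState
case object Disabled extends TableState
case object Deleted extends TableState

object TableLifecycle {
  def disable(s: TableState): Either[String, TableState] = s match {
    case Enabled => Right(Disabled)
    case other   => Left(s"cannot disable a table that is $other")
  }

  def delete(s: TableState): Either[String, TableState] = s match {
    case Disabled => Right(Deleted)
    case other    => Left(s"cannot delete a table that is $other")
  }

  // The wrapped one-step delete: disable only if still enabled, then delete
  def disableIfNeededThenDelete(s: TableState): Either[String, TableState] =
    (if (s == Enabled) disable(s) else Right(s)).flatMap(st => delete(st))
}
```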

8. Write Data

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDDL1 {

  val logger = LoggerFactory.getLogger(HBaseDDL1.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        putCell("bigdata", "user", "1001", "info", "name", "张三")
        putCell("bigdata", "user", "1001", "info", "age", "28")
        println("Insert finished")
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to write data", e)
    }
    HBaseConnection.closeConn()
  }

  /**
   * Write a single cell.
   * @param namespace    namespace
   * @param tableName    table name
   * @param rowKey       row key
   * @param columnFamily column family
   * @param columnName   column qualifier
   * @param data         cell value
   */
  private def putCell(namespace: String, tableName: String, rowKey: String, columnFamily: String, columnName: String, data: String): Unit = {
    val table = connection.getTable(TableName.valueOf(namespace, tableName))
    try {
      // Create a Put for the row key
      val put = new Put(Bytes.toBytes(rowKey))
      // Add the cell: family, qualifier, value
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(data))
      // Write the data
      table.put(put)
    } finally {
      table.close()
    }
  }
}
```

After it finishes, check the data in the HBase shell, for example with `scan 'bigdata:user'`.
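Each `putCell` call writes one cell addressed by (row key, column family, qualifier). A common way to picture an HBase table is as a sorted map of maps in which each row only stores the columns that were actually written. The `MiniTable` class below is a hypothetical in-memory model of that addressing (timestamps and versions omitted), meant only to show how the coordinates of a `Put` fit together, not how HBase stores data.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory model of HBase addressing:
// row key -> "family:qualifier" -> value (timestamps/versions omitted).
final class MiniTable {
  // Rows kept sorted by key; for ASCII keys, String order matches HBase's byte order
  private var rows = TreeMap.empty[String, Map[String, String]]

  // Same coordinates as put.addColumn(family, qualifier, value) followed by table.put(put)
  def put(rowKey: String, family: String, qualifier: String, value: String): Unit = {
    val row = rows.getOrElse(rowKey, Map.empty[String, String])
    rows = rows.updated(rowKey, row.updated(s"$family:$qualifier", value))
  }

  def get(rowKey: String, family: String, qualifier: String): Option[String] =
    rows.get(rowKey).flatMap(_.get(s"$family:$qualifier"))

  // Columns present in one row; rows need not share the same columns
  def columns(rowKey: String): Set[String] =
    rows.get(rowKey).map(_.keySet).getOrElse(Set.empty)

  def rowKeys: List[String] = rows.keys.toList
}
```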

9. Read Data

9.1 Read a Single Column

```scala
package org.example

import org.apache.hadoop.hbase.{CellUtil, TableName}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDDL2 {

  val logger = LoggerFactory.getLogger(HBaseDDL2.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        getCells("bigdata", "user", "1001", "info", "name")
        println("Read finished")
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to read data", e)
    }
    HBaseConnection.closeConn()
  }

  /**
   * Read one column of one row (not private, so other objects can reuse it).
   * @param namespace    namespace
   * @param tableName    table name
   * @param rowKey       row key
   * @param columnFamily column family
   * @param columnName   column qualifier
   */
  def getCells(namespace: String, tableName: String, rowKey: String, columnFamily: String, columnName: String): Unit = {
    val table = connection.getTable(TableName.valueOf(namespace, tableName))
    try {
      // Create a Get for the row key
      val get = new Get(Bytes.toBytes(rowKey))
      // addColumn restricts the read to a single column
      get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))
      // Read all stored versions (up to the family's max versions)
      get.readAllVersions()
      val result = table.get(get)
      if (result.isEmpty) {
        println("No data")
      } else {
        // In real code, process the data in business logic instead of printing it
        for (cell <- result.rawCells()) {
          // Bytes.toString decodes UTF-8, matching Bytes.toBytes used when writing
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }
    } finally {
      table.close()
    }
  }
}
```

Running it prints the value of each returned version of `info:name` for row 1001, newest first.
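`Bytes.toBytes(String)` encodes text as UTF-8, so values read back should be decoded as UTF-8 as well, which is what `Bytes.toString` does. `new String(bytes)` uses the JVM's default charset instead, and that is not guaranteed to be UTF-8 on every machine (UTF-8 only became the default in Java 18). The stdlib-only sketch below shows the round trip for a value such as 张三 and what a wrong charset does to it; `Utf8RoundTrip` is a hypothetical helper name.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical helper illustrating explicit-charset encoding and decoding.
object Utf8RoundTrip {
  // What Bytes.toBytes(String) produces: the UTF-8 encoding of the text
  def encode(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Decode with an explicit charset instead of relying on the platform default
  def decode(bytes: Array[Byte], charset: Charset): String = new String(bytes, charset)
}
```

Each of the two Chinese characters takes three bytes in UTF-8; decoding those six bytes as ISO-8859-1 produces six unrelated characters instead of the original name.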

9.2 Scan Data

The previous method reads only one row at a time. To read multiple rows, use the Scan API:

```scala
package org.example

import org.apache.hadoop.hbase.{CellUtil, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDDL3 {

  val logger = LoggerFactory.getLogger(HBaseDDL3.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        scanRows("bigdata", "user", "1001", "1004")
        println("Read finished")
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to scan the table", e)
    }
    HBaseConnection.closeConn()
  }

  /**
   * @param namespace namespace
   * @param tableName table name
   * @param startRow  first row key (inclusive)
   * @param endRow    last row key (exclusive)
   */
  private def scanRows(namespace: String, tableName: String, startRow: String, endRow: String): Unit = {
    val table = connection.getTable(TableName.valueOf(namespace, tableName))
    try {
      // Create a Scan; without limits it reads the whole table, so restrict the row range
      val scan = new Scan()
      // Start row inclusive, stop row exclusive: [startRow, endRow)
      scan.withStartRow(Bytes.toBytes(startRow))
      scan.withStopRow(Bytes.toBytes(endRow))
      // Get a scanner; it holds server-side resources and must be closed
      val scanner = table.getScanner(scan)
      try {
        scanner.forEach(result => {
          for (cell <- result.rawCells()) {
            val rowStr = Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneValue(cell))
            println(rowStr)
          }
        })
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
    }
  }
}
```

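Running it prints one tab-separated line (row key, family, qualifier, value) per cell, for row keys from 1001 up to but not including 1004.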
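The start and stop rows define a half-open interval over row keys compared byte by byte as unsigned values, which is the order HBase sorts rows in. The hypothetical `RowRange` helper below reproduces that rule with the standard library only; note that `10010` falls inside [`1001`, `1004`) because it sorts between the two bounds, while `1004` itself is excluded.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper reproducing HBase's row-key ordering and scan-range check.
object RowRange {
  // Lexicographic comparison of unsigned bytes, the order HBase sorts row keys in
  def compareRowKeys(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val diff = (a(i) & 0xff) - (b(i) & 0xff)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length // a key that is a prefix of the other sorts first
  }

  // Is rowKey within [startRow, stopRow)?
  def inScanRange(rowKey: String, startRow: String, stopRow: String): Boolean = {
    def b(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
    compareRowKeys(b(rowKey), b(startRow)) >= 0 && compareRowKeys(b(rowKey), b(stopRow)) < 0
  }
}
```

If the stop row should be included as well, the client's `Scan` also offers `withStopRow(stopRow, true)`.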

9.3 Scan with a Filter

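There are two kinds of column-value filters: the first (`ColumnValueFilter`) keeps only the matching column's data, and the second (`SingleColumnValueFilter`) keeps the entire matching row.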

```scala
package org.example

import org.apache.hadoop.hbase.{CellUtil, CompareOperator, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.ColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDDL4 {

  val logger = LoggerFactory.getLogger(HBaseDDL4.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists
        filterScanRows("bigdata", "user", "1001", "1004", "info", "name", "张三2")
        println("Read finished")
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to scan the table", e)
    }
    HBaseConnection.closeConn()
  }

  /**
   * @param namespace    namespace
   * @param tableName    table name
   * @param startRow     first row key (inclusive)
   * @param endRow       last row key (exclusive)
   * @param columnFamily column family to filter on
   * @param columnName   column qualifier to filter on
   * @param value        value the column must equal
   */
  private def filterScanRows(namespace: String, tableName: String, startRow: String, endRow: String,
                             columnFamily: String, columnName: String, value: String): Unit = {
    val table = connection.getTable(TableName.valueOf(namespace, tableName))
    try {
      // Create a Scan; without limits it reads the whole table, so restrict the row range
      val scan = new Scan()
      // Start row inclusive, stop row exclusive
      scan.withStartRow(Bytes.toBytes(startRow))
      scan.withStopRow(Bytes.toBytes(endRow))

      val filter = new ColumnValueFilter(
        // Column family
        Bytes.toBytes(columnFamily),
        // Column qualifier
        Bytes.toBytes(columnName),
        // Comparison operator
        CompareOperator.EQUAL,
        // Value to compare against
        Bytes.toBytes(value)
      )
      scan.setFilter(filter)

      // Get a scanner; it must be closed after use
      val scanner = table.getScanner(scan)
      try {
        scanner.forEach(result => {
          for (cell <- result.rawCells()) {
            val rowStr = Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
              Bytes.toString(CellUtil.cloneValue(cell))
            println(rowStr)
          }
        })
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
    }
  }
}
```

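In the result, the row with row key 1002 is also returned. Scanning all the data in user shows that this row has no column named name at all: when HBase filters on a column value, rows that do not contain the specified column can be returned as well. You can filter them out a second time in your own code. For SingleColumnValueFilter this is the documented default behavior, and calling setFilterIfMissing(true) on the filter makes HBase drop such rows itself.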
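The row-level behavior described above, and the switch that turns it off, can be modelled on plain maps. The sketch below is a hypothetical simulation of `SingleColumnValueFilter` with `CompareOperator.EQUAL`, not HBase code; `RowFilterSim` and `singleColumnValueEquals` are illustrative names, and each row is modelled as a `"family:qualifier" -> value` map.

```scala
// Hypothetical simulation of SingleColumnValueFilter with CompareOperator.EQUAL.
object RowFilterSim {
  type Row = Map[String, String]

  def singleColumnValueEquals(rows: Map[String, Row], column: String, expected: String,
                              filterIfMissing: Boolean): Map[String, Row] =
    rows.filter { case (_, row) =>
      row.get(column) match {
        case Some(v) => v == expected    // column present: keep the whole row only on a match
        case None    => !filterIfMissing // column absent: kept unless filterIfMissing is set
      }
    }
}
```

With the real client, the corresponding setup would be `new SingleColumnValueFilter(family, qualifier, CompareOperator.EQUAL, value)` followed by `setFilterIfMissing(true)`.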

10. Delete Data

Data can only be deleted by row key and column:

```scala
package org.example

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

object HBaseDDL6 {

  val logger = LoggerFactory.getLogger(HBaseDDL6.getClass)
  val connection = HBaseConnection.connection

  private def isExistTable(nameSpace: String, tableName: String): Boolean = {
    val admin = connection.getAdmin
    try admin.tableExists(TableName.valueOf(nameSpace, tableName))
    finally admin.close()
  }

  def main(args: Array[String]): Unit = {
    try {
      if (isExistTable("bigdata", "user")) {
        // Table exists: print the current values first (HBaseDDL2.getCells must not be private)
        HBaseDDL2.getCells("bigdata", "user", "1002", "info", "age")
        deleteColumn("bigdata", "user", "1002", "info", "age")
        println("Delete finished")
      } else {
        println("Table does not exist")
      }
    } catch {
      case e: Exception =>
        logger.error("Failed to delete data", e)
    }
    HBaseConnection.closeConn()
  }

  /**
   * @param namespace    namespace
   * @param tableName    table name
   * @param rowKey       row key
   * @param columnFamily column family
   * @param columnName   column qualifier
   */
  private def deleteColumn(namespace: String, tableName: String, rowKey: String, columnFamily: String, columnName: String): Unit = {
    val table = connection.getTable(TableName.valueOf(namespace, tableName))
    try {
      val delete = new Delete(Bytes.toBytes(rowKey))
      // addColumn: delete only the latest version of the column
      // delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))
      // addColumns: like deleteall in the shell, delete all versions of the column
      delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))
      table.delete(delete)
    } finally {
      table.close()
    }
  }
}
```
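The two `Delete` variants differ only in how many versions they remove. In the hypothetical simulation below, a column is a list of (timestamp, value) pairs, newest first: `deleteLatest` mirrors `addColumn` (only the newest version goes, so the next older one becomes visible again), and `deleteAll` mirrors `addColumns` (every version with a timestamp up to the delete's timestamp goes, which defaults to "now").

```scala
// Hypothetical simulation of Delete.addColumn vs Delete.addColumns on one column.
// Versions are (timestamp, value) pairs, newest first.
object DeleteSim {
  type Versions = List[(Long, String)]

  // Like addColumn(family, qualifier): removes only the newest version
  def deleteLatest(versions: Versions): Versions = versions.drop(1)

  // Like addColumns(family, qualifier): removes all versions with timestamp <= upTo
  def deleteAll(versions: Versions, upTo: Long = Long.MaxValue): Versions =
    versions.filter(_._1 > upTo)

  // What a plain Get would return afterwards
  def visible(versions: Versions): Option[String] = versions.headOption.map(_._2)
}
```

This is why the example above uses `addColumns`: with `addColumn`, an older `age` value could reappear after the delete.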

Tip

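In day-to-day development you will rarely work with these low-level APIs directly; usually someone wraps them and provides a jar that business code builds on.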