写在前面:
scala:2.12
hbase:2.0.2
开发工具:IDEA
准备工作:
1、将生产环境hbase的conf/hbase-site.xml文件拷贝到IDEA工程的src/main/resources目录下(保证该文件位于类路径根目录)
2、将生产环境中hbase中的$HBASE_HOME/lib下的*.jar文件加载到IDEA中
3、打开File -> Project Structure,点击Libraries -> 中间的"+" -> Java
4、选择jar包所放的位置,点击OK
5、继续点击ok即可
6、进行连接代码编写:
package spark._core

import java.io.IOException
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.SubstringComparator
import org.apache.hadoop.hbase.util.Bytes
import parquet.org.slf4j.LoggerFactory

/**
 * Author Mr. Guo
 * Create 2018/11/5 - 19:08
 *
 * Small collection of HBase client helpers (create/drop table, put, get, scan)
 * together with a `main` method that opens a connection to the table "test".
 *
 * Methods that return a [[ResultScanner]] hand ownership of the scanner to the
 * caller, who is responsible for closing it. They return `null` on failure.
 */
object Operator_Hbase {

  /** Logger shared by all helpers in this object. */
  val LOG = LoggerFactory.getLogger(getClass)

  /**
   * Builds the HBase client configuration.
   *
   * @return a configuration based on the HBase defaults, with `hbase-site.xml`
   *         added as a classpath resource and the ZooKeeper client port set to 2181
   */
  def getHbaseConf: Configuration = {
    val conf: Configuration = HBaseConfiguration.create
    // Configuration.addResource(String) resolves its argument as a classpath
    // resource name, not as a file-system path, so the bare file name is used
    // (the file is expected at the root of the resources directory).
    conf.addResource("hbase-site.xml")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    /*conf.set("spark.executor.memory","3000m")
    conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
    conf.set("hbase.master","master:60000")
    conf.set("hbase.rootdir","Contant.HBASE_ROOTDIR")*/
    conf
  }

  /**
   * Creates a table with the given column families unless it already exists.
   * The AggregateImplementation coprocessor is registered on the new table.
   *
   * @param hbaseconn     open HBase connection
   * @param tableName     name of the table to create
   * @param columnFamilys names of the column families to add
   * @throws IOException if any admin call fails
   */
  @throws(classOf[MasterNotRunningException])
  @throws(classOf[ZooKeeperConnectionException])
  @throws(classOf[IOException])
  def createTable(hbaseconn: Connection, tableName: String, columnFamilys: Array[String]): Unit = {
    val admin: Admin = hbaseconn.getAdmin
    // try/finally so the Admin handle is released even when an admin call throws.
    try {
      val myTableName: TableName = TableName.valueOf(tableName)
      if (admin.tableExists(myTableName)) {
        LOG.info(tableName + "Table exists!")
      } else {
        val tableDesc: HTableDescriptor = new HTableDescriptor(myTableName)
        tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation")
        columnFamilys.foreach { columnFamily =>
          tableDesc.addFamily(new HColumnDescriptor(columnFamily))
        }
        admin.createTable(tableDesc)
        LOG.info(tableName + "create table success!")
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Writes a single cell.
   *
   * @param table        target table
   * @param rowKey       row key
   * @param columnFamily column family name
   * @param quorm        column qualifier
   * @param value        cell value; `null` is stored as an empty value
   */
  def addRow(table: Table, rowKey: String, columnFamily: String, quorm: String, value: String): Unit = {
    val rowPut: Put = new Put(Bytes.toBytes(rowKey))
    // Bytes.toBytes always encodes as UTF-8, matching the UTF-8 decoding in
    // getRow; String.getBytes would use the platform default charset.
    val cellValue: String = Option(value).getOrElse("")
    rowPut.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(quorm), Bytes.toBytes(cellValue))
    table.put(rowPut)
  }

  /**
   * Fetches one row and prints every cell of it to stdout.
   *
   * @param table  table to read from
   * @param rowKey row key to look up
   * @return the result of the get
   */
  def getRow(table: Table, rowKey: String): Result = {
    val result: Result = table.get(new Get(Bytes.toBytes(rowKey)))
    // rawCells() can be null when the result holds no cells (e.g. the row does
    // not exist); treat that as "nothing to print" instead of failing.
    val cells: Array[Cell] = Option(result.rawCells()).getOrElse(Array.empty[Cell])
    cells.foreach { cell =>
      println("Famiily:" + Bytes.toString(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength))
      println("Qualifier:" + Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength))
      println("TimeStamp:" + cell.getTimestamp)
      println("rowkey:" + Bytes.toString(cell.getRowArray, cell.getRowOffset, cell.getRowLength))
      println("Value:" + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
    result
  }

  /**
   * Writes a batch of puts. Failures are logged and not rethrown.
   *
   * @param table target table
   * @param list  puts to apply
   */
  def addDataBatch(table: Table, list: java.util.List[Put]): Unit = {
    try {
      table.put(list)
    } catch {
      // RetriesExhaustedWithDetailsException is an IOException, so one case covers both.
      case e: IOException => LOG.error(e.getMessage, e)
    }
  }

  /**
   * Opens an unfiltered scan over the whole table.
   *
   * @param table table to scan
   * @return the scanner (to be closed by the caller), or `null` if opening it failed
   */
  def queryAll(table: Table): ResultScanner = {
    try {
      table.getScanner(new Scan())
    } catch {
      case e: IOException =>
        LOG.error(e.toString, e)
        null
    }
  }

  /**
   * Scans for rows whose `queryColumn` value contains `value` as a substring.
   *
   * NOTE(review): each column name is used as BOTH the column family and the
   * qualifier (for the filter and for every entry of `columns`). That only
   * matches tables whose families and qualifiers share names - confirm this is
   * intended; fixing it would need a family parameter.
   *
   * @param table       table to scan
   * @param queryColumn column the filter is applied to
   * @param value       substring to look for
   * @param columns     columns to include in the scan
   * @return the scanner (to be closed by the caller), or `null` if any argument
   *         other than `table` is `null` or the scan could not be opened
   */
  def queryBySingleColumn(table: Table, queryColumn: String, value: String, columns: Array[String]): ResultScanner = {
    if (columns == null || queryColumn == null || value == null) {
      null
    } else {
      try {
        val filter: SingleColumnValueFilter = new SingleColumnValueFilter(
          Bytes.toBytes(queryColumn),
          Bytes.toBytes(queryColumn),
          CompareOp.EQUAL,
          new SubstringComparator(value))
        val scan: Scan = new Scan()
        columns.foreach { columnName =>
          scan.addColumn(Bytes.toBytes(columnName), Bytes.toBytes(columnName))
        }
        scan.setFilter(filter)
        table.getScanner(scan)
      } catch {
        case e: Exception =>
          LOG.error(e.toString, e)
          null
      }
    }
  }

  /**
   * Disables and then deletes a table. Failures are logged and not rethrown.
   *
   * @param hbaseconn open HBase connection
   * @param tableName name of the table to drop
   */
  def dropTable(hbaseconn: Connection, tableName: String): Unit = {
    try {
      // The Admin interface already offers disableTable/deleteTable, so no
      // downcast to HBaseAdmin is needed.
      val admin: Admin = hbaseconn.getAdmin
      try {
        val name: TableName = TableName.valueOf(tableName)
        admin.disableTable(name)
        admin.deleteTable(name)
      } finally {
        admin.close()
      }
    } catch {
      // MasterNotRunningException and ZooKeeperConnectionException are both
      // IOExceptions, so a single case logs all of them.
      case e: IOException => LOG.error(e.toString, e)
    }
  }

  /**
   * Demo entry point: connects to HBase and opens the table "test". The sample
   * calls are left commented out; enable the ones you want to try.
   */
  def main(args: Array[String]): Unit = {
    val conf: Configuration = getHbaseConf
    val conn = ConnectionFactory.createConnection(conf)
    try {
      // Name of the table to work on
      val table: Table = conn.getTable(TableName.valueOf("test"))
      try {
        // Column families (used by the commented-out createTable call)
        val familyColumn: Array[String] = Array[String]("info1", "info2")
        // Create the table
        // createTable(conn,"test",familyColumn)
        val uuid: UUID = UUID.randomUUID()
        val s_uuid: String = uuid.toString
        // Insert a row
        // addRow(table,s_uuid,"info","column1A",s_uuid+"_1A")
        // Read a row back
        // getRow(table,"9ec78ac4-6042-4c34-8862-f5aca3e")
        // Drop the table
        // dropTable(conn,"test")
      } catch {
        case e: Exception =>
          // Most specific types first: the first two are IOException subclasses.
          e match {
            case _: MasterNotRunningException => println("MasterNotRunningException")
            case _: ZooKeeperConnectionException => println("ZooKeeperConnectionException")
            case _: IOException => println("IOException")
            case _ => ()
          }
          e.printStackTrace()
      } finally {
        table.close()
      }
    } finally {
      // The connection holds sockets and threads; always release it.
      conn.close()
    }
  }
}
7、实际操作的过程中可能会遇到如下问题(原文此处应为报错截图,推测为IDEA运行时提示"Command line is too long"):
解决方案:
双击打开工程下的.idea/workspace.xml文件,找到标签 <component name="PropertiesComponent"> , 在标签里加一行 <property name="dynamic.classpath" value="true" />
保存即可