一、方式介绍
本次测试一共采用了四种方式进行对比,分别是:1、调用HBase Java API(原意为在RDD内部调用,下方示例代码为简化起见直接在Driver端调用,未经过RDD);2、调用saveAsNewAPIHadoopDataset()接口;3、调用saveAsHadoopDataset()接口;4、BulkLoad方法。
测试使用的大数据组件版本如下(均为单机部署):Hadoop 2.7.4、HBase 1.0.2、Spark 2.1.0。
二、测试
本次测试使用单一列族、单一字段、固定值的数据,分别以10W条和100W条记录进行测试(BulkLoad方法额外测试了1000W条)。
以下是测试结果:
1. Java API
10W条数据:1000ms、944ms
100W条数据:6308ms、6725ms
2. saveAsNewAPIHadoopDataset()接口
10W条数据:2585ms、3125ms
100W条数据:13833ms、14880ms
3. saveAsHadoopDataset()接口
10W条数据:2623ms、2596ms
100W条数据:14929ms、13753ms
4. BulkLoad方法(导入大批量数据时的最佳选择!注意:示例代码的计时只覆盖doBulkLoad装载HFile的阶段,不包含事先生成HFile的时间)
10W条数据:9351ms、9364ms
100W条数据:9342ms、9403ms
1000W条数据:9690ms、9609ms
三、代码
pom引用
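<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.2</version>
</dependency>
(注:上面hbase依赖的版本1.2.6与其余依赖的1.0.2不一致,建议统一为测试环境使用的1.0.2。)
1)Java API代码
-------------------------------------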
package cn.piesat.app

import java.text.DecimalFormat
import java.util.{ArrayList, List, Random}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client._

import scala.util.control.NonFatal

/**
 * Benchmark: writes a batch of `Put`s to HBase with one plain-client `Table.put(java.util.List[Put])`
 * call made on the driver (no Spark involved), and prints the elapsed time in milliseconds.
 */
object SparkJavaApi {
  val ZOOKEEPER_ADDRESS = "hadoop01"
  val ZOOKEEPER_PORT = "2181"
  // Formats the random row-key prefix as two zero-padded digits ("00" .. "98").
  val df2: DecimalFormat = new DecimalFormat("00")

  /**
   * Creates table `test01` (family `cf`) if it is missing, then times a single batched write.
   *
   * This method owns the connection and the admin: both are closed here, in `finally`,
   * even if table creation or the write fails.
   */
  def main(args: Array[String]): Unit = {
    val tableName: String = "test01"
    val conn = getConn()
    try {
      val admin = conn.getAdmin
      try {
        // Built before the timer starts so that only the HBase write is measured.
        val putList = getPutList()
        if (!admin.tableExists(TableName.valueOf(tableName))) {
          createTable(admin, tableName, Array("cf"))
        }
        val start: Long = System.currentTimeMillis
        insertBatchData(conn, tableName, admin, putList)
        val end: Long = System.currentTimeMillis
        System.out.println("用时:" + (end - start))
      } finally {
        admin.close()
      }
    } finally {
      conn.close()
    }
  }

  /**
   * Opens a new HBase connection configured with [[ZOOKEEPER_ADDRESS]] and [[ZOOKEEPER_PORT]].
   *
   * @return a fresh connection; the caller owns it and must close it
   */
  def getConn(): Connection = {
    val conf = HBaseConfiguration.create
    conf.set("hbase.zookeeper.quorum", ZOOKEEPER_ADDRESS)
    conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
    ConnectionFactory.createConnection(conf)
  }

  /**
   * Writes `puts` to `tableName` in one `Table.put` call.
   *
   * `conn` and `admin` belong to the caller and are NOT closed here; the `Table` handle
   * opened here is always closed. If the table does not exist, nothing is written and a
   * message goes to stderr. Non-fatal errors are printed and swallowed.
   *
   * @param conn      open HBase connection
   * @param tableName target table name
   * @param admin     admin used to check that the table exists
   * @param puts      the mutations to write
   */
  def insertBatchData(conn: Connection, tableName: String, admin: Admin, puts: List[Put]): Unit =
    try {
      val tableNameObj = TableName.valueOf(tableName)
      if (admin.tableExists(tableNameObj)) {
        val table = conn.getTable(tableNameObj)
        try table.put(puts)
        finally table.close()
      } else {
        System.err.println("Table " + tableName + " does not exist; nothing written")
      }
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }

  /**
   * Creates `tableName` with one column family per entry of `colFamiles`, unless it already exists.
   *
   * `admin` belongs to the caller and is NOT closed here. Non-fatal errors are printed and swallowed.
   */
  def createTable(admin: Admin, tableName: String, colFamiles: Array[String]): Unit =
    try {
      val tableNameObj = TableName.valueOf(tableName)
      if (!admin.tableExists(tableNameObj)) {
        val desc = new HTableDescriptor(tableNameObj)
        colFamiles.foreach(family => desc.addFamily(new HColumnDescriptor(family)))
        admin.createTable(desc)
      }
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }

  /**
   * Generates `count` puts (default 100000), each setting `cf:field = "a"`.
   *
   * Row key = random two-digit prefix (00-98) followed by the running index, so keys are unique;
   * presumably the prefix is meant to spread writes across the key space.
   *
   * @param count number of puts to generate; values <= 0 yield an empty list
   * @return a mutable `java.util.ArrayList` of the generated puts
   */
  def getPutList(count: Int = 100000): List[Put] = {
    val random = new Random
    val putList = new ArrayList[Put](math.max(count, 0))
    for (i <- 0 until count) {
      val rowkey: String = df2.format(random.nextInt(99)) + i
      val put = new Put(rowkey.getBytes)
      put.addColumn("cf".getBytes, "field".getBytes, "a".getBytes)
      putList.add(put)
    }
    putList
  }
}
------------------------------------- 2)saveAsNewAPIHadoopDataset()接口 -------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

/**
 * Benchmark: writes `num` rows to HBase through `RDD.saveAsNewAPIHadoopDataset` with the
 * new-API (`org.apache.hadoop.hbase.mapreduce`) `TableOutputFormat`, and prints the write time.
 *
 * Recorded results: 10W rows took 2585 ms; 100W rows took 13833 ms and 14880 ms.
 */
object SparkToHbaseNewAPI {
  val tableName = "test01"
  val cf = "cf"
  val num = 1000000
  // 8-digit zero-padded row keys, so the keys' byte order matches their numeric order.
  val df2 = new DecimalFormat("00000000")

  /**
   * Ensures the table exists, builds `num` puts on the driver, then times only the Spark write.
   * The SparkContext is stopped in `finally`, even when a step fails.
   */
  def main(args: Array[String]): Unit = {
    val sc = getSparkSession().sparkContext
    try {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")

      // This connection is only needed for the table check/creation, so close it right after.
      val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
      try createTableIfMissing(hbaseConn)
      finally hbaseConn.close()

      val jobConf = new JobConf(hbaseConf, this.getClass)
      // Target table for TableOutputFormat.
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      val job = Job.getInstance(jobConf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      println("数据准备中。。。。")
      val puts = buildPuts(num)
      println("数据准备完成!")
      // The row comes from each Put; the ImmutableBytesWritable key is just an empty placeholder.
      val data = sc.makeRDD(puts).map(put => (new ImmutableBytesWritable, put))

      val start = System.currentTimeMillis()
      data.saveAsNewAPIHadoopDataset(job.getConfiguration)
      val end = System.currentTimeMillis()
      println("入库用时:" + (end - start))
    } finally {
      sc.stop()
    }
  }

  /** Creates [[tableName]] with the single family [[cf]] if it does not exist; closes the admin it opens. */
  private def createTableIfMissing(conn: Connection): Unit = {
    val admin = conn.getAdmin
    try {
      val name = TableName.valueOf(tableName)
      if (!admin.tableExists(name)) {
        val desc = new HTableDescriptor(name)
        desc.addFamily(new HColumnDescriptor(cf))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Builds exactly `count` puts with keys `df2.format(0)` .. `df2.format(count - 1)`,
   * each setting `cf:field = "abc"`. (`until` rather than `to`, so no extra row is added.)
   */
  private def buildPuts(count: Int): List[Put] =
    (0 until count).map { i =>
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      put
    }.toList

  /** Local 4-thread Spark session using Kryo serialization. */
  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
-------------------------------------
3)saveAsHadoopDataset()接口
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

/**
 * Benchmark: writes `num` rows to HBase through `RDD.saveAsHadoopDataset` with the old-API
 * (`org.apache.hadoop.hbase.mapred`) `TableOutputFormat`, and prints the write time.
 *
 * Recorded results: 10W rows took 2623 ms and 2596 ms; 100W rows took 14929 ms and 13753 ms.
 */
object SparkToHbaseOldAPI {
  val tableName = "test01"
  val cf = "cf"
  // 8-digit zero-padded row keys, so the keys' byte order matches their numeric order.
  val df2 = new DecimalFormat("00000000")
  val num = 1000000

  /**
   * Ensures the table exists, builds `num` puts on the driver, then times only the Spark write.
   * The SparkContext is stopped in `finally`, even when a step fails.
   */
  def main(args: Array[String]): Unit = {
    val sc = getSparkSession().sparkContext
    try {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")

      val jobConf = new JobConf(hbaseConf, this.getClass)
      // Target table and old-API output format for saveAsHadoopDataset.
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])

      // This connection is only needed for the table check/creation, so close it right after.
      val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
      try createTableIfMissing(hbaseConn)
      finally hbaseConn.close()

      println("数据准备中。。。。")
      val puts = buildPuts(num)
      println("数据准备完成!")
      // The row comes from each Put; the ImmutableBytesWritable key is just an empty placeholder.
      val data = sc.makeRDD(puts).map(put => (new ImmutableBytesWritable, put))

      val start = System.currentTimeMillis()
      data.saveAsHadoopDataset(jobConf)
      val end = System.currentTimeMillis()
      println("入库用时:" + (end - start))
    } finally {
      sc.stop()
    }
  }

  /** Creates [[tableName]] with the single family [[cf]] if it does not exist; closes the admin it opens. */
  private def createTableIfMissing(conn: Connection): Unit = {
    val admin = conn.getAdmin
    try {
      val name = TableName.valueOf(tableName)
      if (!admin.tableExists(name)) {
        val desc = new HTableDescriptor(name)
        desc.addFamily(new HColumnDescriptor(cf))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Builds exactly `count` puts with keys `df2.format(0)` .. `df2.format(count - 1)`,
   * each setting `cf:field = "abc"`. (`until` rather than `to`, so no extra row is added.)
   */
  private def buildPuts(count: Int): List[Put] =
    (0 until count).map { i =>
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      put
    }.toList

  /** Local 4-thread Spark session using Kryo serialization. */
  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
------------------------------------- 4)BulkLoad方法(需要事先准备好数据文件,每行格式为:rowkey,列族,列名,值) ------------------------------------
package cn.piesat

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{HTable, Table, _}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Bulk-loads a text file into HBase table `output_table`. It works in two phases:
 *   1. Spark writes HFiles (via HFileOutputFormat2) to a staging directory on HDFS.
 *   2. LoadIncrementalHFiles moves those HFiles into the table.
 *
 * Input format, one cell per line: `rowkey,columnFamily,qualifier,value`. Blank lines are skipped.
 * The time of each phase is printed separately.
 */
object SparkHbaseBulkload {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "appName")
    try {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set("hbase.zookeeper.quorum", "hadoop01")

      // HBase table name.
      val tableName = "output_table"
      // Staging directory for the generated HFiles. Spark validates output specs by default,
      // so this directory must not exist yet.
      val stagingFolder = "hdfs://hadoop01:9000/data12"

      val conn = ConnectionFactory.createConnection(conf)
      try {
        val table: Table = conn.getTable(TableName.valueOf(tableName))
        try {
          // This job is never submitted. It only serves as a carrier for the HFile settings that
          // configureIncrementalLoadMap derives from the table (per-family compression, bloom
          // filters, block size). It must be configured BEFORE the HFiles are written, and its
          // configuration must be passed to the save below; otherwise those settings are ignored.
          val job = Job.getInstance(conf)
          job.setJobName("DumpFile")
          // Output key/value types for HFile generation.
          job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
          job.setMapOutputValueClass(classOf[KeyValue])
          HFileOutputFormat2.configureIncrementalLoadMap(job, table)

          val cells = sc.textFile("file:///E:/student.txt")
            .filter(_.trim.nonEmpty)
            .map { line =>
              val fields = line.split(",")
              (fields(0), fields(1), fields(2), fields(3))
            }
            // HFileOutputFormat2 rejects cells that are not in ascending key order.
            // Java String order equals HBase's unsigned UTF-8 byte order for all characters
            // except supplementary (surrogate-pair) ones.
            .sortBy { case (rowKey, family, qualifier, _) => (rowKey, family, qualifier) }

          // The key is the row; the value is the full cell.
          val hfileRdd = cells.map { case (rowKey, family, qualifier, value) =>
            val row = Bytes.toBytes(rowKey)
            (new ImmutableBytesWritable(row),
              new KeyValue(row, Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)))
          }

          // Phase 1: generate the HFiles. The RDD is lazy, so this timing also covers reading and sorting the input.
          val genStart = System.currentTimeMillis()
          hfileRdd.saveAsNewAPIHadoopFile(stagingFolder, classOf[ImmutableBytesWritable],
            classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
          println("生成HFile用时:" + (System.currentTimeMillis() - genStart) + "毫秒!")

          // Phase 2: move the generated HFiles into the table.
          val load = new LoadIncrementalHFiles(conf)
          val start = System.currentTimeMillis()
          // NOTE(review): the doBulkLoad(Path, HTable) overload takes the concrete HTable class,
          // so this relies on the connection returning an HTable instance.
          load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
          val end = System.currentTimeMillis()
          println("用时:" + (end - start) + "毫秒!")
        } finally {
          table.close()
        }
      } finally {
        conn.close()
      }
    } finally {
      sc.stop()
    }
  }
}
------------------------------------