Comparing the Efficiency of Four Ways to Write Data from Spark into HBase
Published: 2019-06-19


I. Overview of the Methods

This test compares four approaches: 1. calling the HBase Java API inside the RDD; 2. calling the saveAsNewAPIHadoopDataset() interface; 3. calling the saveAsHadoopDataset() interface; 4. the BulkLoad method.

The big-data component versions used for the tests (all single-node installs) were: Hadoop 2.7.4, HBase 1.0.2, Spark 2.1.0.

II. Test Results

The test data consists of rows with a single column family and a single column holding a fixed value; 100,000 rows is the base case.

The results are as follows (each test was run twice, hence the two timings):

1. Java API

  100,000 rows: 1000 ms, 944 ms

  1,000,000 rows: 6308 ms, 6725 ms

2. saveAsNewAPIHadoopDataset()

  100,000 rows: 2585 ms, 3125 ms

  1,000,000 rows: 13833 ms, 14880 ms

3. saveAsHadoopDataset()

  100,000 rows: 2623 ms, 2596 ms

  1,000,000 rows: 14929 ms, 13753 ms

4. BulkLoad (the best choice for importing large volumes of data)

  100,000 rows: 9351 ms, 9364 ms

  1,000,000 rows: 9342 ms, 9403 ms

  10,000,000 rows: 9690 ms, 9609 ms

III. Code

pom dependencies

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.2</version>
</dependency>
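The listings below also need Spark itself on the compile classpath; the original post does not show those entries. With the versions used here (Spark 2.1.0) they would look roughly like the following sketch; the Scala 2.11 artifact suffix is an assumption:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>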
1) Java API code
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat
import java.util.{ArrayList, List, Random}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client._

object SparkJavaApi {
  val ZOOKEEPER_ADDRESS = "hadoop01"
  val ZOOKEEPER_PORT = "2181"
  val df2: DecimalFormat = new DecimalFormat("00")

  def main(args: Array[String]) = {
    val tableName: String = "test01"
    val conn = getConn
    val admin = conn.getAdmin
    val putList = getPutList()
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      createTable(admin, tableName, Array("cf"))
    }
    val start: Long = System.currentTimeMillis
    insertBatchData(conn, tableName, admin, putList)
    val end: Long = System.currentTimeMillis
    System.out.println("Elapsed: " + (end - start))
  }

  def getConn(): Connection = {
    val conf = HBaseConfiguration.create
    conf.set("hbase.zookeeper.quorum", ZOOKEEPER_ADDRESS)
    conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
    ConnectionFactory.createConnection(conf)
  }

  def insertBatchData(conn: Connection, tableName: String, admin: Admin, puts: List[Put]) = try {
    val tableNameObj = TableName.valueOf(tableName)
    if (admin.tableExists(tableNameObj)) {
      val table = conn.getTable(tableNameObj)
      table.put(puts)
      table.close()
      admin.close()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }

  def createTable(admin: Admin, tableName: String, colFamiles: Array[String]) = try {
    val tableNameObj = TableName.valueOf(tableName)
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(tableNameObj)
      for (colFamily <- colFamiles) {
        desc.addFamily(new HColumnDescriptor(colFamily))
      }
      admin.createTable(desc)
      admin.close()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }

  def getPutList(): List[Put] = {
    val random: Random = new Random
    val putlist = new ArrayList[Put]()
    // 100,000 rows, single column family "cf", single column "field", fixed value "a"
    for (i <- 0 until 100000) {
      val rowkey: String = df2.format(random.nextInt(99)) + i
      val put: Put = new Put(rowkey.getBytes)
      put.add("cf".getBytes, "field".getBytes, "a".getBytes)
      putlist.add(put)
    }
    putlist
  }
}
-------------------------------------
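The listing above drives the HBase client API from a single JVM rather than from inside a Spark job. When the records to write already live in an RDD, the usual shape of this approach is to open one connection per partition and push each partition's Puts through it as a batch. The sketch below illustrates that pattern; it is not part of the original benchmark, and it reuses the hadoop01 ZooKeeper address, the test01 table and the cf column family from above, with a purely illustrative rowkey.
-------------------------------------
package cn.piesat.app

import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.spark.{SparkConf, SparkContext}

object SparkRddJavaApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkRddJavaApiSketch").setMaster("local[4]"))
    sc.parallelize(0 until 100000, 4).foreachPartition { rows =>
      // One connection and one table handle per partition, never per record.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "hadoop01")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val conn = ConnectionFactory.createConnection(conf)
      val table = conn.getTable(TableName.valueOf("test01"))
      val puts = new util.ArrayList[Put]()
      rows.foreach { i =>
        val put = new Put(i.toString.getBytes)          // illustrative rowkey
        put.addColumn("cf".getBytes, "field".getBytes, "a".getBytes)
        puts.add(put)
      }
      table.put(puts)                                   // one batched write per partition
      table.close()
      conn.close()
    }
    sc.stop()
  }
}
-------------------------------------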
2) saveAsNewAPIHadoopDataset() interface
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

// 100,000 rows took 2585 ms
// 1,000,000 rows took 13833 ms and 14880 ms
object SparkToHbaseNewAPI {
  val tableName = "test01"
  val cf = "cf"
  val num = 1000000
  val df2 = new DecimalFormat("00000000")

  def main(args: Array[String]) = {
    val sc = getSparkSession().sparkContext
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the target table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(cf)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    var list = ListBuffer[Put]()
    println("Preparing data...")
    for (i <- 0 to num) {
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      list.append(put)
    }
    println("Data ready!")
    val data = sc.makeRDD(list.toList).map(x => {
      (new ImmutableBytesWritable, x)
    })
    val start = System.currentTimeMillis()
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    val end = System.currentTimeMillis()
    println("Write time: " + (end - start))
    sc.stop()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
-------------------------------------
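One detail worth noting about the listing above: all of the Put objects are first built in a ListBuffer on the driver and only then turned into an RDD with makeRDD, so driver-side memory and task serialization are part of what is being measured. A common variant builds the Puts on the executors instead. The fragment below is a sketch of that variant, not part of the original test; it is meant to replace the data-preparation and save steps above and reuses the listing's sc, num, cf and job values.
-------------------------------------
// Generate the Puts inside the RDD instead of collecting them on the driver.
val data = sc.parallelize(0 to num, 4).mapPartitions { ids =>
  val fmt = new java.text.DecimalFormat("00000000")   // one formatter per partition
  ids.map { i =>
    val put = new Put(fmt.format(i).getBytes())
    put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
    (new ImmutableBytesWritable, put)
  }
}
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
-------------------------------------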
3) saveAsHadoopDataset() interface
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

// 100,000 rows took 2623 ms and 2596 ms
// 1,000,000 rows took 14929 ms and 13753 ms
object SparkToHbaseOldAPI {
  val tableName = "test01"
  val cf = "cf"
  val df2 = new DecimalFormat("00000000")
  val num = 1000000

  def main(args: Array[String]): Unit = {
    val sc = getSparkSession().sparkContext
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the target table name and the old (mapred) output format
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(cf)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    var list = ListBuffer[Put]()
    println("Preparing data...")
    for (i <- 0 to num) {
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      list.append(put)
    }
    println("Data ready!")
    val data = sc.makeRDD(list.toList).map(x => {
      (new ImmutableBytesWritable, x)
    })
    val start = System.currentTimeMillis()
    data.saveAsHadoopDataset(jobConf)
    val end = System.currentTimeMillis()
    println("Write time: " + (end - start))
    sc.stop()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
-------------------------------------
4) BulkLoad method (the data file must be prepared in advance)
------------------------------------
package cn.piesat

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{HTable, Table, _}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkHbaseBulkload {
  def main(args: Array[String]) = {
    val sc = new SparkContext("local[4]", "appName")
    val columnFamily1 = "cf"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "hadoop01")

    // Each input line is "rowkey,columnFamily,column,value"
    val source = sc.textFile("file:///E:/student.txt").map {
      x => {
        val splited = x.split(",")
        val rowkey = splited(0)
        val cf = splited(1)
        val clomn = splited(2)
        val value = splited(3)
        (rowkey, cf, clomn, value)
      }
    }
    val rdd = source.map(x => {
      // Convert the RDD into the shape HFileOutputFormat2 expects:
      // the key must be an ImmutableBytesWritable and the value a KeyValue.
      val rowKey = x._1
      val family = x._2
      val colum = x._3
      val value = x._4
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
        new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family), Bytes.toBytes(colum), Bytes.toBytes(value)))
    })

    // Temporary directory where the generated HFiles are stored
    val stagingFolder = "hdfs://hadoop01:9000/data12"
    // Write the HFiles to the staging directory
    rdd.saveAsNewAPIHadoopFile(stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    // Once this has run, the generated HFiles sit under stagingFolder.
    // Now load those HFiles into HBase; from here on it is plain HBase API calls.
    val load = new LoadIncrementalHFiles(conf)
    // HBase table name
    val tableName = "output_table"
    // Create the HBase connection from the configuration (this is where the cluster addresses come from)
    val conn = ConnectionFactory.createConnection(conf)
    // Look up the table by name
    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      // Create a Hadoop MapReduce job
      val job = Job.getInstance(conf)
      // Set the job name
      job.setJobName("DumpFile")
      // Crucial: the map output key must be ImmutableBytesWritable because we are generating HFiles
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      // The map output value is a KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      // Configure HFileOutputFormat2 for this table
      HFileOutputFormat2.configureIncrementalLoadMap(job, table)
      // Start the bulk load
      val start = System.currentTimeMillis()
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
      val end = System.currentTimeMillis()
      println("Elapsed: " + (end - start) + " ms")
    } finally {
      table.close()
      conn.close()
    }
  }
}
------------------------------------
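Two practical notes on the BulkLoad listing, both inferred from the code rather than stated in the original post. First, the input file (E:/student.txt here) is expected to hold one record per line in the form rowkey,columnFamily,column,value, matching the split(",") logic. Second, HFileOutputFormat2 writes HFiles, which must receive their cells in ascending row-key order; unlike the equivalent MapReduce job, Spark does not sort or partition the data for you here, so if the source file is not already ordered by row key the save step will typically fail with a "key not lexically larger than previous" error. A minimal fix is to sort the parsed records before building the KeyValues, for example:
-------------------------------------
// Sketch: sort by rowkey (the first CSV field) before mapping to KeyValues.
val rdd = source
  .sortBy(_._1)
  .map(x => {
    val (rowKey, family, colum, value) = x
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
      new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family), Bytes.toBytes(colum), Bytes.toBytes(value)))
  })
-------------------------------------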

Reposted from: https://www.cnblogs.com/runnerjack/p/10480468.html
