欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

python使用happybase批量的操作hbase

程序员文章站 2022-04-01 11:51:00
...

最近在跑任务的回溯,发现有时候,速度会很慢 ,一开始确定到了 是hbase就起了一个公用的thrift,大家最近也都在导数据,搞得thrift总是挂掉 。 后来在集群中的其他hbase上起了Thrift服务,每个业务用自己的接口,这样能稳定了不少。 既然问题已经到这里了,

最近在跑任务的回溯,发现有时候,速度会很慢 ,一开始确定到了 是hbase就起了一个公用的thrift,大家最近也都在导数据,搞得thrift总是挂掉 。 后来在集群中的其他hbase上起了Thrift服务,每个业务用自己的接口,这样能稳定了不少。

既然问题已经到这里了,就要想方设法介绍链接和频繁的获取数据,回溯任务会涉及到两个批量的任务,一个是hbase的获取网页信息的,另一个是把信息推送到

redis分析队列里面。 关于redis的长连接和批量操作已经完成,现在要改hbase的批量操作。

看了下happybase的文档, 一个是rows ,也就是数据的批量的get,还有一个是batch,这个是批量的操作,类似一个操作链条,相当于我把一系列的动作放到list里面,然后一次性的推到thrift执行。

from buzz.lib.hbase import hb
#xiaorui.cc
list = ['fffec611be1150a3c6ec47d16243170f',
 'fffec64d36e2afb9c801f533555e03d8',
 'ffffc8f782fc44d53a05a090b175f7f8',
 'ffffcd05483697128e426ac9a5882d4d',
 'ffffef6a5889cecfd67e49c4b0a0e3ab',
 'fffff003b4e8328a002a09140afdf662',
 'fffff0503298c2e8acfa2146f5028f76',
 'fffff09d26c34af9e9286b7cfd4354d6',
 'fffff0d15acad09af4392520cbb496a5',
 'fffff10b7c949bee275d6ee5f2c411c6',
 'fffff15c658b773719f6c6482c03c6fe',
 'fffff16d18f5fd8ae5a8dfe84ef43b63',
 'fffffb9285cb2b875276061bc808a23c',
 'fffffbb06b87214a6aad714e86d69d31',
 'fffffbb17d235d16ad041992699eba4b',
 'fffffbb495afa6cb5e9decd909ff4026',
 'fffffbc24f6db511617fb5a1905f1597',
 'fffffbcbe880e4cb270dd268e237fc96',
 'fffffc499edcbda7a38adf10840c3a6f',
 'fffffe66fe54cc66918fa59dd7914841',
 'fffffe9249c4c260277884fb5ece92ad',
 'fffffe9d05f6cd2d760270947085e970',
 'ffffff2b4089a09756bb85b181f9f718',
 'ffffffed28fd8493e9dbbe60a3123af3']
print len(list)
for i in list:
    print i
    hb.get(i, False)
print 'many get'
table = hb.get_table()
columns = ['bz:url', 'src:html']
row = table.rows(list, columns=columns)

happybase 不管是文档还是对于一些异常的处理都要比原生的thrift python api 要强的。

这里在放一个happybase的小demo ,供大家学习。

import happybase
connection = happybase.Connection('h11', compat='0.90')
connection.open()
print connection.tables()
''' connection.create_table(
    'mytable',
    {'cf1': dict(max_versions=10),
     'cf2': dict(max_versions=1, block_cache_enabled=False),
     'cf3': dict(),  # use defaults
    }
)'''
table = connection.table('test')
row = table.row('row1', columns=['data:1'])
print row['data:1']
for key, data in table.scan():
  print key, data
connection.close()