Created
August 22, 2011 20:13
-
-
Save kovyrin/1163402 to your computer and use it in GitHub Desktop.
Hbase Client Wrapper (scanner part)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#---------------------------------------------------------------------------------
# Scanner-related methods
def self.open_scanner(table, params) | |
scanner = nil | |
scanner = client(table).scannerOpenWithPrefix(table, params[:prefix], params[:columns]) if params[:prefix] | |
scanner = client(table).scannerOpenWithStop(table, params[:start_id], params[:stop_id], params[:columns]) if params[:stop_id] | |
scanner ||= client(table).scannerOpen(table, params[:start_id], params[:columns]) | |
return scanner | |
end | |
# Scans a set of rows emitting one row at a time | |
def self.scan_rows(table, params = {}) | |
raise ArgumentError, "No block given" unless block_given? | |
return {} unless enable_reads?(table) | |
hbase = client(table) | |
# Default params | |
params = { | |
:start_id => '', # start from the beginning | |
:columns => [], # retrieve all the columns | |
:rows_per_batch => 1, # retrieve rows one by one | |
:limit => 0 # retrieve all rows | |
}.merge(params) | |
# Open a scanner | |
scanner = retryable(:times => 5, :on => Thrift::Exception) do | |
open_scanner(table, params) | |
end | |
limit = params[:limit] | |
loop do | |
# Get rows from the scanner | |
rows = retryable(:times => 5, :on => Thrift::Exception) do | |
hbase.scannerGetList(scanner, params[:rows_per_batch]) | |
end | |
# ... until it is empty | |
break if rows.empty? | |
# ... and feed them to the caller one at a time | |
rows.each do |row| | |
columns = columns_to_hash(row.columns) | |
yield(row.row, columns) | |
# If 0 was passed as limit, it will never be 0 again, so it means "no limit" | |
limit -= 1 | |
break if limit == 0 | |
end | |
# Stop retrieving rows if limit reached. | |
break if limit == 0 | |
end | |
ensure | |
# Close the scanner | |
if hbase && scanner | |
hbase.scannerClose(scanner) rescue nil | |
end | |
end | |
def self.columns_to_hash(columns) | |
return {} unless columns | |
result = {} | |
columns.each do |column, cell| | |
result[column] = cell.value | |
end | |
return result | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment