Skip to content

Instantly share code, notes, and snippets.

@clintongormley
Created August 6, 2012 12:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clintongormley/3274257 to your computer and use it in GitHub Desktop.
Save clintongormley/3274257 to your computer and use it in GitHub Desktop.
ScrolledSearchAsync.pm
package ElasticSearch::ScrolledSearchAsync;
use strict;
use warnings FATAL => 'all', NONFATAL => 'redefine';
use Scalar::Util qw(weaken);
use Devel::GlobalDestruction;
use ElasticSearch::Util qw(parse_params);
#===================================
sub new {
#===================================
    # Constructor: starts the initial search asynchronously and returns the
    # scroller object immediately.
    #
    # Arguments (after the class name) are handed to parse_params(), which
    # yields the ElasticSearch client and a hashref of search parameters.
    #   scroll  - scroll keep-alive; defaults to '1m' and is written back
    #             into the params sent with the request
    #   q       - when true, the request is made with searchqs() instead of
    #             search()
    #   as_json - removed from the params before the request; kept on the
    #             object to control how results are returned
    #
    # Returns the new object. Results are filled in later by the callback
    # registered on the condvar returned by the search call.
    my $class = shift;
    my ( $es, $params ) = parse_params(@_);

    my $scroll  = $params->{scroll} ||= '1m';
    my $method  = $params->{q} ? 'searchqs' : 'search';
    my $as_json = delete $params->{as_json};

    my $results_cv = $es->$method($params);

    my $weak = my $self = bless {
        _es         => $es,
        _results_cv => $results_cv,
        _scroll     => $scroll,
        _eof        => 0,
        _as_json    => $as_json,
        _buffer     => [],
        _queue      => [],
    }, $class;

    # The callback is stored on a condvar that the object itself holds, so
    # it must only see a weak reference or the object could never be freed.
    weaken($weak);

    $results_cv->cb(
        sub {
            # Capture $@ first, before anything else can clobber it.
            # NOTE(review): presumably the transport sets $@ before invoking
            # the callback when the request failed - confirm.
            my $error = $@;

            # The object may already have been destroyed by the time the
            # response arrives; there is nothing left to update then.
            return unless $weak;

            return $weak->_croak_queue($error) if $error;

            my $results = shift;
            my $hits    = $results->{hits};
            $weak->{_scroll_id} = $results->{_scroll_id};
            $weak->{_total}     = $hits->{total};
            $weak->{_buffer}    = $hits->{hits};
            $weak->{_max_score} = $hits->{max_score};
            $weak->{_facets}    = $results->{facets};

            # Requests queued while the search was in flight can run now.
            $weak->_drain_queue;
        }
    );

    return $self;
}
#===================================
sub DESTROY {
#===================================
    # Destructor: fails every request still waiting in the queue and then
    # calls clear_guard on the current results condvar.
    # Does nothing during global destruction.
    my ($scroller) = @_;
    return if in_global_destruction;

    my $reason = ref($scroller) . " object out of scope";
    $scroller->_croak_queue($reason);

    my $pending_cv = $scroller->{_results_cv};
    $pending_cv->clear_guard;
}
#===================================
sub next {
#===================================
    # Queue a request for the next $size results (default 1) and return the
    # condvar produced by _enqueue().
    #
    # When the request runs it delivers:
    #   - a JSON-encoded array of the results, if as_json was set
    #   - the single result, if $size is 1
    #   - the list of results otherwise
    my $self   = shift;
    my $wanted = shift || 1;

    my $fetch = sub {

        # Keep asking for more hits until enough are buffered or the
        # scroll is exhausted, polling the event loop between attempts.
        until ( @{ $self->{_buffer} } >= $wanted || $self->{_eof} ) {
            $self->refill_buffer;
            AnyEvent->_poll;
        }

        my @batch = splice @{ $self->{_buffer} }, 0, $wanted;

        if ( $self->{_as_json} ) {
            return $self->{_es}->transport->JSON->encode( \@batch );
        }
        return $wanted == 1 ? $batch[0] : @batch;
    };

    return $self->_enqueue($fetch);
}
#===================================
sub drain_buffer {
#===================================
    # Return everything currently buffered without triggering a refill.
    #
    # With a non-empty buffer this is next() called with the buffer size.
    # With an empty buffer a fresh condvar is created and sent '[]' (when
    # as_json is set) or an empty list, then returned.
    my $self = shift;

    my $buffered = scalar @{ $self->{_buffer} };
    return $self->next($buffered) if $buffered;

    my $cv      = $self->es->transport->cv;
    my @payload = $self->{_as_json} ? ('[]') : ();
    $cv->send(@payload);
    return $cv;
}
#===================================
sub refill_buffer {
#===================================
    # Request the next page of the scroll, unless a request is still in
    # flight or the end of the results has been reached.
    #
    # A new request replaces _results_cv. When its response arrives, the
    # hits are appended to the buffer, the scroll id is updated, _eof is set
    # if no hits came back, and the queue is drained.
    #
    # Returns the condvar from _enqueue(), which receives the number of
    # buffered results once the queue reaches it.
    my $self = shift;

    if ( $self->{_results_cv}->ready && !$self->{_eof} ) {
        my $cv = $self->{_results_cv} = $self->{_es}->scroll(
            scroll    => $self->{_scroll},
            scroll_id => $self->{_scroll_id}
        );

        # The callback lives on a condvar that the object itself stores in
        # _results_cv. Closing over $self directly would create a reference
        # cycle and keep the object (and its DESTROY) from ever running, so
        # only a weak reference is captured - the same approach as new().
        my $weak = $self;
        weaken($weak);

        $cv->cb(
            sub {
                # Capture $@ first, before anything else can clobber it.
                my $error = $@;

                # Object already destroyed: nothing left to update.
                return unless $weak;

                return $weak->_croak_queue($error) if $error;

                my $results = shift;
                my @hits    = @{ $results->{hits}{hits} || [] };

                # An empty page marks the end of the scroll.
                $weak->{_eof}++ if @hits == 0;
                $weak->{_scroll_id} = $results->{_scroll_id};
                push @{ $weak->{_buffer} }, @hits;
                $weak->_drain_queue;
            }
        );
    }

    # _drain_queue passes the object to each queued callback, so the
    # callback does not need to close over $self.
    return $self->_enqueue( sub { scalar @{ $_[0]{_buffer} } } );
}
#===================================
sub total {
#===================================
    # Queue a request for the stored _total value; returns the condvar
    # from _enqueue().
    my ($self) = @_;
    return $self->_enqueue(
        sub {
            my ($scroller) = @_;
            return $scroller->{_total};
        }
    );
}
#===================================
sub max_score {
#===================================
    # Queue a request for the stored _max_score value; returns the condvar
    # from _enqueue().
    my ($self) = @_;
    return $self->_enqueue(
        sub {
            my ($scroller) = @_;
            return $scroller->{_max_score};
        }
    );
}
#===================================
sub eof {
#===================================
    # Queue a request for the _eof flag; returns the condvar from
    # _enqueue().
    my ($self) = @_;
    return $self->_enqueue(
        sub {
            my ($scroller) = @_;
            return $scroller->{_eof};
        }
    );
}
#===================================
# Accessor for the ElasticSearch client object given to the constructor.
sub es { my ($self) = @_; return $self->{_es}; }
#===================================
#===================================
sub facets {
#===================================
    # Queue a request for the facets of the initial search; returns the
    # condvar from _enqueue().
    #
    # With as_json set the facets (or an empty hash when there are none)
    # are JSON-encoded; otherwise the stored value is delivered as is.
    my $self = shift;

    my $render = sub {
        my $facets = $self->{_facets};
        return $facets unless $self->{_as_json};

        my $json = $self->{_es}->transport->JSON;
        return $json->encode( $facets || {} );
    };

    return $self->_enqueue($render);
}
#===================================
sub _drain_queue {
#===================================
    # Run queued requests in order for as long as the current results
    # condvar is ready.
    #
    # Each queue entry is [ $cv, $callback ]. The callback is called with
    # the object as its only argument and whatever it returns is sent to
    # $cv. Entries whose condvar is no longer defined are discarded without
    # running their callback (_enqueue holds the condvar weakly, so it
    # disappears once the caller drops it).
    #
    # List assignment is required here: "my $self = (@_)" would store the
    # number of arguments rather than the invocant.
    my ($self) = @_;

    while ( @{ $self->{_queue} } ) {

        # Re-checked on every pass because a callback may have started a
        # new request, making the results unavailable again.
        return unless $self->{_results_cv}->ready;

        my ( $cv, $cb ) = @{ shift @{ $self->{_queue} } };
        $cv->send( $cb->($self) ) if $cv;
    }
}
#===================================
sub _croak_queue {
#===================================
    # Empty the queue, calling croak($error) on the condvar of every entry
    # that still has one. The queued callbacks themselves are not run.
    my ( $self, $error ) = @_;

    while ( my $entry = shift @{ $self->{_queue} } ) {
        my $waiting_cv = $entry->[0];
        next unless $waiting_cv;
        $waiting_cv->croak($error);
    }
}
#===================================
sub _enqueue {
#===================================
    # Append a request to the queue and try to run the queue straight away.
    #
    # $cb is the code to run once results are available. A new condvar is
    # obtained from the transport, paired with $cb and returned to the
    # caller. The queue keeps only a weak reference to that condvar.
    my ( $self, $cb ) = @_;

    my $cv    = $self->es->transport->cv;
    my $entry = [ $cv, $cb ];
    weaken( $entry->[0] );

    push @{ $self->{_queue} }, $entry;
    $self->_drain_queue;

    return $cv;
}
1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment