@mohawk2
Last active December 11, 2017 07:37
Mojolicious Advent Calendar entry: You Promised To Call!

A new feature of Mojolicious, as of 7.49, is the implementation of the Promises/A+ specification. In this posting, we're going to use promises to implement non-blocking, parallel fetching of a number of web pages. For completeness, we'll also look at how to do it using the other approach supported by Mojolicious, "delays" (a continuation-passing style).

Background

The essence of non-blocking code is that when you're waiting for something, you tell the framework what you want to happen when that thing happens. It can then process other things in the meantime. This means you don't have lots of processes (or possibly threads) sitting there, just waiting for something else to finish; only the bare minimum of information is kept, about what to wait for, and what to do then.

Originally this was done just using callbacks, but this led to what is known as "callback hell": each callback contains the next callback, at an increasing level of indentation. Keeping the functions separate avoids the nesting, but makes the flow even harder to keep track of. Avoiding this led to the development of Promises, then Promises/A+. Continuation-passing has been around for a while.

Delays are used to mimic something like a procedural style, with explicit control of what steps are remaining:

# Use a delay to keep the event loop running until we are done
my $delay = Mojo::IOLoop->delay;
my $fetch;
$fetch = sub {
  # Stop if there are no more URLs
  return unless my $url = shift @urls;
  # Fetch the next title
  my $end = $delay->begin;
  $ua->get($url => sub {
    my ($ua, $tx) = @_;
    say "$url: ", $tx->result->dom->at('title')->text;
    $fetch->();
    $end->();
  });
};

# Process two requests at a time
$fetch->() for 1 .. 2;
$delay->wait;

Promises are used to easily add processing steps to a transaction: one can keep adding code for what to do "then" - after a previous stage has finished. Best of all, each "callback" is small and separate, with each one placed in succession. The process reads like sequential, synchronous code, even though it runs asynchronously:

my $fetch;
$fetch = sub {
  # Stop if there are no more URLs
  return unless my $url = shift @urls;
  # Fetch the next title
  $ua->get_p($url)->then(sub {
    my ($tx) = @_;
    say "$url: ", $tx->result->dom->at('title')->text;
    $fetch->(); # returns a promise, which extends this chain
  });
};

# Process two requests at a time
my @promises = map { $fetch->() } 1 .. 2;
Mojo::Promise->all(@promises)->wait if @promises;

Specifics

The Mojolicious Cookbook shows how to implement single non-blocking requests with promises and with delays. The delays version even has a proto-spider, which can add to its queue of URLs to process. But what we want is slightly different: to use promises, and to operate on a fixed list of URLs.

Delays have the steps method to add steps to a given "delay". Given that a Promise is a single chain of processing steps, how are we going to have a number of them running concurrently, without making all the requests at once? We'll use two ideas: chaining (shown above - the key is each "then" returns a new Promise), and Mojo::Promise->all (also shown above) - it will wait until all the promises it's given are finished. Combining them gives us multiple streams of concurrent, but sequenced, activity.
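To make the combination of chaining and Mojo::Promise->all concrete, here is a minimal sketch that needs no network access. The fake_request and worker helpers, and the $running and $peak counters, are hypothetical names invented for this illustration; a short timer stands in for a real request.

```perl
use Mojo::Base -strict;
use Mojo::IOLoop;
use Mojo::Promise;

# Hypothetical stand-in for a network request:
# resolves with $item once a short timer fires
sub fake_request {
  my ($item) = @_;
  my $promise = Mojo::Promise->new;
  Mojo::IOLoop->timer(0.01 => sub { $promise->resolve($item) });
  return $promise;
}

my @queue = qw(a b c d e);
my @done;
my ($running, $peak) = (0, 0);

# One chain: take an item, process it, then continue with the next one
sub worker {
  my $item = shift @queue;
  return unless defined $item;
  $running++;
  $peak = $running if $running > $peak;
  return fake_request($item)->then(sub {
    my ($result) = @_;
    $running--;
    push @done, $result;
    return worker();    # a returned promise extends this chain
  });
}

# Two chains run concurrently; within each chain, work is sequential
my @chains = map { worker() } 1 .. 2;
Mojo::Promise->all(@chains)->wait if @chains;

say "processed: @{[ sort @done ]}";
say "peak concurrency: $peak";
```

With two chains, no more than two simulated requests should be in flight at any moment, yet all five items get processed.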

Another option for dealing with a number of concurrent activities, if you just want the first one that completes, is race.
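As a rough illustration of race, the sketch below starts two timers and reports whichever finishes first. The after helper and the 'slow' and 'fast' labels are made up for this example.

```perl
use Mojo::Base -strict;
use Mojo::IOLoop;
use Mojo::Promise;

# Hypothetical helper: a promise that resolves with $name after $seconds
sub after {
  my ($seconds, $name) = @_;
  my $promise = Mojo::Promise->new;
  Mojo::IOLoop->timer($seconds => sub { $promise->resolve($name) });
  return $promise;
}

my $winner;
Mojo::Promise->race(after(0.2, 'slow'), after(0.01, 'fast'))
  ->then(sub { $winner = shift })
  ->wait;

say "first to finish: $winner";
```

The promise returned by race settles as soon as the first of its inputs settles; the result of the slower one is simply ignored.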

The task at hand

We have to synchronise the work between the multiple "chains" of execution, so that nothing gets missed, or done twice. Luckily, in the asynchronous but single-threaded context we have here, we can just pass around a reference to a single "queue", a Perl array. Let's build that array, at the start of our script:

#!/usr/bin/env perl

# cut down from https://stackoverflow.com/questions/15152633/perl-mojo-and-json-for-simultaneous-requests/15166898#15166898
sub usage { die "Usage: bulkget-delay urlbase outdir suffixesfile\n", @_ };
# each line of suffixesfile is a suffix
# it gets appended to urlbase, then requested non-blocking
# output in outdir with suffix as filename

use Mojo::Base -strict;
use Mojo::UserAgent;
use Mojo::Promise;
use Mojo::File 'path';

my $MAXREQ = 20;

my ($urlbase, $outdir, $suffixesfile) = @ARGV;
usage "No URL" if !$urlbase;
usage "$outdir: $!" if ! -d $outdir;
usage "$suffixesfile: $!" if ! -f $suffixesfile;

my $outpath = path($outdir);
my @suffixes = getsuffixes($suffixesfile, $outpath);
my $ua = Mojo::UserAgent->new;

sub getsuffixes {
  my ($suffixesfile, $outpath) = @_;
  open my $fh, '<', $suffixesfile or die $!;
  grep { !-f $outpath->child($_); } map { chomp; $_ } <$fh>;
}
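The shared-queue idea can be seen in isolation with plain Perl, without any event loop. In this sketch, take_next, %taken_by and the chain names are hypothetical; the loop just alternates two consumers, roughly the way two chains would take turns in a single-threaded event loop.

```perl
use strict;
use warnings;

my @queue = qw(one two three four five);
my %taken_by;

# Every consumer gets the same array reference, not a copy
sub take_next {
  my ($name, $queue) = @_;
  my $item = shift @$queue;
  return unless defined $item;
  $taken_by{$item} = $name;
  return $item;
}

# Alternate two consumers until the queue is drained
while (@queue) {
  take_next('chain-1', \@queue);
  take_next('chain-2', \@queue);
}

print "$_ => $taken_by{$_}\n" for sort keys %taken_by;
```

Because only one piece of code runs at a time, shift hands each item to exactly one consumer, and no locking is needed.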

We also want a procedure to handle results that are ready, to store them in a file if successful:

sub handle_result {
  my ($outpath, $tx, $s) = @_;
  if ($tx->res->is_success) {
    print "got $s\n";
    $outpath->child($s)->spurt($tx->res->body);
  } else {
    print "error $s\n";
  }
}

And now, the Promise version:

my @promises = map makepromise($urlbase, $ua, \@suffixes, $outpath), (1..$MAXREQ);
Mojo::Promise->all(@promises)->wait if @promises;

sub makepromise {
  my ($urlbase, $ua, $suffixes, $outpath) = @_;
  my $s = shift @$suffixes;
  return if !defined $s;
  my $url = $urlbase . $s;
  print "getting $url\n";
  $ua->get_p($url)->then(sub {
    my ($tx) = @_;
    handle_result($outpath, $tx, $s);
    makepromise($urlbase, $ua, $suffixes, $outpath);
  });
}

And the delay version:

# The delay lives at file scope so that both start_urls and wait can see it
my $delay = Mojo::IOLoop->delay(
  # Runs once every begin has been matched by a call to its end callback
  sub { print "Loop ended before queue depleted\n" if @suffixes }
);
start_urls($urlbase, $ua, \@suffixes, $outpath, $delay);
$delay->wait;

sub start_urls {
  my ($urlbase, $ua, $queue, $outpath, $delay) = @_;
  state $idle = $MAXREQ;
  while ( $idle and defined(my $s = shift @$queue) ) {
    $idle--;
    my $url = $urlbase . $s;
    print "getting $url\n";
    my $end = $delay->begin;
    $ua->get($url => sub {
      my ($ua, $tx) = @_;
      $idle++;
      handle_result($outpath, $tx, $s);
      # Refill before calling $end, so the delay still has pending work
      start_urls($urlbase, $ua, $queue, $outpath, $delay);
      $end->();
    });
  }
}

Once either version runs out of suffixes to process, it will finish. The delay version is structured so that it could more easily widen out its streams of activity, which is trickier with a Promise. To achieve that with Promises, both versions would need restructuring so that they subscribe to a queue and, if the queue is empty, wait until it is not. That's absolutely idiomatic for Promises, but we'll look at that another time!
