Skip to content

Instantly share code, notes, and snippets.

@sangheestyle
Forked from jooyunghan/gist:7497419caaca7e579357
Last active August 29, 2015 14:17
Show Gist options
  • Save sangheestyle/f57e1a51e4a0f3b96f80 to your computer and use it in GitHub Desktop.
Save sangheestyle/f57e1a51e4a0f3b96f80 to your computer and use it in GitHub Desktop.
var _ = require('lodash');
var github = require('octonode');
var Rx = require('rx');
// repos :: Search -> Options -> Observable Data
// callback style is converted to Promise and then to Observable
function repos(search, options) {
return Rx.Observable.fromPromise(new Promise(function (resolve, reject) {
search.repos(options, function (err, body, header) {
if (err)
reject(err);
else
resolve({body: body, header: header});
});
}));
}
// nextPage :: Options {page: Num} -> Options {page: Num}
function nextPage(options) {
options = _.clone(options);
options.page = options.page ? options.page + 1 : 2;
return options;
}
// parseLink :: String -> Link {<Rel>:<Url>}
function parseLink(text) {
var links = _.map(text.split(','), function (part) {
var match = part.trim().match(/<(.*)>; rel=\"(.*)\"/);
return [ match[2], match[1] ];
});
return _.zipObject(links);
}
// reposAll :: Search -> Options -> Observable Data
// using concatMap
// function reposAll(search, options) {
// return repos(search, options).concatMap(
// function (data) {
// var current = Rx.Observable.return(data);
// var links = utils.parseLink(data.header.link);
// if (links.next) {
// return current.concat(reposAll(search, nextPage(options)));
// } else {
// return current;
// }
// }
// );
// }
// reposAll :: Search -> Options -> Observable Data
// using expand
// function reposAll(search, options) {
// return repos(search, options).expand(
// function (data) {
// var links = utils.parseLink(data.header.link);
// if (links.next) {
// return repos(search, nextPage(options));
// } else {
// return Rx.Observable.empty();
// }
// }
// );
// }
// reposAll :: Search -> Options -> Observable Data
function reposAll(search, options) {
return Rx.Observable.create(function (observer) {
var disposed = false;
function onNext(items) {
//var i = 0, len = items.length;
// for (; i < len && !disposed; i++)
// observer.onNext(items[i]);
if (!disposed)
observer.onNext(items);
}
function onError(err) {
if (!disposed)
observer.onError(err);
}
function onCompleted() {
if (!disposed)
observer.onCompleted();
}
var callback = function (err, body, header) {
if (err) {
onError(err);
} else {
onNext({body: body, header: header});
var links = utils.parseLink(header.link);
if (links.next && !disposed) {
search.repos(nextPage(options), callback);
} else {
onCompleted();
}
}
};
search.repos(options, callback);
return function () {
disposed = true;
};
});
}
var client = github.client();
var search = client.search();
var source = reposAll(search, {
q: 'android in:name,description,readme created:2015-01-01..2015-01-31',
per_page: 100
})
// we can flatten the results as an Observable of Repo
.concatMap(function(data) {
return Rx.Observable.from(data.body.items);
});
source.take(100).subscribe(function onNext(repo) {
console.log(repo.full_name);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment