Skip to content

Instantly share code, notes, and snippets.

private fun pullRequests(view: T): ConnectableObservable<Pair<List<PullRequest>, String>> {
return Observable.combineLatest(
view.intentPullToRefresh().startWith { Object() },
connectionProvider.connections(),
BiFunction<Any, BitbucketConnection, BitbucketConnection> { _, conn -> conn }
).switchMap { (_, serverUrl, api, token) ->
repositoriesStorage.selectedRepositories()
.switchMap { list ->
Observable.fromIterable(list)
.flatMap { (slug, _, project) ->
private fun calculateTeamMembership(): Observable<Timed<Map<User, Density>>> {
return Observable.combineLatest(
connectionProvider.connections(),
repositoriesStorage.selectedRepositories(),
BiFunction<BitbucketConnection, List<Repository>, Observable<Timed<Map<User, Density>>>> {
(userName, _, api, token), repositories ->
return@BiFunction Observable.fromIterable(repositories)
.flatMap { (slug, _, project) ->
BitbucketApi.queryPaged { start ->
api.getPullRequests(token, project.key, slug, start,
fun currentNetwork(context: Context,
scheduler: Scheduler = AndroidSchedulers.mainThread())
: Flowable<Network> {
val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
return Flowable.create<Network>({ emitter ->
val worker = scheduler.createWorker()
val emit = { network: Network ->
worker.schedule { emitter.onNext(network) }
currentNetwork(this)
.subscribe {
Log.i("TMPTAG", "Current network: $it on thread: ${Thread.currentThread().name}")
}
fun currentNetwork(context: Context): Flowable<Network> {
val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
return Flowable.create<Network>({ emitter ->
emitter.onNext(Network.NONE)
val request = NetworkRequest.Builder()
.addTransportType(NetworkCapabilities.TRANSPORT_CELLULAR)
.build()
val networkCallback = object: ConnectivityManager.NetworkCallback() {
enum class Network {
NONE,
CELLULAR
}
static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
sharedPreferenceValues(preferences, "key")
.subscribe { Log.i("TMPTAG", "Obtained a new preferences value: $it") }
private fun sharedPreferenceValues(preferences: SharedPreferences, key: String): Flowable<String> =
Flowable.create({ emitter ->
val emitCurrentKey = { emitter.onNext(preferences.getString(key, "")) }
val preferencesListener = SharedPreferences.OnSharedPreferenceChangeListener { _, updatedKey ->
if (TextUtils.equals(key, updatedKey)) {
emitCurrentKey()
}
}
private fun contacts(): Flowable<SimpleContact> =
Flowable.generate<SimpleContact, Cursor>(
Callable<Cursor> {
contentResolver.query(Contacts.CONTENT_URI,
arrayOf(Contacts.DISPLAY_NAME, Contacts.STARRED),
null, null,
"${Contacts.DISPLAY_NAME} ASC")
},
BiFunction<Cursor, Emitter<SimpleContact>, Cursor> { cursor, emitter ->