Skip to content

Instantly share code, notes, and snippets.

@guumaster
Last active May 26, 2018 18:16
Show Gist options
  • Save guumaster/072d7abdbb1acf5b1f9b32b78161e156 to your computer and use it in GitHub Desktop.
Save guumaster/072d7abdbb1acf5b1f9b32b78161e156 to your computer and use it in GitHub Desktop.
Introduccion a RxJS

Introducción a Reactive Programming con RxJS

En este articulo vamos a revisar algunos conceptos básicos de programación reactiva y como podemos sacar provecho de ellos en nuestros programas. Pero antes de empezar, preguntaros lo siguiente:

-- ¿Que diferencia hay entre una consulta a base de datos y una serie de clicks de ratón?

-- Ninguna, ambas son colecciones de datos

¿Qué es Reactive Programming?

La Programación Reactiva (o Reactive Programming) es programar con flujos de datos asíncronos. Estos flujos se pueden observar y reaccionar consecuencia. Este no es un concepto nuevo, por ejemplo los clicks del ratón son un flujo de datos asíncrono de uso cotidiano sobre el que, como programadores, trabajamos regularmente.

La Programación Reactiva es esta idea llevada al extremo. Ademas de los eventos de ratón, es posible crear streams o flujos de datos de cualquier cosa, variables, entradas de usuario, consultas de base de datos, estructuras de datos, lecturas de ficheros, llamadas a API remotas, etc.

En definitiva, todo puede ser un stream.

¿Qué es RxJS?

Reactive Extension for JavaScript (RxJS) es una librería que nos permite trabajar con flujos de datos asíncronos. Esta librería nos permite crear programas mediante la composición de flujos de datos asíncronos o basados en eventos.

Utilizando RxJS se puede representar flujos de datos asíncronos mediante Observables y manipular estos flujos con diferentes operadores como si fueran simples colecciones de datos.

Suficiente teoría, veamos algunos ejemplos prácticos. Aunque los ejemplos están en JavaScript, Rx existe para multitud de lenguajes: Java, JavaScript, C#,C#(Unity), Scala, Clojure, C++, Python, Swift, PHP.

Ejemplos básico: Reloj analógico

Con RxJS es posible representar una colección de datos en el tiempo. Una forma rápida de demostrarlo es con un reloj. En este ejemplo utilizaremos un contador de segundos para representar la hora actual.

Reloj analógico con CSS y RxJS Puedes verlo aquí

const timer$ = Rx.Observable.interval(1000) // creamos un stream contador cada 1 segundo

(nota: el sufijo $ en la variable es una convención de nomenclatura para indicar que es un stream de datos)

Este stream emitirá un nuevo dato o evento cada un segundo. Ahora veamos como reaccionar a este flujo

timer$.subscribe(
  data => console.log(data) // mostrará por consola cada segundo: 1, 2, 3, 4... 
  err => console.error(err)
)

Cada stream de RxJS provee una función subscribe() con la que podemos reaccionar a sus datos. Por tanto para nuestro reloj debemos convertir el contador de segundos a una hora. Para ello utilizamos el operador map()

timer$
  .map(() => new Date()) // cada evento de timer$ se convierte a un objeto Date
  .subscribe(fecha => {
     console.log(fecha) 
  })

Ahora tenemos un stream que nos da la hora. Pero como queremos que nuestro reloj sea analógico, tenemos que convertir este dato en los ángulos de las agujas

const getHandAngles = t => {
  const sec = t.getUTCSeconds()
  const min = t.getUTCMinutes()
  const hrs = t.getUTCHours() + TIME_OFFSET
  return {
    hrs: ((hrs * 30) + (min / 2) ) % 360,
    min: (min * 6) % 360,
    sec: (sec * 6) % 360
  }
}

timer$
  .map(() => new Date())
  .map(getHandAngles)
  .subscribe(angles => {
     console.log(angles) // mostrará algo así { hrs: 332.5, min: 30,  sec: 318 }
  })

El ultimo paso es rotar las agujas con un poco de CSS.

const updateHand = ({ selector, angle }) => {
  document.querySelector(selector).style.transform = `rotateZ(${angle}deg)`
  document.querySelector(selector).style.webkitTransform  = `rotateZ(${angle}deg)`
}

timer$
  .map(() => new Date())
  .map(getHandAngles)
  .subscribe(angles=> {
    updateHand({ selector: '.hours-container', angle: angles.hrs })
    updateHand({ selector: '.minutes-container', angle: angles.min })
    updateHand({ selector: '.seconds-container', angle: angles.sec })
  })

Esto es todo. Fijaros la facilidad de lectura del código final. Es una secuencia de pasos que nos permite razonar sobre el stream de forma natural gracias a los operadores, como map() en este caso. En aplicaciones reales mas grandes esta simplicidad es siempre un punto importante tanto a la hora de desarrollar como de mantener código de terceros.

Ver código fuente

Contando palabras

Hemos dicho que un stream es una secuencia de datos a la que podemos reaccionar. En RxJS esto se consigue asociando un generador de eventos a una o mas reacciones.

Contador de palabras con RxJS Puedes ver este ejemplo contando oraciones, palabras, lineas y caracteres aquí

En este ejemplo tenemos un area de texto (stream generador de eventos) y queremos mostrar la cantidad de palabras escritas (reacción). La forma de representar este flujo con RxJS es la siguiente:

const box = document.querySelector('textarea')
const content$ = Rx.Observable
	.fromEvent(box, 'keyup')         // cada `keyup` emite un evento en el stream
    .map(e => e.target.value.trim()) // emite solo el contenido del textarea

(nota: el sufijo $ en la variable es una convención de nomenclatura para indicar que es un stream de datos)

Este stream llamado content$ emitirá un nuevo evento cada vez que el usuario introduzca una nueva letra. La forma de reaccionar a este flujo constante de datos es mediante la función subscribe() de esta forma:

content$
  .subscribe(event => {
    console.log(`Contenido del textarea: ${event.target.value}`) 
  })

Pero esto no es todo lo que buscamos. Para obtener la cantidad de palabras, antes tenemos que transformar el stream y actualizar la UI para mostrar cuantas palabras hay escritas. Para ello utilizamos la función map() que es uno de los tantos operadores que provee RxJS.

content$
  .map(content => content.split(/\s+/))     // pasa la cadena a array de palabras
  .map(wordList => wordList.filter(x => x)) // filtra las cadenas vacias
  .map(wordList => wordList.length)         // transforma el array en un número
  .subscribe(totalWords => {
     document.getElementById('wordsLabel').innerHTML = totalWords
  })

Aquí hemos manipulado el stream en tres pasos. En cada paso se transforma el flujo y emite un objeto modificado hacia el siguiente paso. Finalmente en el método subscribe() es donde indicamos cual es la reacción deseada, en este ejemplo, la reacción es actualizar la UI para mostrar la cantidad de palabras.

Ver código fuente

Busca ciudades

En este ultimo ejemplo nos acercamos un poco mas a una aplicación real. Crearemos un mapa que mostrará el nombre de la ciudad, pais y bandera, asi como los datos del clima del centro del mapa.

Busca ciudades con Google Maps y RxJS Puedes probarlo aquí

Veremos como utilizar RxJS para tratar entrada de usuario, eventos del ratón y peticiones asíncronas a servicios externos. Este ejemplo es un poco mas largo en cuanto a lineas de código, así que nos centraremos en los puntos claves.

Primero crearemos los dos streams de base para toda la aplicación. Uno es la entrada en el campo de texto, y la otra es el movimiento de arrastre del mapa.

const $search = document.getElementById('search')

// input$ :: InputEvent -> String
const input$ = Rx.Observable
  .fromEvent($search, 'keyup')              
  .map(e => e.target.value.trim())          // tomamos el valor y lo limpiamos
  .debounceTime(500)                        // 1/2 segundo entre cada evento
  .filter(term => term && term.length > 2)  // solo si el largo es mayor a 2
  .distinctUntilChanged()                   // solo emite valores no repetidos

Aquí tomamos cada cambio del campo de texto, como máximo un evento cada medio segundo, y lo transformamos hasta obtener y la entrada del usuario hasta obtener una cadena limpia, y mayor de dos caracteres. Ademas, nos aseguramos que el stream no emitirá valores repetidos gracias al operador distinctUntilChanged().

El segundo stream debe representar cada movimiento del mapa. Necesitaremos utilizar los eventos que provee el API de Google Maps., y luego de leer la documentación de los eventos, seleccionamos dos, center_changed y idle, para crear un stream combinado.

let MAP

// creamos dos nuevos stream
const mapCenterChanged$ = new Rx.Subject()
const mapIdle$ = new Rx.Subject()

google.maps.event.addDomListener(window, 'load', () => {
  MAP = new google.maps.Map(document.getElementById('map-canvas'), { zoom: 9 })

  // emitimos cada evento 'idle' al stream mapIdle$
  MAP.addListener('idle', () => mapIdle$.next())
  
  // emitimos las coordenadas del centro del mapa 
  // en cada evento 'center_changed' al stream mapCenterChanged$
  MAP.addListener('center_changed', () =>  mapCenterChanged$.next(MAP.getCenter()))
  

})

Hemos creado dos stream vacíos que son asociados con eventos del mapa. Es decir, cada evento que genera el mapa, se emite por nuestros streams. ¿Porque dos streams y no solo uno? el evento center_changed se emite durante todo el proceso de arrastre, y para nuestro ejemplo, solo nos interesa el punto de destino. Por eso también tenemos un stream para el evento idle. Veamos como combinarlos.

const mapCenter$ = mapIdle$
	.withLatestFrom(mapCenterChanged$, (idle, center) => center) 
	.distinctUntilChanged()

Hemos combinado en uno los streams mapIdle$ y mapCenterChanged$ con el operador withLatestFrom(). Este operador nos permite controlar que emitir en el stream combinado y seleccionamos las coordenadas del centro. Ahora el stream mapCenter$ emite las coordenadas del centro del mapa solo cuando el API nos informa que vuelve a estar idle.

Ahora veamos las dos funciones asíncronas que utilizaremos mas adelante para asociar a ciertos eventos. Estas funciones llaman a diferentes APIs para obtener la información que queremos mostrar en nuestra app.

const OPENWEATHER_API_KEY = 'SOME_KEY_HERE'
const OPENWEATHER_BASE_URL = `http://api.openweathermap.org/data/2.5/weather?units=metric&lang=es&appid=${OPENWEATHER_API_KEY}`

const GEOCODER = new google.maps.Geocoder()

const FETCH_OPTIONS = {
  cache: "default",
  headers: new Headers(),
  method: "GET",
  mode: "cors"
}

// OpenWeather API
// fetchWeather :: {lat,lng} -> Promise<WeatherInfo>
const fetchWeather = ({ lat, lng }) => {
	const url = `${OPENWEATHER_BASE_URL}&lat=${lat}&lon=${lng}`
	return fetch(url, FETCH_OPTIONS)
	.then(raw => raw.json())
}

// addressSearch :: String -> Promise<GeocodeResult>
const addressSearch = (address) => {
  return new Promise((resolve, reject) => {
    GEOCODER.geocode({ address }, (results, status) => {
      if (status === google.maps.GeocoderStatus.OK) {
        return resolve(results[0])
      }
      resolve(status)
    })
  })
}

Sin mucho detalle, fetchWeather() nos da información del clima a partir de unas coordenadas geográficas, y addresSearch() hace una búsqueda de dirección a partir una cadena de texto. Podéis ver mas información en la documentación de OpenWeatherMap API y Google Maps Geocoding API.

Muy bien, ya tenemos los componentes básicos de nuestra aplicación. Ahora veamos como combinarlos. Veamos primero como reaccionar a la entrada de texto del usuario. Para ello vamos a asociar el stream input$ con la función addressSearch().

// search$ :: UserInput<string> -> Promise<GeocoderResult>
const search$ = input$.switchMap(addressSearch)

Con esta simple linea, obtenemos una búsqueda de dirección cada vez que nuestro stream input$ nos informa un cambio en el campo de texto. En lugar del operador map() utilizamos el operador switchMap() ya que este se encarga de cancelar cualquier evento previo que aun no haya acabado, por ejemplo, supongamos que se dispara un evento cuando el usuario introduce "Alcalá", medio segundo después otro cuando introduce "Alcalá de", y otro cuando acaba de introducir "Alcalá de Henares". Con map() tendríamos tres búsquedas que serían emitidas al stream, en cambio con switchMap() nos aseguramos que solo se emitirá la ultima búsqueda y las que no hayan acabado serán canceladas. ¡Y todo esto con solo una linea! ¿Cuanto llevaría esto sin RxJS?

Ahora transformemos aun mas el resultado de la búsqueda en los datos que necesitamos.

// validCity$ :: GeocoderResult -> City<name, code, country, location>
const validCity$ = search$
	.map(cityFromGeoResult)  // extrae datos del resultado, omitida por simplicidad
	.filter(validCity)       // filtra los resultados vacíos, omitida por simplicidad 

// location$ ::  City<name, code, country, location> -> location
const location$ = validCity$.map(city => city.location)

// location$ ::  location -> Promise<WeatherInfo>
const weather$ = location$
  .switchMap(location => {
    return fetchWeather({ lat:  location.lat(), lng: location.lng() })
  })
  
// moves$ :: Location<lat,lng> -> [Promise<City>, Promise<WeatherInfo>]
const moves$ = mapCenter$
    .do(showLoading) // side effect
    .switchMap(center => {
      // combinamos en una Promise ambas búsquedas, de dirección y clima 
      return Promise.all([
        addressSearch(`${center.lat()},${center.lng()}`)
	        .then(cityFromGeoResult),
        fetchWeather({ lat: center.lat(), lng: center.lng() })
      ])
    })

Tenemos cuatro nuevos streams, uno de ciudades validCity$ que emite el resultado de búsqueda en un formato mas adecuado a nuestra app; location$ que emite las coordenadas; weather$ que busca el clima en unas coordenadas geográficas; y moves$ que genera una búsqueda al final de cada movimiento manual del mapa.

Leer cada parte del código, y fijaros que natural resulta leer la combinación y manipulación de los diferentes streams.

Una particularidad de RxJS es que nada sucede en un stream que nadie este observando, es decir, hasta que no suscribimos una reacción a un stream, no se ejecuta ningún paso del stream, aunque haya eventos emitiéndose. Asi que vamos a reaccionar a todos los streams que hemos creado.

// se omiten las funciones para actualizar el DOM por simplicidad

// UI Loading indicator
input$.subscribe(showLoading, onError)
search$.subscribe(hideLoading, onError)

// UI country, city and flag
validCity$.subscribe(updateLabel, onError)

// UI weather info
weather$.subscribe(updateWeather, onError)

// UI Map update
location$.subscribe(panToLocation, onError)

moves$
  .subscribe(res => {
    updateLabel(res[0])
    updateWeather(res[1])
  }) 

Muy bien, ya tenemos nuestra app en marcha. A pesar que hemos omitido las funciones del DOM, las podéis deducir fácilmente por el nombre. Lo importante es ver como podemos reaccionar a multiples streams modeladas según las necesidades de cada aplicación.

Ver código fuente

Conclusión

Hemos visto como utilizar RxJS para modelar colecciones variables en el tiempo, desde entradas de usuario, búsquedas asíncronas y eventos de APIs externas. Este repaso con ejemplos no es siquiera la superficie de las posibilidades que nos da esta librería.

Referencias

Documentación
Código fuente
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment