Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created August 26, 2018 20:44
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anaisbetts/7ac04d83790a09af2cf58dd6d1984b8c to your computer and use it in GitHub Desktop.
Save anaisbetts/7ac04d83790a09af2cf58dd6d1984b8c to your computer and use it in GitHub Desktop.
RxJS + Home Assistant
import { create } from './websocket-ha';
const homeAssistant = create('http://localhost:8123');
homeAssistant.connect();
await homeAssistant.auth("my secret password");
// Make API calls
console.log(await homeAssistant.call({type: 'get_config'}));
// Listen to Server Events
homeAssistant.listen('state_changed').subscribe(e => console.log(e));
import * as IsomorphicWebSocket from 'isomorphic-ws';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { empty, Observable, Observer, of, throwError, Subscription } from 'rxjs';
import { filter, flatMap, take } from 'rxjs/operators';
// tslint:disable-next-line:no-var-requires
const d = require('debug')('ha-lithium:websocket');
interface WebSocketExtensions {
auth(password: string): Promise<boolean>;
connect(): Subscription;
call(content: any): Promise<any>;
listen(eventType?: string): Observable<any>;
}
export type HomeAssistantSocket = WebSocketSubject<any> & WebSocketExtensions;
type HomeAssistantSocketPriv = HomeAssistantSocket & { sequence: number };
const WebSocketMixins: WebSocketExtensions = {
connect: function(this: HomeAssistantSocketPriv): Subscription {
this.sequence = 1;
const ret = this.subscribe();
// Calling unsubscribe() on the top-level connect() should tear it all down
ret.add(this);
return ret;
},
call: function(this: HomeAssistantSocketPriv, content: any) {
const currentSeq = this.sequence;
const promiseRet = this.pipe(
filter((x: any) => x.id === currentSeq),
flatMap((x: any) => {
if (x.success !== true) {
return throwError(`Failed call: ${JSON.stringify(x)}`);
} else {
delete x.id;
return of(x);
}
}),
take(1))
.toPromise();
this.next(content);
return promiseRet;
},
auth: function(this: HomeAssistantSocketPriv, password: string) {
const ret = this.pipe(
filter((x: any) => x.type !== 'auth_required'),
flatMap((x: any) => {
if (x.type === 'auth_ok') { return of(true); }
return throwError(new Error(`Failed to auth: ${JSON.stringify(x)}`));
}),
take(1)
).toPromise();
this.next({type: 'auth', api_password: password});
return ret;
},
listen: function(this: HomeAssistantSocketPriv, eventType?: string): Observable<any> {
return Observable.create((subj: Observer<any>) => {
let opts: any = {
type: 'subscribe_events'
};
const currentSeq = this.sequence;
const disp = new Subscription();
if (eventType) { opts.event_type = eventType; }
d(`Setting up subscriptions for event: ${eventType || 'all'}`);
this.call(opts).then((_: any) => {
disp.add(async () => {
try {
d(`Unsubscribing for event: ${eventType || 'all'}`);
await this.call({type: 'unsubscribe_events', subscription: currentSeq});
} catch (e) {
d(`Failed to unsubscribe`);
d(e.message);
}
});
disp.add(this.pipe(flatMap((x: any) => {
if (x.id !== currentSeq) { return empty(); }
delete x.id;
return of(x);
})).subscribe(subj));
}, (err: Error) => {
disp.unsubscribe();
subj.error(err);
});
return disp;
});
},
};
export function create(host: string): HomeAssistantSocket {
const shutUpTypeScript: any = IsomorphicWebSocket;
const ret: any = webSocket({
url: `${host}/api/websocket`,
serializer: (val: any) => {
if (val.type === 'auth') { return JSON.stringify(val); }
return JSON.stringify({
id: ret.sequence++,
...val,
});
},
WebSocketCtor: shutUpTypeScript
});
return Object.assign(ret, WebSocketMixins);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment