Skip to content

Instantly share code, notes, and snippets.

@evantahler
Created April 8, 2022 21:42
Show Gist options
  • Save evantahler/71b22b63963c965cabf860f8a707a9f1 to your computer and use it in GitHub Desktop.
Save evantahler/71b22b63963c965cabf860f8a707a9f1 to your computer and use it in GitHub Desktop.
Airbyte Source in TS
# Container image for the Stock Ticker API source connector.
FROM node:alpine
LABEL maintainer="evan@airbyte.io"
# Relative paths in the instructions below resolve against this directory.
WORKDIR /airbyte/integration_code
# Copy the npm manifests, then the rest of the build context.
# NOTE(review): the full context is copied before `npm install`, so any source
# change invalidates the install layer. Moving `COPY . .` after `RUN npm install`
# would improve caching — confirm a .dockerignore excludes node_modules first.
COPY package*.json ./
COPY . .
# Install dependencies, then run the package's "build" script.
RUN npm install
RUN npm run build
# Start the connector CLI; arguments passed to the container are appended.
ENTRYPOINT ["node", "/airbyte/integration_code/dist/source.js"]
# NOTE(review): presumably read by the Airbyte platform to locate the entrypoint — confirm.
ENV AIRBYTE_ENTRYPOINT="node /airbyte/integration_code/dist/source.js"
# Image metadata: connector name and version.
LABEL io.airbyte.name=airbyte/source-stock-ticker-api
LABEL io.airbyte.version=0.1.0
{
"name": "source-stock-ticker-api",
"version": "1.0.0",
"description": "This is the repository for the Stock Ticker Api source connector. For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/stock-ticker-api).",
"scripts": {
"build": "tsc",
"watch": "tsc --watch",
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"axios": "0.26.1",
"commander": "9.1.0"
},
"devDependencies": {
"@types/node": "17.0.23",
"typescript": "4.6.3"
}
}
#!/usr/bin/env node
import path from "path";
import { Command } from "commander";
import axios from "axios";
import {
readJsonFile,
emit,
dateString,
log,
AirbyteConnectionStatus,
} from "./utils.js";
import { config } from "process";
/** Shape of the connector configuration file. */
export type SourceStockTickerConfig = {
  /** API key sent with every price request. */
  api_key: string;
  /** Symbol of the stock whose prices are fetched. */
  stock_ticker: string;
};

/** Sync modes a stream can be configured with. */
export type SyncMode = "incremental" | "full_refresh";

/** Shape of the configured catalog file: the streams selected for a sync. */
export type SourceStockTickerConfiguredCatalog = {
  streams: {
    /** Description of the stream itself. */
    stream: {
      json_schema: {
        properties: Record<string, Record<string, { type: string }>>;
      };
      name: string;
      supported_sync_modes: SyncMode[];
    };
    /** Mode selected for this stream. */
    sync_mode: SyncMode;
  }[];
};
/**
 * Airbyte source that reads daily stock prices from the polygon.io aggregates
 * endpoint. Each public method backs one CLI command and writes its result to
 * stdout through `emit`.
 */
class SourceStockTicker {
  /** Emits a SPEC message whose payload is the content of `data/spec.json`. */
  async spec() {
    const specification = readJsonFile(
      path.join(__dirname, "..", "data", "spec.json")
    );
    const airbyteMessage = { type: "SPEC" as const, spec: specification };
    emit(airbyteMessage);
  }

  /** Emits a CATALOG message whose payload is the content of `data/catalog.json`. */
  async discover() {
    const catalog = readJsonFile(
      path.join(__dirname, "..", "data", "catalog.json")
    );
    const airbyteMessage = { type: "CATALOG" as const, catalog };
    emit(airbyteMessage);
  }

  /**
   * Verifies the configuration by issuing one API request and emits a
   * CONNECTION_STATUS message describing the outcome.
   *
   * @param args Parsed CLI options; `args.config` is the path to the config file.
   */
  async check(args: Record<string, any>) {
    const incorrectInput =
      "Input configuration is incorrect. Please verify the input stock ticker and API key.";
    let connectionStatus: AirbyteConnectionStatus;
    try {
      const { api_key, stock_ticker } = this.loadConfig(args);
      if (!api_key || !stock_ticker) {
        // Avoid calling the API with "undefined" interpolated into the URL.
        connectionStatus = { status: "FAILED", message: incorrectInput };
      } else {
        const response = await this._callApi(api_key, stock_ticker);
        if (response.status === 200) {
          connectionStatus = { status: "SUCCEEDED" };
        } else if (response.status === 403) {
          connectionStatus = {
            status: "FAILED",
            message: "API Key is incorrect.",
          };
        } else {
          connectionStatus = { status: "FAILED", message: incorrectInput };
        }
      }
    } catch (error) {
      // Network-level failures end up here; report them instead of crashing.
      connectionStatus = {
        status: "FAILED",
        message: String(error),
      };
    }
    const output = { type: "CONNECTION_STATUS" as const, connectionStatus };
    emit(output);
  }

  /**
   * Emits one RECORD message per daily price returned by the API for the
   * configured ticker. Only the `stock_prices` stream in `full_refresh` mode
   * is supported; anything else is logged and nothing is emitted.
   *
   * @param args Parsed CLI options; `args.config` and `args.catalog` are file paths.
   */
  async read(args: Record<string, any>) {
    const { api_key, stock_ticker } = this.loadConfig(args);
    if (!api_key || !stock_ticker) {
      return log("api_key and stock_ticker are required in config", true);
    }

    const configuredCatalog = this.loadConfiguredCatalog(args);
    // Tolerate a catalog file without a `streams` array.
    const stockPricesStream = (configuredCatalog.streams ?? []).find(
      (s) => s?.stream?.name === "stock_prices"
    );
    if (!stockPricesStream) {
      log("no streams selected");
      return;
    }
    if (stockPricesStream.sync_mode !== "full_refresh") {
      return log("only full_refresh supported (for now)", true);
    }

    try {
      const response = await this._callApi(api_key, stock_ticker);
      if (response.status !== 200) {
        // `_callApi` never rejects on HTTP errors, so surface them explicitly.
        return log(
          `stock price request failed with HTTP status ${response.status}`,
          true
        );
      }
      // `results` may be absent from a successful response (presumably when
      // there is no data in the window — confirm against the API docs).
      const results = Array.isArray(response.data?.results)
        ? response.data.results
        : [];
      for (const result of results) {
        const message = {
          type: "RECORD" as const,
          record: {
            stream: "stock_prices",
            data: {
              date: dateString(new Date(result.t)),
              stock_ticker,
              price: result.c as number,
            },
            emitted_at: Date.now(),
          },
        };
        emit(message);
      }
    } catch (error) {
      // Stringify: an Error object would serialize to `{}` in the log message.
      log(String(error), true);
    }
  }

  /**
   * Requests daily aggregates for the last seven days for `stock_ticker`.
   * The returned promise resolves for every HTTP status (`validateStatus`
   * always returns true), so callers must inspect `response.status`.
   */
  private async _callApi(apiKey: string, stock_ticker: string) {
    const end = dateString();
    const start = dateString(new Date(Date.now() - 7 * 24 * 60 * 60 * 1000));
    // Percent-encode user-supplied values so they cannot alter the URL structure.
    const ticker = encodeURIComponent(stock_ticker);
    const key = encodeURIComponent(apiKey);
    const url = `https://api.polygon.io/v2/aggs/ticker/${ticker}/range/1/day/${start}/${end}?sort=asc&limit=120&apiKey=${key}`;
    return axios.get(url, { validateStatus: () => true });
  }

  /**
   * Loads the connector configuration from the file at `args.config`.
   * Logs a fatal message when no path was supplied.
   */
  private loadConfig(args: { config?: string }): SourceStockTickerConfig {
    if (typeof args.config !== "string" || args.config === "") {
      log("config is required", true);
      // A fatal `log` exits the process; the throw keeps the return type total.
      throw new Error("config is required");
    }
    const config: SourceStockTickerConfig = readJsonFile(args.config);
    return config;
  }

  /**
   * Loads the configured catalog from the file at `args.catalog`.
   * Logs a fatal message when no path was supplied.
   */
  private loadConfiguredCatalog(args: {
    catalog?: string;
  }): SourceStockTickerConfiguredCatalog {
    if (typeof args.catalog !== "string" || args.catalog === "") {
      log("catalog is required", true);
      // A fatal `log` exits the process; the throw keeps the return type total.
      throw new Error("catalog is required");
    }
    const catalog: SourceStockTickerConfiguredCatalog = readJsonFile(
      args.catalog
    );
    return catalog;
  }
}
// The interface for the CLI commands
const program = new Command();
const packageJSON = readJsonFile(path.join(__dirname, "..", "package.json"));
const source = new SourceStockTicker();

// Program metadata is taken from package.json.
program
  .name(packageJSON.name)
  .description(packageJSON.description)
  .version(packageJSON.version);

program
  .command("spec")
  .description("display the Airbyte spec")
  .action(() => source.spec());

program
  .command("discover")
  .description("display the Airbyte catalog")
  .option("-c, --config <file>", "path to config file")
  .action(() => source.discover());

// `<file>` (not `[file]`): when the flag is given it must carry a value,
// otherwise the option would be parsed as the boolean `true`.
program
  .command("check")
  .description("check the Airbyte against a config")
  .option("-c, --config <file>", "path to config file")
  .action((args) => source.check(args));

program
  .command("read")
  .description("get the data")
  .option("-c, --config <file>", "path to config file")
  .option("-a, --catalog <file>", "path to catalog file")
  .action((args) => source.read(args));

// The action handlers are async, so parse asynchronously and report any
// rejection as a fatal log message instead of an unhandled rejection.
program.parseAsync().catch((error) => log(String(error), true));
import fs from "fs";
/** Discriminator values for the messages this connector writes to stdout. */
export type AirbyteMessageType =
  | "LOG"
  | "SPEC"
  | "CATALOG"
  | "CONNECTION_STATUS"
  | "RECORD";

/** Payload of a CONNECTION_STATUS message. */
export type AirbyteConnectionStatus = {
  status: string;
  /** Human-readable explanation, set when the check did not succeed. */
  message?: string;
  // NOTE(review): `catalog` and `spec` look misplaced on the connection
  // status; they are kept so existing users of this type keep compiling.
  catalog?: string;
  spec?: Record<string, any>;
};

/**
 * A single message written to stdout. `type` selects which of the optional
 * payload fields is populated.
 */
export type AirbyteMessage = {
  type: AirbyteMessageType;
  /** Text of a LOG message. */
  log?: string;
  /** Payload of a SPEC message. */
  spec?: unknown;
  /** Payload of a CATALOG message. */
  catalog?: unknown;
  /** Payload of a CONNECTION_STATUS message. */
  connectionStatus?: AirbyteConnectionStatus;
  /** Payload of a RECORD message. */
  record?: Record<string, any>;
};
/**
 * Synchronously reads `filename` and parses its content as JSON.
 *
 * If the file cannot be read or parsed, a fatal log message naming the file
 * is emitted (which exits the process).
 *
 * @param filename Path of the JSON file to load.
 * @returns The parsed JSON value.
 */
export const readJsonFile = (filename: string) => {
  try {
    return JSON.parse(fs.readFileSync(filename, "utf8"));
  } catch (error) {
    // Stringify the error: an Error object serializes to `{}` in JSON output.
    log(`unable to read JSON file "${filename}": ${String(error)}`, true);
    return undefined;
  }
};
/**
 * Writes one message to stdout as a single line of JSON.
 *
 * @param line The message to serialize and print.
 */
export const emit = (line: AirbyteMessage) => {
  const serialized = JSON.stringify(line);
  console.log(serialized);
};
/**
 * Emits a LOG message and optionally terminates the process.
 *
 * @param message Text to log. Non-string values (for example a caught Error)
 *   are converted with `String()` so they are not serialized as `{}`.
 * @param fatal When true, the process exits with status 1 after logging.
 */
export const log = (message: unknown, fatal = false) => {
  const text = typeof message === "string" ? message : String(message);
  emit({ type: "LOG", log: text });
  if (fatal) process.exit(1);
};
/**
 * Formats a date as `YYYY-MM-DD`, taken from its UTC ISO-8601 representation.
 *
 * @param d Date to format; defaults to the current time.
 * @returns The first ten characters of `d.toISOString()`.
 */
export const dateString = (d = new Date()) => {
  const isoTimestamp = d.toISOString();
  return isoTimestamp.substring(0, 10);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment