@hubgit
Last active September 5, 2022 22:04
A ReadableStream created from an async iterator which fetches paginated data, piped into a WritableStream which inserts items into an SQLite database.
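// Note (assumption, not part of the original gist): this is a Deno script, so it
// likely needs to be run with something like `deno run -A --unstable main.ts`
// (filename assumed), since the sqlite3 module binds to a native SQLite library
// via FFI and the script also needs network and file-system access.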
import { parse } from 'https://deno.land/x/xml@2.0.4/mod.ts'
import { readableStreamFromIterable } from 'https://deno.land/std@0.96.0/io/streams.ts'
import { Database } from 'https://deno.land/x/sqlite3@0.5.2/mod.ts'
import ProgressBar from 'https://deno.land/x/progress@v1.2.7/mod.ts'
let counter = 0
const progress = new ProgressBar({
  title: 'processing:',
  interval: 100,
  display: ':completed/:total :time [:bar] :percent (ETA :eta)',
})
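// Async generator that pages through NCBI E-utilities: one ESearch request with
// usehistory=y stores the matching IDs on NCBI's history server, then EFetch
// pulls the records 100 at a time via the returned WebEnv / query_key, yielding
// one TinySeq (TSeq) record per sequence.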
const fetchGenbank = async function* (term: string) {
  const url = new URL(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi'
  )
  url.searchParams.set('db', 'nucleotide')
  url.searchParams.set('retmode', 'json')
  url.searchParams.set('retmax', '0')
  url.searchParams.set('usehistory', 'y')
  url.searchParams.set('term', term)
  const response = await fetch(url)
  if (!response.ok) {
    throw new Error(response.statusText)
  }
  const { esearchresult } = await response.json()
  const retmax = 100
  for (let retstart = 0; retstart < esearchresult.count; retstart += retmax) {
    const url = new URL(
      'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
    )
    url.searchParams.set('db', 'nuccore')
    url.searchParams.set('retmode', 'xml')
    url.searchParams.set('rettype', 'fasta')
    url.searchParams.set('retstart', String(retstart))
    url.searchParams.set('retmax', String(retmax))
    url.searchParams.set('webenv', esearchresult.webenv)
    url.searchParams.set('query_key', esearchresult.querykey)
    const response = await fetch(url)
    if (!response.ok) {
      throw new Error(response.statusText)
    }
    const xml = await response.text()
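    // The EFetch response for rettype=fasta with retmode=xml is TinySeq XML;
    // the cast below only lists the fields this script actually uses.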
    const { TSeqSet } = parse(xml) as unknown as {
      TSeqSet: {
        TSeq: {
          TSeq_accver: string
          TSeq_orgname: string
          TSeq_defline: string
          TSeq_length: number
          TSeq_sequence: string
        }[]
      }
    }
    for (const TSeq of TSeqSet.TSeq) {
      yield TSeq
      progress.render(++counter, { total: esearchresult.count })
    }
  }
}
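// Slide a fixed-length window over the sequence, returning every word (k-mer)
// together with its 0-based position, e.g. buildAllWords('ACGTAC', 4) gives
// [['ACGT', 0], ['CGTA', 1], ['GTAC', 2]].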
const buildAllWords = (sequence: string, length: number) => {
  const words: Array<[string, number]> = []
  const max = sequence.length - length
  for (let pos = 0; pos <= max; pos++) {
    words.push([sequence.substring(pos, pos + length), pos])
  }
  return words
}
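// WritableStream that owns the SQLite database: the constructor applies
// speed-over-safety PRAGMAs, creates the `words` and `items` tables and two
// prepared insert statements, then passes an underlying sink to super() whose
// write() breaks each incoming TSeq into 12-base words and inserts them, along
// with the record metadata, in a single transaction, and whose close()
// finalizes the statements, builds the index on `word` and closes the database.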
class SQLiteStream extends WritableStream {
  constructor(private path: string) {
    const database = new Database(path)
    database.run('PRAGMA synchronous = OFF')
    database.run('PRAGMA journal_mode = MEMORY')
    database.run('PRAGMA locking_mode = EXCLUSIVE')
    database.run('create table `words` (`word`, `accver`, `pos` integer)')
    database.run(
      'create table `items` (`accver`, `orgname`, `defline`, `seqlength` integer)'
    )
    const insertWordStatement = database.prepare(
      'insert into `words` (`word`, `accver`, `pos`) values (?, ?, ?)'
    )
    const insertItemStatement = database.prepare(
      'insert into `items` (`accver`, `orgname`, `defline`, `seqlength`) values (?, ?, ?, ?)'
    )
    const insertWords = database.transaction((item) => {
      const {
        TSeq_accver: accver,
        TSeq_orgname: orgname,
        TSeq_defline: defline,
        TSeq_length: seqlength,
      } = item
      for (const [word, pos] of item.words) {
        insertWordStatement.run([word, accver, pos])
      }
      insertItemStatement.run([accver, orgname, defline, seqlength])
    })
    super({
      write(item) {
        if (item.TSeq_accver) {
          item.words = buildAllWords(item.TSeq_sequence, 12)
          insertWords(item)
        }
      },
      close() {
        insertWordStatement.finalize()
        insertItemStatement.finalize()
        console.log('creating index…')
        database.run('create index `idx_word` on words(`word`)')
        database.close()
        console.log('done')
      },
    })
  }
}
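// Entrez query: every Coronaviridae nucleotide record except SARS-CoV-2 itself,
// plus the single SARS-CoV-2 reference genome MN908947 (Wuhan-Hu-1).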
const term = [
  '(Coronaviridae[Organism] NOT Severe acute respiratory syndrome coronavirus 2[Organism])',
  'MN908947[Accession]',
].join(' OR ')
try {
  await Deno.remove('data/words.db')
} catch {
  // didn't exist
}
await readableStreamFromIterable(fetchGenbank(term)).pipeTo(
  new SQLiteStream('data/words.db')
)
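// Example (assumption, not part of the original gist): once the run completes,
// the database can be queried for sequences containing a given 12-mer with SQL
// along these lines:
//
//   SELECT items.accver, items.orgname, words.pos
//   FROM words JOIN items ON items.accver = words.accver
//   WHERE words.word = 'ATGAGTGATAAT';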