Skip to content

Instantly share code, notes, and snippets.

@adsharma
Created November 25, 2024 16:18
Show Gist options
  • Select an option

  • Save adsharma/71bd591ea5d4242ec4e250e7fc7d20d1 to your computer and use it in GitHub Desktop.

Select an option

Save adsharma/71bd591ea5d4242ec4e250e7fc7d20d1 to your computer and use it in GitHub Desktop.
truthy.py
#!/usr/bin/env python3
import re
import sys
import itertools
import duckdb
import io
import pandas as pd
# Module-wide DuckDB connection to the 'truthy.db' database, opened as soon as
# this module is loaded. process_chunk() issues its INSERTs through it.
conn = duckdb.connect('truthy.db')
# Running progress counter. process_chunk() updates it (via `global`) and
# prints the new value after each chunk.
count = 0
# Matches an entity -> property -> entity statement at the start of a line and
# captures the three identifiers (e.g. Q42, P31, Q5). Compiled once here rather
# than on every process_chunk() call.
_TRIPLE_PATTERN = re.compile(
    r"<http://www\.wikidata\.org/entity/(Q[0-9]+)> "
    r"<http://www\.wikidata\.org/prop/direct/(P[0-9]+)> "
    r"<http://www\.wikidata\.org/entity/(Q[0-9]+)>"
)


def process_chunk(chunk):
    """
    Load the entity-to-entity triples found in ``chunk`` into ``relations``.

    Every line that starts with a
    ``<.../entity/Qn> <.../prop/direct/Pn> <.../entity/Qn>`` statement is
    reduced to its three numeric ids (the leading ``Q``/``P`` is dropped) and
    inserted as one row. Lines that do not match are skipped.

    Afterwards the module-level ``count`` is advanced by the number of lines
    consumed and the new total is written to stdout as a progress indicator.

    Args:
        chunk: Any iterable of text lines (a list, or a one-shot iterator).

    NOTE(review): assumes the ``relations`` table already exists with three
    columns that accept integer ids in (subject, property, object) order --
    the INSERT is positional. Confirm against the database schema.
    """
    global count

    lines_seen = 0
    rows = []
    for line in chunk:
        lines_seen += 1
        fields = _TRIPLE_PATTERN.match(line)
        if fields:
            # Strip the "Q"/"P" prefix; only the numeric part is stored.
            rows.append(tuple(int(ident[1:]) for ident in fields.groups()))

    if lines_seen == 0:
        # Nothing was read (e.g. end of input): nothing to insert or report.
        return

    if rows:
        # Build the frame directly from the parsed rows. Going through
        # read_csv without header=None would consume the first triple of every
        # chunk as column names, and would raise on a chunk with no matches.
        df = pd.DataFrame(rows, columns=["subject", "predicate", "object"])

        # Expose the DataFrame to DuckDB under a fixed name, then bulk-insert.
        conn.register('temp_table', df)
        insert_query = """
            INSERT INTO relations
            SELECT * FROM temp_table t
        """
        conn.execute(insert_query)

    # Report the number of lines actually consumed, so a short final chunk
    # does not inflate the total.
    count += lines_seen
    sys.stdout.write(f"{count}\n")
    sys.stdout.flush()


def main(chunk_size=100000):
    """
    Read stdin in chunks of ``chunk_size`` lines and load each chunk.

    Args:
        chunk_size: Maximum number of lines passed to process_chunk() at once.
    """
    # list(...) is required: an exhausted islice materializes to [], which
    # equals the sentinel and ends the loop. A bare islice object never
    # compares equal to [], so the loop would not stop at end of input.
    for chunk in iter(lambda: list(itertools.islice(sys.stdin, chunk_size)), []):
        process_chunk(chunk)
# Start the load only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment