@trxcllnt
Created September 22, 2018 07:56
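```javascript
const fs = require('fs');
const path = require('path');
const { spawn } = require('child_process');

// Open a read stream for a path resolved against the current working directory.
const readFile = (...xs) => fs.createReadStream(path.resolve(...xs));

// Run the merge script with Python 3 (sys.stdout.buffer is Python 3 only).
// stdout/stderr are inherited, so the merged Arrow stream goes to this process's
// stdout; fds 3 and 4 are extra pipes that carry the two input tables.
const tableMerge = spawn('python3', ['-c', pymerge()], {
  stdio: ['pipe', 'inherit', 'inherit', 'pipe', 'pipe']
});
tableMerge.on('close', (code) => { if (code !== 0) console.error(`pymerge exited with ${code}`); });

readFile('./table-1.arrow').pipe(tableMerge.stdio[3]);
readFile('./table-2.arrow').pipe(tableMerge.stdio[4]);

function pymerge() {
  // The Python source must stay flush-left inside the template literal.
  return `
import os
import sys
import pyarrow as pa

# Read the two Arrow IPC streams from file descriptors 3 and 4.
table1 = pa.RecordBatchStreamReader(os.fdopen(3, "rb")).read_all()
table2 = pa.RecordBatchStreamReader(os.fdopen(4, "rb")).read_all()

# Index both frames by 'nodeIds' (keeping the column) so rows align on that key.
df1 = table1.to_pandas(use_threads=True).set_index(keys='nodeIds', drop=False)
df2 = table2.to_pandas(use_threads=True).set_index(keys='nodeIds', drop=False)
# Keep table 2's values and fill its missing cells from table 1.
df3 = df2.combine_first(df1).reset_index(drop=True)

# preserve_index=False keeps the pandas index out of the result, so no
# __index_level_0__ column has to be removed afterwards.
table3 = pa.Table.from_pandas(df3, schema=table2.schema, preserve_index=False)

writer = pa.RecordBatchStreamWriter(sys.stdout.buffer, table3.schema)
writer.write_table(table3)
writer.close()  # also writes the end-of-stream marker
`;
}
```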
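The merge step relies on pandas' `combine_first`: values from table 2 win, and only its missing cells are filled from table 1, while rows and columns that appear in just one table are kept. As a rough plain-JavaScript illustration of that rule, assuming rows are plain objects with a unique `nodeIds` key (the `combineFirst` helper and the sample rows are hypothetical and not part of the gist):

```javascript
// Hypothetical helper: merges two arrays of row objects keyed by `key`,
// approximating the null-filling rule of pandas' preferred.combine_first(fallback).
// Assumes each key is unique within each input array.
function combineFirst(preferred, fallback, key = 'nodeIds') {
  const byKey = new Map();
  // Seed the result with the preferred rows (table 2 in the gist).
  for (const row of preferred) byKey.set(row[key], { ...row });
  for (const row of fallback) {
    const merged = byKey.get(row[key]);
    if (merged === undefined) {
      // Key only exists in the fallback table: keep that row as-is.
      byKey.set(row[key], { ...row });
      continue;
    }
    // Key exists in both: fill only fields that are null or absent.
    for (const [col, value] of Object.entries(row)) {
      if (merged[col] === null || merged[col] === undefined) merged[col] = value;
    }
  }
  return [...byKey.values()];
}

const table1 = [
  { nodeIds: 1, label: 'a', size: 10 },
  { nodeIds: 2, label: 'b', size: 20 },
];
const table2 = [
  { nodeIds: 2, label: 'B', size: null },
  { nodeIds: 3, label: 'c', size: 30 },
];

console.log(combineFirst(table2, table1));
```

Row 2 keeps table 2's `label` but takes `size` from table 1. Unlike pandas, this sketch keeps first-seen row order rather than sorting the combined index, and it does no dtype coercion (pandas may upcast integer columns to floating point when the union introduces gaps).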