Skip to content

Instantly share code, notes, and snippets.

@netshade

netshade/csv.cpp Secret

Last active January 24, 2021 02:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save netshade/aa9e836e843c8e84b97a to your computer and use it in GitHub Desktop.
Save netshade/aa9e836e843c8e84b97a to your computer and use it in GitHub Desktop.
Silly CSV thingy
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <iostream>
#include <string>
#include <vector>
#include <x86intrin.h>
#include <emmintrin.h>
using namespace std;
// One field of the row currently being parsed. The field is not copied: `buffer` points
// into the caller's input chunk, so it is only valid while that chunk is alive.
struct bufferString {
    int header;           // index of the column (header) this field belongs to
    const char * buffer;  // first byte of the field inside the input chunk (not NUL-terminated)
    int bufferLen;        // number of bytes in the field
};
// Output state shared by onHeaders/onRow/onWrite. A line is assembled in `lineBuffer`,
// appended to `outputBuffer`, and `outputBuffer` is written to `destination` when full.
struct flush_settings {
    FILE * destination;        // stream pending output is flushed to
    char * lineBuffer;         // scratch space one output line is assembled in
    size_t lineBufferSize;     // capacity of lineBuffer in bytes
    char * outputBuffer;       // accumulates assembled lines until flushed
    size_t outputBufferSize;   // capacity of outputBuffer in bytes
    size_t outputBufferHead;   // number of bytes currently pending in outputBuffer
};
// Write `length` bytes of `data` to `destination`, reporting (and asserting on) a short write.
static void onWriteEmit(const char * data, size_t length, FILE * destination){
    size_t written = fwrite(data, sizeof(char), length, destination);
    if(written != length){
        std::cerr << "Failed to write to destination, expected " << length << " got " << written << ", error " << errno << std::endl;
        assert(written == length);
    }
}
// Append the first `lineSize` bytes of flushConfig->lineBuffer to the pending output buffer.
//
// The bytes already pending in flushConfig->outputBuffer are written to
// flushConfig->destination first when `forceOut` is set or when the new line would not fit.
// So onWrite(0, cfg, true) drains everything that is buffered.
// A line that cannot fit even into an empty output buffer is written straight through
// instead of overflowing outputBuffer.
inline void onWrite(size_t lineSize, flush_settings * flushConfig, bool forceOut){
    size_t desiredOut = flushConfig->outputBufferHead + lineSize;
    if(forceOut || desiredOut >= flushConfig->outputBufferSize){
        // flush to out
        onWriteEmit(flushConfig->outputBuffer, flushConfig->outputBufferHead, flushConfig->destination);
        flushConfig->outputBufferHead = 0;
    }
    if(flushConfig->outputBufferHead + lineSize >= flushConfig->outputBufferSize){
        // Only reachable right after a flush (head == 0): the line alone is too large to buffer.
        onWriteEmit(flushConfig->lineBuffer, lineSize, flushConfig->destination);
        return;
    }
    // The copy must not live inside assert(): with NDEBUG it would be compiled out entirely.
    memmove(&flushConfig->outputBuffer[flushConfig->outputBufferHead], flushConfig->lineBuffer, lineSize);
    flushConfig->outputBufferHead += lineSize;
}
inline void onRow(const vector<const string *> * headers,
size_t headerSize,
const bufferString * const row,
flush_settings * flushConfig,
const char delim,
const char lineDelim){
int pos = 0;
int i = 1;
size_t lineSize = flushConfig->lineBufferSize;
char * copyDst = NULL;
// If the position + buffer exceeds line buffer, we cannot proceed
assert(pos + row[0].bufferLen < lineSize);
copyDst = &flushConfig->lineBuffer[pos];
// The pointer returned should be the intended copy destination
assert(memmove(copyDst, row[0].buffer, row[0].bufferLen) == copyDst);
pos += row[0].bufferLen;
for(; i < headerSize; i++){
// Advancing the write should be less than the line buffer size
assert(pos + 1 < lineSize);
flushConfig->lineBuffer[pos] = '{';
pos ++;
// Advancing the write should be less than the line buffer size
assert(pos + row[i].bufferLen < lineSize);
copyDst = &flushConfig->lineBuffer[pos];
// The pointer returned should be the intended copy destination
assert(memmove(copyDst, row[i].buffer, row[i].bufferLen) == copyDst);
pos += row[i].bufferLen;
}
// Advancing the write should be less than the line buffer size
assert(pos + 1 < lineSize);
flushConfig->lineBuffer[pos] = '\n';
pos += 1;
onWrite(pos, flushConfig, false);
}
inline void onHeaders(const vector<const string *> * const headers,
size_t headerSize,
flush_settings * flushConfig,
const char delim,
const char lineDelim){
int pos = 0;
int i = 1;
size_t lineSize = flushConfig->lineBufferSize;
const string * header = headers->at(0);
const char * headerCstr = header->c_str();
int headerLen = header->length();
char * copyDst = NULL;
// Advancing the write should be less than the line buffer size
assert(pos + headerLen < lineSize);
copyDst = &flushConfig->lineBuffer[pos];
// The pointer returned should be the intended copy destination
assert(memmove(copyDst, headerCstr, headerLen) == copyDst);
pos += headerLen;
for(; i < headerSize; i++){
// Advancing the write should be less than the line buffer size
assert(pos + 1 < lineSize);
flushConfig->lineBuffer[pos] = '{';
pos ++;
header = headers->at(i);
headerCstr = header->c_str();
headerLen = header->length();
// Advancing the write should be less than the line buffer size
assert(pos + headerLen < lineSize);
copyDst = &flushConfig->lineBuffer[pos];
// The pointer returned should be the intended copy destination
assert(memmove(copyDst, headerCstr, headerLen) == copyDst);
pos += headerLen;
}
// Advancing the write should be less than the line buffer size
assert(pos + 1 < lineSize);
flushConfig->lineBuffer[pos] = '\n';
pos += 1;
onWrite(pos, flushConfig, false);
}
#ifdef __SSE4_2__
// Shift `value` right by `offset` bytes (toward lane 0), filling the vacated high lanes with zeros.
// _mm_srli_si128 requires a compile-time immediate shift count, hence one case per offset.
// Any offset outside 1..15 (including 0) returns `value` unchanged.
static __inline__ __m128i __m128i_shift_right (__m128i value, int offset) {
switch (offset) {
case 1:
value = _mm_srli_si128 (value, 1);
break;
case 2:
value = _mm_srli_si128 (value, 2);
break;
case 3:
value = _mm_srli_si128 (value, 3);
break;
case 4:
value = _mm_srli_si128 (value, 4);
break;
case 5:
value = _mm_srli_si128 (value, 5);
break;
case 6:
value = _mm_srli_si128 (value, 6);
break;
case 7:
value = _mm_srli_si128 (value, 7);
break;
case 8:
value = _mm_srli_si128 (value, 8);
break;
case 9:
value = _mm_srli_si128 (value, 9);
break;
case 10:
value = _mm_srli_si128 (value, 10);
break;
case 11:
value = _mm_srli_si128 (value, 11);
break;
case 12:
value = _mm_srli_si128 (value, 12);
break;
case 13:
value = _mm_srli_si128 (value, 13);
break;
case 14:
value = _mm_srli_si128 (value, 14);
break;
case 15:
value = _mm_srli_si128 (value, 15);
break;
}
return value;
}
// Load 16 bytes starting at `p`, intended for NUL-terminated strings.
// Assumes 4 KiB pages (the 0xfff mask). When `p` is unaligned and lies in the last 16 bytes
// of a page, the aligned 16-byte block containing `p` is loaded instead, because that load
// cannot cross the page. If that block holds a NUL byte at or after `p`, it is shifted so
// lane 0 holds *p and the high lanes are zero-filled.
// NOTE(review): if no NUL is found there, it still falls back to an unaligned load that
// crosses the page boundary, so the bytes after `p` must be readable. Zero-filled lanes are
// also indistinguishable from real NUL bytes in the data.
static inline __m128i __m128i_strloadu (const unsigned char * p) {
int offset = ((size_t) p & (16 - 1));
if (offset && (int) ((size_t) p & 0xfff) > 0xff0) {
__m128i a = _mm_load_si128 ((__m128i *) (p - offset));
__m128i zero = _mm_setzero_si128 ();
// One bit per byte lane of `a` that equals zero.
int bmsk = _mm_movemask_epi8 (_mm_cmpeq_epi8 (a, zero));
// Only use the aligned block when a NUL terminator occurs at or after `p` inside it.
if ((bmsk >> offset) != 0) {
return __m128i_shift_right(a, offset);
}
}
return _mm_loadu_si128 ((__m128i *) p);
}
#endif
// Split one input chunk into delimited fields and emit every complete line.
//
// The first complete line seen while `headers` is empty is the header row. It is stored in
// `headers`, written via onHeaders, and `*rowBuffer` is allocated to hold one bufferString
// per column. Every later complete line is collected into `*rowBuffer` and written via onRow.
//
// Quoting: a ' or " opens a quoted section that only the same character closes. Delimiters
// inside it do not split. Quote characters are kept in the field bytes, and backslash
// escapes are not interpreted.
// Line endings: a '\r' directly before `lineDelim` (CRLF) is dropped from the last field.
// Blank lines (and stray '\r') at the very start of the chunk are skipped.
//
// Returns the offset just past the last complete line, i.e. where the next chunk should
// start. Returns 0 when the chunk holds no complete line. `*numRows` receives the number
// of data rows emitted.
size_t parseBuffer(vector<const string *> * headers, // List of headers, will be populated by this function
                   const char * buffer, // Current input buffer to parse
                   const int bufferSize, // Size of avail chars in buffer
                   bufferString ** rowBuffer, // Memory buffer to place rows in, filled by this function, function allocates, caller frees
                   size_t * rowBufferSize, // Size of buffer, filled by this function
                   flush_settings * flushConfig, // Output buffering config, used by onWrite
                   const char delim, // Delimiter of values
                   const char lineDelim, // How lines are separated
                   size_t * numRows) // How many rows a call to this function placed
{
    int bufferPos = 0;
    // Skip leading blank lines; bounded so an all-blank chunk cannot run off the end.
    while(bufferPos < bufferSize && (buffer[bufferPos] == lineDelim || buffer[bufferPos] == '\r')){
        bufferPos ++;
    }
    int tokenStartsAt = bufferPos;
    int nextReadStartsAt = bufferPos;
    size_t headerSelect = 0;
    size_t rows = 0;
    size_t headerSize = headers->size();
    bool fillHeaders = headers->empty();
    // Header names are staged here and committed only once the whole header line is seen.
    // A header line cut off by the end of the chunk is then re-parsed cleanly next call.
    vector<const string *> pendingHeaders;
    bool inQuote = false;
    char lastQuote = '\0';
#ifdef __SSE4_2__
    // Characters the scalar loop must inspect; every other byte can be skipped 16 at a time.
    // Explicit-length compares (cmpestri) treat NUL bytes in the data as ordinary bytes.
    const __m128i compareReg = _mm_setr_epi8('\'', '"', delim, lineDelim, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
    const int compareLen = 4;
    const int compareFlag = _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_LEAST_SIGNIFICANT;
#endif
    while(bufferPos < bufferSize){
#ifdef __SSE4_2__
        // All 16 loaded bytes lie inside [bufferPos, bufferSize).
        while((bufferPos + 16) < bufferSize){
            const __m128i candidateReg = _mm_loadu_si128((const __m128i *)&buffer[bufferPos]);
            const int cmp = _mm_cmpestri(compareReg, compareLen, candidateReg, 16, compareFlag);
            if(cmp == 16){
                bufferPos += 16;
            } else {
                bufferPos += cmp;
                break;
            }
        }
#endif
        const char c = buffer[bufferPos];
        if(c == '\'' || c == '"'){
            if(inQuote){
                // Only the quote character that opened the section closes it.
                inQuote = c != lastQuote;
            } else {
                inQuote = true;
                lastQuote = c;
            }
        } else if(!inQuote && (c == lineDelim || c == delim)){
            int tokenEndsAt = bufferPos;
            // Drop the '\r' of a CRLF line ending so it does not become part of the last field.
            if(c == lineDelim && tokenEndsAt > tokenStartsAt && buffer[tokenEndsAt - 1] == '\r'){
                tokenEndsAt --;
            }
            if(fillHeaders){
                pendingHeaders.push_back(new string(&buffer[tokenStartsAt], tokenEndsAt - tokenStartsAt));
            } else {
                // We should only be on a header within the valid set of headers
                assert(headerSelect < headerSize);
                bufferString * const row = &((*rowBuffer)[headerSelect]);
                row->buffer = &buffer[tokenStartsAt];
                row->bufferLen = tokenEndsAt - tokenStartsAt;
                row->header = (int)headerSelect;
                headerSelect ++;
            }
            tokenStartsAt = bufferPos + 1;
            if(c == lineDelim){
                if(fillHeaders){
                    headers->insert(headers->end(), pendingHeaders.begin(), pendingHeaders.end());
                    pendingHeaders.clear();
                    headerSize = headers->size();
                    onHeaders(headers, headerSize, flushConfig, delim, lineDelim);
                    fillHeaders = false;
                    *rowBufferSize = sizeof(bufferString) * headerSize;
                    // The allocation must not live inside assert(): with NDEBUG it would vanish.
                    void * allocated = NULL;
                    int allocResult = posix_memalign(&allocated, 16, *rowBufferSize);
                    if(allocResult != 0 || allocated == NULL){
                        std::cerr << "Error allocating row buffer, " << allocResult << std::endl;
                        exit(-1);
                    }
                    *rowBuffer = (bufferString *)allocated;
                } else {
                    // headers seen in row must equal headers seen in header row
                    assert(headerSelect == headerSize);
                    // Every slot 0..headerSize-1 was just overwritten, so no clearing is needed.
                    onRow(headers, headerSize, *rowBuffer, flushConfig, delim, lineDelim);
                    headerSelect = 0;
                    rows ++;
                }
                nextReadStartsAt = tokenStartsAt;
            }
        }
        bufferPos ++;
    }
    // Discard a header line left unfinished by the end of this chunk; it is re-read next call.
    for(size_t k = 0; k < pendingHeaders.size(); k++){
        delete pendingHeaders[k];
    }
    *numRows = rows;
    return (size_t)nextReadStartsAt;
}
// Byte-order-mark families that main() recognises. The values are spelled out explicitly
// and match the original sequential numbering.
enum csv_encoding {
    ASCII = 0,              // no BOM detected
    UTF8 = 1,
    UTF16BigEndian = 2,
    UTF16LittleEndian = 3,
    UTF32BigEndian = 4,
    UTF32LittleEndian = 5,
    UTF7 = 6,
    UTF1 = 7,
    UTFEBCDIC = 8,
    SCSU = 9,
    BOCU = 10,
    GB18030 = 11
};
// BOM encodings taken from https://en.wikipedia.org/wiki/Byte_order_mark
// The pointers themselves are const, so the marks cannot be re-pointed at runtime. Being
// const namespace-scope objects, they also get internal linkage in C++.
// Caution: utf32BEBOM starts with two NUL bytes and utf32LEBOM ends with two. strlen()
// therefore reports 0 and 2 for them, so compare these by explicit length.
const char * const utf8BOM = "\xEF\xBB\xBF";
const char * const utf16BEBOM = "\xFE\xFF";
const char * const utf16LEBOM = "\xFF\xFE";
const char * const utf32BEBOM = "\x00\x00\xFE\xFF";
const char * const utf32LEBOM = "\xFF\xFE\x00\x00";
const char * const utf71BOM = "\x2B\x2F\x76\x38";
const char * const utf72BOM = "\x2B\x2F\x76\x39";
const char * const utf73BOM = "\x2B\x2F\x76\x2B";
const char * const utf74BOM = "\x2B\x2F\x76\x2F";
// utf71BOM is a prefix of this 5-byte form, so check this one first when matching.
const char * const utf75BOM = "\x2B\x2F\x76\x38\x2D";
const char * const utf1BOM = "\xF7\x64\x4C";
const char * const utfEBCDICBOM = "\xDD\x73\x66\x73";
const char * const SCSUBOM = "\x0E\xFE\xFF";
const char * const BOCUBOM = "\xFB\xEE\x28";
const char * const GB18030BOM = "\x84\x31\x95\x33";
int main(int argc, char ** argv){
FILE * csv = NULL;
if(argc == 1){
csv = stdin;
}
if(argc == 2){
csv = fopen(argv[1], "r");
}
if(csv == NULL){
std::cerr << "Error opening file or no file provided: " << errno << std::endl;
exit(-1);
}
size_t maxBOMSize = 5;
const char * outputBOM = "";
char BOM[maxBOMSize];
bzero(BOM, maxBOMSize);
size_t readBOMBytes = fread(BOM, sizeof(char), maxBOMSize, csv);
if(readBOMBytes == 0){
std::cerr << "Error checking for BOM: " << errno << std::endl;
}
csv_encoding encoding = ASCII;
if(readBOMBytes >= strlen(utf8BOM) && strncmp(utf8BOM, BOM, strlen(utf8BOM)) == 0){
encoding = UTF8;
outputBOM = utf8BOM;
fseek(csv, strlen(utf8BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf16BEBOM) && strncmp(utf16BEBOM, BOM, strlen(utf16BEBOM)) == 0){
encoding = UTF16BigEndian;
outputBOM = utf16BEBOM;
fseek(csv, strlen(utf16BEBOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf16LEBOM) && strncmp(utf16LEBOM, BOM, strlen(utf16LEBOM)) == 0){
encoding = UTF16LittleEndian;
outputBOM = utf16LEBOM;
fseek(csv, strlen(utf16LEBOM), SEEK_SET);
} else if(readBOMBytes >= 4 && BOM[0] == utf32BEBOM[0] && BOM[1] == utf32BEBOM[1] && BOM[2] == utf32BEBOM[2] && BOM[3] == utf32BEBOM[3]){
encoding = UTF32BigEndian;
outputBOM = utf32BEBOM;
fseek(csv, 4, SEEK_SET);
} else if(readBOMBytes >= 4 && BOM[0] == utf32LEBOM[0] && BOM[1] == utf32LEBOM[1] && BOM[2] == utf32LEBOM[2] && BOM[3] == utf32LEBOM[3]){
encoding = UTF32LittleEndian;
outputBOM = utf32LEBOM;
fseek(csv, 4, SEEK_SET);
} else if(readBOMBytes >= strlen(utf71BOM) && strncmp(utf71BOM, BOM, strlen(utf71BOM)) == 0){
encoding = UTF7;
outputBOM = utf71BOM;
fseek(csv, strlen(utf71BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf72BOM) && strncmp(utf72BOM, BOM, strlen(utf72BOM)) == 0){
encoding = UTF7;
outputBOM = utf72BOM;
fseek(csv, strlen(utf72BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf73BOM) && strncmp(utf73BOM, BOM, strlen(utf73BOM)) == 0){
encoding = UTF7;
outputBOM = utf73BOM;
fseek(csv, strlen(utf73BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf74BOM) && strncmp(utf74BOM, BOM, strlen(utf74BOM)) == 0){
encoding = UTF7;
outputBOM = utf74BOM;
fseek(csv, strlen(utf74BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf75BOM) && strncmp(utf75BOM, BOM, strlen(utf75BOM)) == 0){
encoding = UTF7;
outputBOM = utf75BOM;
fseek(csv, strlen(utf75BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utf1BOM) && strncmp(utf1BOM, BOM, strlen(utf1BOM)) == 0){
encoding = UTF1;
outputBOM = utf1BOM;
fseek(csv, strlen(utf1BOM), SEEK_SET);
} else if(readBOMBytes >= strlen(utfEBCDICBOM) && strncmp(utfEBCDICBOM, BOM, strlen(utfEBCDICBOM)) == 0){
encoding = UTFEBCDIC;
outputBOM = utfEBCDICBOM;
fseek(csv, strlen(utfEBCDICBOM), SEEK_SET);
} else if(readBOMBytes >= strlen(SCSUBOM) && strncmp(SCSUBOM, BOM, strlen(SCSUBOM)) == 0){
encoding = SCSU;
outputBOM = SCSUBOM;
fseek(csv, strlen(SCSUBOM), SEEK_SET);
} else if(readBOMBytes >= strlen(BOCUBOM) && strncmp(BOCUBOM, BOM, strlen(BOCUBOM)) == 0){
encoding = BOCU;
outputBOM = BOCUBOM;
fseek(csv, strlen(BOCUBOM), SEEK_SET);
} else if(readBOMBytes >= strlen(GB18030BOM) && strncmp(GB18030BOM, BOM, strlen(GB18030BOM)) == 0){
encoding = GB18030;
outputBOM = GB18030BOM;
fseek(csv, strlen(GB18030BOM), SEEK_SET);
} else {
fseek(csv, 0, SEEK_SET);
}
switch(encoding){
case ASCII:
std::cerr << "Using ASCII text encoding" << std::endl;
break;
case UTF8:
std::cerr << "Using UTF-8 text encoding" << std::endl;
break;
case UTF16LittleEndian:
std::cerr << "Using UTF-16 LE text encoding" << std::endl;
break;
case UTF16BigEndian:
std::cerr << "Using UTF-16 BE text encoding" << std::endl;
break;
case UTF32LittleEndian:
std::cerr << "Using UTF-32 LE text encoding" << std::endl;
break;
case UTF32BigEndian:
std::cerr << "Using UTF-32 BE text encoding" << std::endl;
break;
case UTF7:
std::cerr << "Using UTF-7 text encoding" << std::endl;
break;
case UTF1:
std::cerr << "Using UTF-1 text encoding" << std::endl;
break;
case UTFEBCDIC:
std::cerr << "Using UTF-EBCDIC text encoding" << std::endl;
break;
case SCSU:
std::cerr << "Using SCSU text encoding" << std::endl;
break;
case BOCU:
std::cerr << "Using BOCU text encoding" << std::endl;
break;
case GB18030:
std::cerr << "Using GB-18030 text encoding" << std::endl;
break;
}
const char delim = '{';
const char lineDelim = '\n';
const int bufferSize = 1048576; // 1MB
const int outBufferSize = 1048576; // 1MB
const int lineBufferSize = 1048576; // 1MB
size_t rows = 0;
size_t rowsThisIter = 0;
size_t read = 0;
size_t parsed = strlen(outputBOM);
size_t parsedThisIter = 0;
size_t rowBufferSize = 0;
bufferString * rowBuffer = NULL;
char * buffer = NULL;
// malloc must succeed
assert(posix_memalign((void **)&buffer, 16, sizeof(char) * bufferSize) == 0);
if(buffer == NULL){
std::cerr << "Error allocating read buffer, " << errno << std::endl;
exit(-1);
}
char * outputBuffer = NULL;
// malloc must succeed
assert(posix_memalign((void **)&outputBuffer, 16, sizeof(char) * outBufferSize) == 0);
if(outputBuffer == NULL){
std::cerr << "Error allocating output buffer, " << errno << std::endl;
exit(-1);
}
char * lineBuffer = NULL;
// malloc must succeed
assert(posix_memalign((void **)&lineBuffer, 16, sizeof(char) * lineBufferSize) == 0);
if(lineBuffer == NULL){
std::cerr << "Error allocating row buffer, " << errno << std::endl;
exit(-1);
}
flush_settings outConfig = {
.destination = stdout,
.outputBuffer = outputBuffer,
.outputBufferSize = outBufferSize,
.lineBuffer = lineBuffer,
.lineBufferSize = lineBufferSize,
.outputBufferHead = 0
};
if(strlen(outputBOM) > 0){
fwrite(outputBOM, sizeof(char), strlen(outputBOM), outConfig.destination);
}
vector<const string *> headers;
while(!feof(csv)){
read = fread(buffer, sizeof(char), bufferSize, csv);
if(read == 0){
if(feof(csv)){
break;
} else {
std::cerr << "Error reading from file: " << errno << std::endl;
exit(-1);
}
}
parsedThisIter = parseBuffer(&headers, buffer, read, &rowBuffer, &rowBufferSize, &outConfig, delim, lineDelim, &rowsThisIter);
if(parsedThisIter == 0){
std::cerr << "Failed to acquire line in buffer size " << bufferSize << ", consider increasing to fit line width, parsed up to " << parsed << " bytes so far" << std::endl;
// We failed to parse anything in this block, our buffer size is too small and we cannot correct
assert(parsedThisIter > 0);
}
parsed += parsedThisIter;
rows += rowsThisIter;
if(fseek(csv, parsed, SEEK_SET) == -1){
std::cerr << "Error seeking back: " << errno << std::endl;
exit(-1);
}
}
onWrite(0, &outConfig, true);
if(fflush(outConfig.destination) != 0){
std::cerr << "Failed to flush output, error: " << errno << std::endl;
}
if(rowBufferSize > 0){
free(rowBuffer);
}
free(buffer);
fclose(csv);
std::cerr << "Processed " << rows << " rows" << std::endl;
return 0;
}
@netshade
Copy link
Author

netshade@shade [10:08:05] [~/opt/data/csvplay] [master *]
-> % ls -lash Some10GBFile.csv
21438096 -rwx------+ 1 netshade  staff    10G Aug 16 15:17 Some10GBFile.csv
netshade@shade [10:09:39] [~/opt/data/csvplay] [master *]
-> % time ./csv < Some10GBFile.csv > /dev/null
Using UTF-32 BE text encoding
Processed 20417755 rows
./csv < Some10GBFile.csv > /dev/null  30.66s user 5.27s system 98% cpu 36.421 total
./mawk -F "\{" "{print}" < Some10GBFile.csv > /dev/null  0.00s user 0.00s system 46% cpu 0.002 total
netshade@shade [10:10:45] [~/opt/data/csvplay] [master *]
-> % time mawk -F "\{" "{print}" < Some10GBFile.csv > /dev/null 
mawk -F "\{" "{print}" < Some10GBFile.csv > /dev/null  7.86s user 5.25s system 92% cpu 14.211 total
netshade@shade [10:12:08] [~/opt/data/csvplay] [master *]
-> % time mawk -F "\{" "{print \$1}" < Some10GBFile.csv > /dev/null
mawk -F "\{" "{print \$1}" < Some10GBFile.csv > /dev/null  44.60s user 6.82s system 97% cpu 52.699 total
netshade@shade [10:11:23] [~/opt/data/csvplay] [master *]
-> % time ./csv < Some10GBFile.csv > /dev/null                   
Using UTF-32 BE text encoding
Processed 20417755 rows
./csv < Some10GBFile.csv > /dev/null  31.53s user 4.97s system 98% cpu 37.063 total

@netshade
Copy link
Author

Compiled w/ g++ -O3 -o csv ./main.cpp:

netshade@shade [10:13:05] [~/opt/data/csvplay] [master *]
-> % g++ --version
Configured with: --prefix=/Applications/Xcode.app/Contents/Developer/usr --with-gxx-include-dir=/usr/include/c++/4.2.1
Apple LLVM version 6.0 (clang-600.0.57) (based on LLVM 3.5svn)
Target: x86_64-apple-darwin14.4.0
Thread model: posix

@netshade
Copy link
Author

w/ simd lookahead, ~10s faster:

netshade@shade [01:02:48] [~/opt/data/csvplay] [master *]
-> % time ./csv Some10GBFile.csv > /dev/null 
Using ASCII text encoding
Processed 20417755 rows
./csv Some10GBFile.csv > /dev/null  21.15s user 4.55s system 98% cpu 26.188 total
netshade@shade [01:03:19] [~/opt/data/csvplay] [master *]
-> % time ./csv Some10GBFile.csv > /dev/null
Using ASCII text encoding
Processed 20417755 rows
./csv Some10GBFile.csv > /dev/null  21.19s user 5.22s system 98% cpu 26.891 total
netshade@shade [01:03:47] [~/opt/data/csvplay] [master *]
-> % time ./csv Some10GBFile.csv > /dev/null
Using ASCII text encoding
Processed 20417755 rows
./csv Some10GBFile.csv > /dev/null  21.16s user 5.01s system 98% cpu 26.601 total
netshade@shade [01:04:16] [~/opt/data/csvplay] [master *]
-> % 

Compiled w/ g++ -O3 -msse4.2 -o csv ./main.cpp

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment