Skip to content

Instantly share code, notes, and snippets.

@noxify
Last active May 9, 2023 14:42
Show Gist options
  • Save noxify/9b9d933ac742937a6360366851da97cf to your computer and use it in GitHub Desktop.
Save noxify/9b9d933ac742937a6360366851da97cf to your computer and use it in GitHub Desktop.
Parquet generation via JSON-Schema
// file: helpers/parquet.ts
import {
FieldDefinition,
ParquetType,
SchemaDefinition,
} from '@dsnp/parquetjs/dist/lib/declare'
import { JSONSchema4 } from 'json-schema'
/**
 * Builds a Parquet `UTF8` (string) column definition.
 *
 * @param options - may be omitted entirely (`createStringField()`).
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createStringField({
  optional = true,
}: {
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'UTF8', optional })
}
/**
 * Builds a Parquet `BOOLEAN` column definition.
 *
 * @param options - may be omitted entirely (`createBooleanField()`).
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createBooleanField({
  optional = true,
}: {
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'BOOLEAN', optional })
}
/**
 * Builds a Parquet `INT64` column definition.
 *
 * @param options - may be omitted entirely (`createIntField()`).
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createIntField({
  optional = true,
}: {
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'INT64', optional })
}
/**
 * Builds a Parquet `FLOAT` column definition.
 *
 * @param options - may be omitted entirely (`createFloatField()`).
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createFloatField({
  optional = true,
}: {
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'FLOAT', optional })
}
/**
 * Builds a Parquet `DECIMAL` column definition.
 *
 * No `scale` is set here. NOTE(review): confirm the intended scale for
 * decimal columns before relying on this helper.
 *
 * @param options - may be omitted entirely (`createDecimalField()`).
 * @param options.precision - decimal precision; defaults to `3`.
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createDecimalField({
  precision = 3,
  optional = true,
}: {
  precision?: number
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'DECIMAL', precision, optional })
}
/**
 * Builds a Parquet `TIMESTAMP_MILLIS` column definition.
 *
 * @param options - may be omitted entirely (`createTimestampField()`).
 * @param options.optional - whether the column is nullable; defaults to `true`.
 * @returns the field definition produced by `createField`.
 */
export function createTimestampField({
  optional = true,
}: {
  optional?: boolean
} = {}): Partial<FieldDefinition> {
  return createField({ type: 'TIMESTAMP_MILLIS', optional })
}
/**
 * Builds an optional `LIST` column whose elements are structs.
 *
 * The layout is `<column>.list.element.<fields>`:
 * - the column itself is optional and typed `LIST`;
 * - `list` is a required, repeated group;
 * - `element` is an optional, non-repeated group holding the given `fields`.
 *
 * @param fields - child column definitions of each struct element.
 */
export function createRepeatableStructField({
  fields,
}: {
  fields: { [fieldName: string]: FieldDefinition }
}): Partial<FieldDefinition> {
  const element = { optional: true, repeated: false, fields: fields }
  const list = { optional: false, repeated: true, fields: { element } }
  return { optional: true, type: 'LIST', fields: { list } }
}
/**
 * Builds an optional (nullable) struct column from the given child fields.
 *
 * @param fields - child column definitions; the same object is embedded, not copied.
 */
export function createStructField({
  fields,
}: {
  fields: { [fieldName: string]: FieldDefinition }
}): Partial<FieldDefinition> {
  const struct: Partial<FieldDefinition> = { optional: true, fields }
  return struct
}
/**
 * Builds a `LIST` column of primitive values.
 *
 * The layout is `<column>.list.element`:
 * - `list` is a required, repeated group;
 * - `element` is an optional value of the given `type`.
 *
 * @param options.type - Parquet type of each list element.
 * @param options.optional - whether the list column itself is nullable; defaults to `true`.
 */
export function createArrayField({
  type,
  optional = true,
}: Partial<{
  type: ParquetType
  optional?: boolean
}>): Partial<FieldDefinition> {
  const element = { type, optional: true }
  const list = { optional: false, repeated: true, fields: { element } }
  return createField({ optional, type: 'LIST', fields: { list } })
}
/**
 * Identity helper: gives a field-definition literal contextual typing against
 * `FieldDefinition` and returns the very same object it was given.
 *
 * @param definition - the field definition to pass through.
 */
export function createField(
  definition: FieldDefinition,
): Partial<FieldDefinition> {
  const field: Partial<FieldDefinition> = definition
  return field
}
/**
 * Converts the `properties` of a JSON Schema object into a parquetjs
 * `SchemaDefinition`. Every generated column is optional.
 *
 * Mapping:
 * - `string` → UTF8
 * - `integer` / `number` → INT64
 * - `boolean` → BOOLEAN
 * - `array` of a primitive → LIST of that primitive
 * - `array` of `object` → LIST of structs (recursive)
 * - `object` → optional struct (recursive)
 *
 * Array item schemas may be a single schema (`items: { type: 'string' }`) or a
 * tuple (`items: [{ type: 'string' }]`); for a tuple only the first entry is used.
 * Fields whose type cannot be mapped are skipped with a `console.warn`.
 *
 * NOTE(review): `number` is written as INT64, so fractional values are not
 * representable — confirm this is intended.
 *
 * @param definition - a JSON Schema describing an object.
 * @returns the parquetjs schema definition, keyed by property name.
 */
export function createFromJsonSchema({
  definition,
}: {
  definition: JSONSchema4
}): SchemaDefinition {
  const schema: SchemaDefinition = {}
  for (const [fieldName, fieldValue] of Object.entries<JSONSchema4>(
    definition.properties ?? {},
  )) {
    switch (fieldValue.type) {
      case 'string':
        schema[fieldName] = createStringField({})
        break
      case 'integer':
      case 'number':
        schema[fieldName] = createIntField({})
        break
      case 'boolean':
        schema[fieldName] = createBooleanField({})
        break
      case 'array': {
        // JSON Schema allows `items` to be one schema or a tuple of schemas.
        const itemSchema = Array.isArray(fieldValue.items)
          ? fieldValue.items[0]
          : fieldValue.items
        if (itemSchema === undefined) {
          console.warn(
            `Array field "${fieldName}" has no item schema and was skipped.`,
          )
          break
        }
        switch (itemSchema.type) {
          case 'string':
            schema[fieldName] = createArrayField({ type: 'UTF8' })
            break
          case 'integer':
          case 'number':
            schema[fieldName] = createArrayField({ type: 'INT64' })
            break
          case 'boolean':
            schema[fieldName] = createArrayField({ type: 'BOOLEAN' })
            break
          case 'object':
            schema[fieldName] = createRepeatableStructField({
              fields: createFromJsonSchema({ definition: itemSchema }),
            })
            break
          default:
            console.warn(
              `Unable to find a definition for the items of array field "${fieldName}" with JSON Schema type "${itemSchema.type}".`,
            )
        }
        break
      }
      case 'object':
        // Optional like every other column, so a row that omits the object is
        // not rejected as a missing required field.
        schema[fieldName] = createStructField({
          fields: createFromJsonSchema({ definition: fieldValue }),
        })
        break
      default:
        console.warn(
          `Unable to find a definition for field "${fieldName}" with JSON Schema type "${fieldValue.type}".`,
        )
    }
  }
  return schema
}
/**
 * Produces a `[columnName, sqlType]` pair.
 *
 * - Neither flag set: `type` is used as-is.
 * - `isArray`: the type is wrapped as `array(<type>)`.
 * - `isObject`: `fields` are rendered as `ROW(name type, ...)` and `type` is ignored.
 *
 * @throws Error when both `isArray` and `isObject` are set. Callers build
 *   arrays of rows by first building the ROW, then wrapping it with `isArray`.
 */
function convertTypeToSql({
  fieldName,
  type,
  isArray = false,
  isObject = false,
  fields = [],
}: {
  fieldName: string
  type: string
  isArray?: boolean
  isObject?: boolean
  fields?: [string, string][]
}) {
  if (isObject && isArray) {
    throw new Error(
      `The current combination of isArray and isObject for field "${fieldName}" is not allowed.`,
    )
  }
  if (isObject) {
    const columns = fields
      .map(([subName, subType]) => `${subName} ${subType}`)
      .join(', ')
    return [fieldName, `ROW(${columns})`]
  }
  return [fieldName, isArray ? `array(${type})` : type]
}
/**
 * Converts a JSON Schema `properties` map into `[columnName, sqlType]` pairs.
 *
 * Type mapping:
 * - `string` → `varchar`
 * - `date-time` → `timestamp(3)`
 * - `integer` / `number` → `bigint`
 * - `boolean` → `boolean`
 * - arrays → `array(...)`
 * - objects → `ROW(...)`, recursively
 *
 * Array item schemas may be a single schema or a tuple; for a tuple only the
 * first entry is used. Unmappable fields are skipped with a `console.warn`.
 *
 * @param definition - a JSON Schema `properties` map. May be `undefined`, e.g.
 *   for an object schema that declares no properties; this yields `[]`.
 * @returns the column pairs, in property order.
 */
export function generateSqlSchema({
  definition,
}: {
  definition: { [key: string]: any } | undefined
}): any[] {
  const sqlSchema = []
  for (const [fieldName, fieldValue] of Object.entries<{ [key: string]: any }>(
    definition ?? {},
  )) {
    switch (fieldValue.type) {
      case 'string':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'varchar' }))
        break
      case 'date-time':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'timestamp(3)' }))
        break
      case 'integer':
      case 'number':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'bigint' }))
        break
      case 'boolean':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'boolean' }))
        break
      case 'array': {
        // JSON Schema allows `items` to be one schema or a tuple of schemas.
        const itemSchema = Array.isArray(fieldValue.items)
          ? fieldValue.items[0]
          : fieldValue.items
        if (itemSchema == null) {
          console.warn(
            `Array field "${fieldName}" has no item schema and was skipped.`,
          )
          break
        }
        switch (itemSchema.type) {
          case 'string':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'varchar', isArray: true }),
            )
            break
          case 'integer':
          case 'number':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'bigint', isArray: true }),
            )
            break
          case 'boolean':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'boolean', isArray: true }),
            )
            break
          case 'object': {
            // Build the ROW(...) type first, then wrap it as array(ROW(...));
            // convertTypeToSql rejects isArray + isObject in a single call.
            const rowSchema = convertTypeToSql({
              fieldName,
              type: 'ROW',
              isObject: true,
              fields: generateSqlSchema({ definition: itemSchema.properties }),
            })
            sqlSchema.push(
              convertTypeToSql({
                fieldName,
                type: rowSchema[1],
                isArray: true,
              }),
            )
            break
          }
          default:
            console.warn(
              `Unable to find a definition for the items of array field "${fieldName}" with JSON Schema type "${itemSchema.type}".`,
            )
        }
        break
      }
      case 'object': {
        const subFields = generateSqlSchema({
          definition: fieldValue.properties,
        })
        sqlSchema.push(
          convertTypeToSql({
            fieldName,
            type: 'row',
            isObject: true,
            fields: subFields,
          }),
        )
        break
      }
      default:
        console.warn(
          `Unable to find a definition for field "${fieldName}" with JSON Schema type "${fieldValue.type}".`,
        )
    }
  }
  return sqlSchema
}
/**
 * Builds the `map-transform` entry for an array column as
 * `[fieldName, { 'list[]': { element } }]`.
 *
 * `element` is an empty mapping for primitive items. For object items it is
 * the recursive mapping of the item's `properties`.
 * NOTE(review): the exact map-transform semantics of the empty `{}` element
 * mapping are not visible here — confirm against the map-transform 0.4 docs.
 *
 * @param fieldName - source/target property name of the array.
 * @param definition - the JSON Schema of a single array item.
 */
function parquetArrayTransformer({
  fieldName,
  definition,
}: {
  fieldName: string
  definition: any
}) {
  let element: any = {}
  if (definition.type === 'object') {
    element = parquetTransformer({ definition: definition.properties })
  }
  return [`${fieldName}`, { 'list[]': { element } }]
}
/**
 * Builds a `map-transform` mapping object from a JSON Schema `properties` map.
 * The result reshapes input rows into the LIST layout used by the parquet
 * schema helpers.
 *
 * - Primitive fields map to their own name.
 * - Object fields map to `[fieldName, <nested mapping>]`.
 * - Array fields are delegated to `parquetArrayTransformer`. Their item schema
 *   may be a single schema or a tuple; for a tuple only the first entry is used.
 *   Arrays without any item schema are skipped with a `console.warn`.
 *
 * @param definition - a JSON Schema `properties` map. `undefined` (an object
 *   without properties) yields an empty mapping.
 * @returns the mapping object, keyed by property name.
 */
export function parquetTransformer({ definition }: { definition: any }) {
  const res: any = {}
  for (const [fieldName, fieldDefinition] of Object.entries<JSONSchema4>(
    definition ?? {},
  )) {
    if (fieldDefinition.type === 'object') {
      res[fieldName] = [
        fieldName,
        parquetTransformer({
          definition: fieldDefinition.properties,
        }),
      ]
    } else if (fieldDefinition.type === 'array') {
      // JSON Schema allows `items` to be one schema or a tuple of schemas.
      const itemSchema = Array.isArray(fieldDefinition.items)
        ? fieldDefinition.items[0]
        : fieldDefinition.items
      if (itemSchema === undefined) {
        console.warn(
          `Array field "${fieldName}" has no item schema and was left out of the transformer.`,
        )
        continue
      }
      res[fieldName] = parquetArrayTransformer({
        fieldName,
        definition: itemSchema,
      })
    } else {
      res[fieldName] = fieldName
    }
  }
  return res
}
{
"dependencies": {
"map-transform": "0.4.1",
"@dsnp/parquetjs": "1.2.3"
},
"devDependencies": {
"@types/json-schema": "7.0.11"
}
}
/**
 * Sample JSON Schema used by the example script. It exercises each mapping in
 * the parquet helpers:
 * - a string, an integer and an array of strings;
 * - an object;
 * - an array of objects that itself contains a nested object and an array.
 */
export default {
type: 'object',
properties: {
string_field: {
type: 'string',
},
int_field: {
type: 'integer',
},
array_field: {
type: 'array',
// Tuple form of `items`; the helpers read only the first entry.
items: [
{
type: 'string',
},
],
additionalItems: false,
},
timestamp_field: {
// NOTE(review): declared as a plain string, so the helpers map it to
// UTF8 / varchar rather than a timestamp — confirm this is intended.
type: 'string',
},
obj_field: {
type: 'object',
properties: {
sub1: {
type: 'string',
},
sub2: {
type: 'string',
},
},
additionalProperties: false,
},
// Array of objects: becomes a LIST of structs in parquet and
// array(ROW(...)) in the SQL schema.
struct_field: {
type: 'array',
items: [
{
type: 'object',
properties: {
sub3: {
type: 'string',
},
sub4: {
type: 'string',
},
sub5: {
type: 'object',
properties: {
sub6: {
type: 'string',
},
sub7: {
type: 'string',
},
},
additionalProperties: false,
},
sub8: {
type: 'array',
// Two-entry tuple; only the first entry's type is used by the helpers.
items: [
{
type: 'string',
},
{
type: 'string',
},
],
additionalItems: false,
},
},
additionalProperties: false,
},
],
additionalItems: false,
},
},
additionalProperties: false,
}
import jsonSchema from './test-schema'
import mapTransform from 'map-transform'
import parquetjs from '@dsnp/parquetjs'
import type { JSONSchema4 } from 'json-schema'
// NOTE(review): bare specifier; relies on a baseUrl/paths mapping — confirm.
import { createFromJsonSchema, parquetTransformer } from 'helpers/parquet.js'
import path from 'path'

// Example rows; each must match the JSON Schema in ./test-schema.
const userInput = [
  {
    // Must be a string: the schema declares `string_field` as a UTF8 column.
    "string_field": "string value",
    "int_field": 343,
    "array_field": ["ele1", "ele2"],
    "timestamp_field": "2016-07-20T17:30:15+05:30",
    "obj_field": {
      "sub1": "sub1val",
      "sub2": "sub2val"
    },
    "struct_field": [
      {
        "sub3": "sub3val1",
        "sub4": "sub4val1",
        "sub5": {
          "sub6": null,
          "sub7": "sub7val"
        },
        "sub8": ["ele1", "ele2"]
      }
    ]
  }
]

// Generate a valid `map-transform` definition
// based on the given JSON Schema
const transformerDefinition = [
  'data[]',
  {
    $iterate: true,
    ...parquetTransformer({
      definition: jsonSchema.properties,
    }),
  },
]

// Generate a Schema for our Parquet file
const generatedParquetSchema = createFromJsonSchema({
  definition: jsonSchema as JSONSchema4,
})

// Transform the given user input into the needed
// structure for `parquetjs.appendRow`.
// This is required if you plan to have simple arrays or complex structures;
// otherwise the parquet file may not be readable by third-party tools.
// We have tested it via:
// * PyCharm Parquet Viewer Plugin ( v3.0.0 )
//   https://github.com/benwatson528/intellij-avro-parquet-plugin/releases/tag/v3.0.0
// * Our trino cluster
const parquetData = mapTransform(transformerDefinition)({
  data: userInput,
})

const parquetFilePath = path.resolve('test_parquet.parquet')
// `openFile` takes a ParquetSchema instance, not the plain definition object.
const writer = await parquetjs.ParquetWriter.openFile(
  new parquetjs.ParquetSchema(generatedParquetSchema),
  parquetFilePath,
)
try {
  for (const input of parquetData) {
    await writer.appendRow({
      ...input,
    })
  }
} finally {
  // Close the writer even if appending a row fails.
  await writer.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment