Skip to content

Instantly share code, notes, and snippets.

@thisisnic
Created July 27, 2023 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thisisnic/5bdb85d2742bc318433f2f14b8bd77cf to your computer and use it in GitHub Desktop.
Reprex showing how to stream data into a Parquet file in R
library(arrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

# Set up a temporary directory to store the dummy input data
tf <- tempfile()
dir.create(tf)

# Write a dummy partitioned dataset - grouping mtcars by qsec produces one
# CSV file per distinct qsec value (30 files for the 32 rows), purely to
# create many small input files for the streaming example below
write_dataset(group_by(mtcars, qsec), tf, format = "csv")

# List all 30 partition files (one per qsec group)
file_list <- list.files(tf, recursive = TRUE, full.names = TRUE)
file_list
#>  [1] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=14.5/part-0.csv" 
#>  [2] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=14.6/part-0.csv" 
#>  [3] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=15.41/part-0.csv"
#>  [4] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=15.5/part-0.csv" 
#>  [5] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=15.84/part-0.csv"
#>  [6] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=16.46/part-0.csv"
#>  [7] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=16.7/part-0.csv" 
#>  [8] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=16.87/part-0.csv"
#>  [9] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=16.9/part-0.csv" 
#> [10] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.02/part-0.csv"
#> [11] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.05/part-0.csv"
#> [12] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.3/part-0.csv" 
#> [13] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.4/part-0.csv" 
#> [14] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.42/part-0.csv"
#> [15] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.6/part-0.csv" 
#> [16] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.82/part-0.csv"
#> [17] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=17.98/part-0.csv"
#> [18] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18.3/part-0.csv" 
#> [19] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18.52/part-0.csv"
#> [20] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18.6/part-0.csv" 
#> [21] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18.61/part-0.csv"
#> [22] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18.9/part-0.csv" 
#> [23] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=18/part-0.csv"   
#> [24] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=19.44/part-0.csv"
#> [25] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=19.47/part-0.csv"
#> [26] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=19.9/part-0.csv" 
#> [27] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=20.01/part-0.csv"
#> [28] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=20.22/part-0.csv"
#> [29] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=20/part-0.csv"   
#> [30] "/tmp/RtmpZpxDtp/filecc4ae685db33b/qsec=22.9/part-0.csv"

# Output file - using a tempfile here; swap in a path of your choice
outfile <- tempfile(fileext = ".parquet")

# Create an output stream pointing at the file
sink <- FileOutputStream$create(outfile)

# Define the schema for the data we will stream in
# (mtcars minus qsec, which became the partition key above)
my_schema <- schema(
  mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
  drat = float64(), wt = float64(), vs = int64(), am = int64(),
  gear = int64(), carb = int64()
)

# Set up a Parquet writer with the schema and writer properties.
# NOTE(review): the original passed arrow:::default_parquet_compression(),
# an unexported internal function reached via `:::`. Omitting `compression`
# lets ParquetWriterProperties$create() apply that same default through its
# exported interface, which is stable across arrow releases.
writer <- ParquetFileWriter$create(
  schema = my_schema,
  sink,
  properties = ParquetWriterProperties$create(
    column_names = names(my_schema),
    version = "2.4"
  )
)

# Stream each input file into the open Parquet writer: read the CSV,
# coerce it to an Arrow Table matching the target schema, then append
# it to the output file
for (csv_path in file_list) {

  # Swap read.csv() for the file-reading code of your choice
  chunk <- as_arrow_table(read.csv(csv_path), schema = my_schema)

  # Append this chunk to the Parquet file
  writer$WriteTable(chunk, chunk_size = 1)
}

# Close the writer and then the sink - this finalises the Parquet file
# footer; the file is not a valid Parquet file until the writer is closed
writer$Close()
sink$close()

# Check it worked: all 32 mtcars rows should come back from the single file
read_parquet(outfile)
#> # A tibble: 32 × 10
#>      mpg   cyl  disp    hp  drat    wt    vs    am  gear  carb
#>    <dbl> <int> <dbl> <int> <dbl> <dbl> <int> <int> <int> <int>
#>  1  15.8     8 351     264  4.22  3.17     0     1     5     4
#>  2  15       8 301     335  3.54  3.57     0     1     5     8
#>  3  13.3     8 350     245  3.73  3.84     0     0     3     4
#>  4  19.7     6 145     175  3.62  2.77     0     1     5     6
#>  5  14.3     8 360     245  3.21  3.57     0     0     3     4
#>  6  21       6 160     110  3.9   2.62     0     1     4     4
#>  7  26       4 120.     91  4.43  2.14     0     1     5     2
#>  8  15.5     8 318     150  2.76  3.52     0     0     3     2
#>  9  30.4     4  95.1   113  3.77  1.51     1     1     5     2
#> 10  21       6 160     110  3.9   2.88     0     1     4     4
#> # ℹ 22 more rows

Created on 2023-07-27 with reprex v2.0.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment