Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created October 4, 2018 06:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/310e97defd41382b9acb9c4b95e1d13b to your computer and use it in GitHub Desktop.
Save djspiewak/310e97defd41382b9acb9c4b95e1d13b to your computer and use it in GitHub Desktop.
/*
* Copyright 2014–2018 SlamData Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package quasar.yggdrasil
package table
import quasar.precog.common._
import tectonic.{Enclosure, Plate, Signal}
import scala.collection.mutable
private[table] final class SlicePlate extends Plate[List[Slice]] {
private val MaxLongStrLength = Long.MinValue.toString.length
private var nextThreshold = Config.defaultMinRows
private var size = 0 // rows in process don't count toward size
private val columns = mutable.Map[ColumnRef, ArrayColumn[_]]()
private var cursor: List[CPathNode] = Nil
private var nextIndex: List[Int] = 0 :: Nil
private val completed = mutable.ListBuffer[Slice]()
def nul(): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CNull)).asInstanceOf[MutableNullColumn]
col(size) = true
Signal.Continue
}
def fls(): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CBoolean)).asInstanceOf[ArrayBoolColumn]
col(size) = false
Signal.Continue
}
def tru(): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CBoolean)).asInstanceOf[ArrayBoolColumn]
col(size) = true
Signal.Continue
}
def map(): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CEmptyObject)).asInstanceOf[MutableEmptyObjectColumn]
col(size) = true
Signal.Continue
}
def arr(): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CEmptyArray)).asInstanceOf[MutableEmptyArrayColumn]
col(size) = true
Signal.Continue
}
def num(s: CharSequence, decIdx: Int, expIdx: Int): Signal = {
growArrays()
if (decIdx < 0 && expIdx < 0 && s.length < MaxLongStrLength) {
val col = checkGet(ColumnRef(CPath(cursor.reverse), CLong)).asInstanceOf[ArrayLongColumn]
col(size) = java.lang.Long.parseLong(s.toString)
} else if (decIdx < 0) {
try {
val ln = java.lang.Long.parseLong(s.toString)
val col = checkGet(ColumnRef(CPath(cursor.reverse), CLong)).asInstanceOf[ArrayLongColumn]
col(size) = ln
} catch {
case _: NumberFormatException =>
val col = checkGet(ColumnRef(CPath(cursor.reverse), CNum)).asInstanceOf[ArrayNumColumn]
col(size) = BigDecimal(s.toString)
}
} else {
val num = BigDecimal(s.toString)
if (num.isDecimalDouble) {
val col = checkGet(ColumnRef(CPath(cursor.reverse), CDouble)).asInstanceOf[ArrayDoubleColumn]
col(size) = num.doubleValue
} else {
val col = checkGet(ColumnRef(CPath(cursor.reverse), CNum)).asInstanceOf[ArrayNumColumn]
col(size) = num
}
}
Signal.Continue
}
def str(s: CharSequence): Signal = {
growArrays()
val col = checkGet(ColumnRef(CPath(cursor.reverse), CString)).asInstanceOf[ArrayStrColumn]
col(size) = s.toString
Signal.Continue
}
def enclosure(): Enclosure = cursor match {
case CPathIndex(_) :: _ => Enclosure.Array
case CPathField(_) :: _ => Enclosure.Map
case CPathMeta(_) :: _ => Enclosure.Meta
case CPathArray :: _ => sys.error("no")
case Nil => Enclosure.None
}
def nestMap(pathComponent: CharSequence): Signal = {
cursor ::= CPathField(pathComponent.toString)
nextIndex ::= 0
Signal.Continue
}
def nestArr(): Signal = {
val idx :: tail = nextIndex
nextIndex = 0 :: (idx + 1) :: tail
cursor ::= CPathIndex(idx)
Signal.Continue
}
def nestMeta(pathComponent: CharSequence): Signal = {
cursor ::= CPathMeta(pathComponent.toString)
nextIndex ::= 0
Signal.Continue
}
def unnest(): Signal = {
nextIndex = nextIndex.tail
cursor = cursor.tail
Signal.Continue
}
def finishRow(): Unit = {
size += 1
nextIndex = 0 :: Nil
if (size > Config.maxSliceRows || columns.size > Config.maxSliceColumns) {
finishSlice()
}
}
def finishBatch(): List[Slice] = {
finishSlice()
val back = completed.toList
completed.clear()
back
}
private def checkGet(ref: ColumnRef): ArrayColumn[_] = {
var back = columns(ref)
if (back == null) {
back = ref.ctype match {
case CBoolean => ArrayBoolColumn.empty(nextThreshold)
case CLong => ArrayLongColumn.empty(nextThreshold)
case CDouble => ArrayDoubleColumn.empty(nextThreshold)
case CString => ArrayStrColumn.empty(nextThreshold)
case CNull => MutableNullColumn.empty()
case CEmptyArray => MutableEmptyArrayColumn.empty()
case CEmptyObject => MutableEmptyObjectColumn.empty()
case tpe => sys.error(s"should be impossible to create a column of $tpe")
}
columns(ref) = back
}
back
}
private def growArrays(): Unit = {
if (size >= nextThreshold) {
nextThreshold *= 2
for ((_, col) <- columns) {
col.resize(nextThreshold)
}
}
}
private def finishSlice(): Unit = {
if (size > 0) {
val slice = Slice(size, columns.to[λ[α => Map[ColumnRef, Column]]])
completed += slice
nextThreshold = Config.defaultMinRows
size = 0
columns.clear()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment