Skip to content

Instantly share code, notes, and snippets.

@jrevels
Last active January 4, 2021 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jrevels/782acb6b25f71f14a8cee0a3dae85079 to your computer and use it in GitHub Desktop.
Save jrevels/782acb6b25f71f14a8cee0a3dae85079 to your computer and use it in GitHub Desktop.
using UUIDs, Dates, Arrow, Tables
#####
##### Signals
#####
# One row of the signals table, stored as a flat struct with one field per column.
struct Signal <: Tables.AbstractRow
    recording_uuid::UUID
    type::String
    file_uri::String
    # `Missing` is needed in this union to match the generated Arrow.Table schema.
    file_metadata::Union{Missing,Nothing,Dict{String,String}}
    channel_names::Vector{String}
    start_nanosecond::Nanosecond
    stop_nanosecond::Nanosecond
    sample_unit::String
    sample_resolution_in_unit::Float64
    sample_offset_in_unit::Float64
    sample_type::String
    sample_rate::Float64
end
# Build a `Signal` from any row-like object that exposes the twelve signal columns as
# properties; the values are forwarded to the positional constructor in field order.
function Signal(row)
    return Signal(
        row.recording_uuid,
        row.type,
        row.file_uri,
        row.file_metadata,
        row.channel_names,
        row.start_nanosecond,
        row.stop_nanosecond,
        row.sample_unit,
        row.sample_resolution_in_unit,
        row.sample_offset_in_unit,
        row.sample_type,
        row.sample_rate,
    )
end
# Row interface for the flat `Signal` struct.
#
# Every column is a real struct field, so property and column access read the fields
# directly. (These methods previously went through a `_row` field and a `SIGNAL_FIELDS`
# constant, neither of which exists for this struct definition, so every access failed.)
Base.propertynames(::Signal) = fieldnames(Signal)
Base.getproperty(signal::Signal, nm::Symbol) = getfield(signal, nm)
Tables.getcolumn(signal::Signal, i::Int) = getfield(signal, i)
Tables.getcolumn(signal::Signal, nm::Symbol) = getfield(signal, nm)
# The requested element type `T` is not needed: the field already has its declared type.
Tables.getcolumn(signal::Signal, ::Type{T}, i::Int, nm::Symbol) where {T} = getfield(signal, i)
Tables.columnnames(::Signal) = fieldnames(Signal)
# Register `Signal` with Arrow's type registry.
# NOTE(review): the exact effect of `registertype!` depends on the Arrow.jl version in use — confirm.
Arrow.ArrowTypes.registertype!(Signal, Signal)
# Schema whose column names and types are exactly the fields of `Signal`;
# the `SignalsTable` constructor compares incoming table schemas against it.
const SIGNALS_TABLE_SCHEMA = Tables.Schema{fieldnames(Signal),Tuple{fieldtypes(Signal)...}}()
# Wrapper around a table whose schema is exactly `SIGNALS_TABLE_SCHEMA`.
struct SignalsTable{T} <: Tables.AbstractColumns
    _table::T

    # Wrap `_table`; throws an `ArgumentError` unless `Tables.schema(_table)` is
    # identical (`===`) to `SIGNALS_TABLE_SCHEMA`.
    function SignalsTable(_table::T) where {T}
        schema = Tables.schema(_table)
        if schema !== SIGNALS_TABLE_SCHEMA
            throw(ArgumentError("_table does not have appropriate SignalsTable schema: $schema"))
        end
        return new{T}(_table)
    end
end
# Tables.jl interface for `SignalsTable`. Apart from `istable` (always `true`) and `rows`
# (which re-wraps each row), every query is forwarded to the wrapped `_table`.
Tables.istable(::SignalsTable) = true
Tables.schema(tbl::SignalsTable) = Tables.schema(getfield(tbl, :_table))
Tables.materializer(tbl::SignalsTable) = Tables.materializer(getfield(tbl, :_table))

# Column access.
Tables.columnaccess(tbl::SignalsTable) = Tables.columnaccess(getfield(tbl, :_table))
Tables.columns(tbl::SignalsTable) = Tables.columns(getfield(tbl, :_table))
Tables.columnnames(tbl::SignalsTable) = Tables.columnnames(getfield(tbl, :_table))
Tables.getcolumn(tbl::SignalsTable, x::Int) = Tables.getcolumn(getfield(tbl, :_table), x)
Tables.getcolumn(tbl::SignalsTable, x::Symbol) = Tables.getcolumn(getfield(tbl, :_table), x)

# Row access: a lazy generator that converts each underlying row into a `Signal`.
Tables.rowaccess(tbl::SignalsTable) = Tables.rowaccess(getfield(tbl, :_table))
function Tables.rows(tbl::SignalsTable)
    inner_rows = Tables.rows(getfield(tbl, :_table))
    return (Signal(row) for row in inner_rows)
end
#####
##### Annotations
#####
# One row of the annotations table; `V` is the type of the annotation's `value`.
struct Annotation{V} <: Tables.AbstractRow
    recording_uuid::UUID
    uuid::UUID
    value::V
    start_nanosecond::Nanosecond
    stop_nanosecond::Nanosecond
end
# Build an `Annotation` from a row-like object; `V` is taken from the type of `row.value`.
function Annotation(row)
    return Annotation(
        row.recording_uuid,
        row.uuid,
        row.value,
        row.start_nanosecond,
        row.stop_nanosecond,
    )
end

# Same as above, but with the value type `V` fixed by the caller.
function Annotation{V}(row) where {V}
    return Annotation{V}(
        row.recording_uuid,
        row.uuid,
        row.value,
        row.start_nanosecond,
        row.stop_nanosecond,
    )
end
# Property access on an `Annotation` maps straight onto its struct fields.
function Base.propertynames(::Annotation)
    return fieldnames(Annotation)
end

function Base.getproperty(annotation::Annotation, name::Symbol)
    return getfield(annotation, name)
end

# Register `Annotation` with Arrow's type registry.
# NOTE(review): the exact effect of `registertype!` depends on the Arrow.jl version in use — confirm.
Arrow.ArrowTypes.registertype!(Annotation, Annotation)
# Return the `Tables.Schema` an annotations table must have when its `value` column has
# element type `V`: the field names of `Annotation` with `V` in the third position.
annotations_table_schema(::Type{V}) where {V} =
    Tables.Schema{fieldnames(Annotation),Tuple{UUID,UUID,V,Nanosecond,Nanosecond}}()
# Wrapper around a table whose schema is exactly `annotations_table_schema(V)`.
struct AnnotationsTable{V,T} <: Tables.AbstractColumns
    _table::T

    # Wrap `_table` with an explicitly requested value type `V`; throws an `ArgumentError`
    # unless the table's schema is identical to `annotations_table_schema(V)`.
    function AnnotationsTable{V}(_table::T) where {V,T}
        schema = Tables.schema(_table)
        schema === annotations_table_schema(V) || throw(ArgumentError("_table does not have appropriate AnnotationsTable schema: $schema"))
        return new{V,T}(_table)
    end

    # Wrap `_table`, inferring `V` from the type of its third column. The full schema
    # check is then performed by the `AnnotationsTable{V}` constructor.
    function AnnotationsTable(_table)
        schema = Tables.schema(_table)
        # A table without a known schema has no column types to infer `V` from; reject it
        # with the same `ArgumentError` as any other schema mismatch instead of failing
        # on the `schema.types` lookup below.
        (schema !== nothing && length(schema.types) === 5) || throw(ArgumentError("_table does not have appropriate AnnotationsTable schema: $schema"))
        return AnnotationsTable{schema.types[3]}(_table)
    end
end
# Tables.jl interface for `AnnotationsTable`. Apart from `istable` (always `true`) and
# `rows` (which re-wraps each row), every query is forwarded to the wrapped `_table`.
Tables.istable(::AnnotationsTable) = true
Tables.schema(tbl::AnnotationsTable) = Tables.schema(getfield(tbl, :_table))
Tables.materializer(tbl::AnnotationsTable) = Tables.materializer(getfield(tbl, :_table))

# Column access.
Tables.columnaccess(tbl::AnnotationsTable) = Tables.columnaccess(getfield(tbl, :_table))
Tables.columns(tbl::AnnotationsTable) = Tables.columns(getfield(tbl, :_table))
Tables.columnnames(tbl::AnnotationsTable) = Tables.columnnames(getfield(tbl, :_table))
Tables.getcolumn(tbl::AnnotationsTable, x::Int) = Tables.getcolumn(getfield(tbl, :_table), x)
Tables.getcolumn(tbl::AnnotationsTable, x::Symbol) = Tables.getcolumn(getfield(tbl, :_table), x)

# Row access: a lazy generator that converts each underlying row into an `Annotation{V}`.
Tables.rowaccess(tbl::AnnotationsTable) = Tables.rowaccess(getfield(tbl, :_table))
function Tables.rows(tbl::AnnotationsTable{V}) where {V}
    inner_rows = Tables.rows(getfield(tbl, :_table))
    return (Annotation{V}(row) for row in inner_rows)
end
#####
##### Recordings
#####
# Group signals and annotations by recording UUID, driven by the signals table.
#
# Returns a `Dict` from recording UUID to `(signals, annotations)`, where `signals` maps
# signal type to `Signal` and `annotations` maps annotation UUID to `Annotation{V}`.
# Annotations whose recording has no signal are skipped.
function by_recording(signals::SignalsTable, annotations::AnnotationsTable{V}) where {V}
    SignalsByType = Dict{String,Signal}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    Entry = NamedTuple{(:signals, :annotations),Tuple{SignalsByType,AnnotationsByUUID}}
    grouped = Dict{UUID,Entry}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            (signals = SignalsByType(), annotations = AnnotationsByUUID())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
# Group annotations and signals by recording UUID, driven by the annotations table.
#
# Returns a `Dict` from recording UUID to `(annotations, signals)`, where `annotations`
# maps annotation UUID to `Annotation{V}` and `signals` maps signal type to `Signal`.
# Signals whose recording has no annotation are skipped.
function by_recording(annotations::AnnotationsTable{V}, signals::SignalsTable) where {V}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    SignalsByType = Dict{String,Signal}
    Entry = NamedTuple{(:annotations, :signals),Tuple{AnnotationsByUUID,SignalsByType}}
    grouped = Dict{UUID,Entry}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            (annotations = AnnotationsByUUID(), signals = SignalsByType())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
# Group signals by recording UUID; each recording maps signal type to `Signal`.
# A later row with the same recording and type replaces the earlier one.
function by_recording(signals::SignalsTable)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_type = get!(grouped, sig.recording_uuid) do
            Dict{String,Signal}()
        end
        per_type[sig.type] = sig
    end
    return grouped
end
# Group annotations by recording UUID; each recording maps annotation UUID to
# `Annotation{V}`.
function by_recording(annotations::AnnotationsTable{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_uuid = get!(grouped, ann.recording_uuid) do
            Dict{UUID,Annotation{V}}()
        end
        per_uuid[ann.uuid] = ann
    end
    return grouped
end
#####
##### experiments
#####
# Benchmark fixture: build `n` synthetic recordings. Each recording contributes two
# signals ("eeg" and "ecg") and two annotations that share the recording's UUID.
n = 10000
signals = Signal[]
annotations = Annotation{NamedTuple{(:x, :y),Tuple{Int64,String}}}[]
for _ in 1:n
rec = uuid4()
# "eeg" signal: 26 channels, no file metadata.
push!(signals, Signal(rec, "eeg", "file://$(rec)/eeg.lpcm.zst", nothing,
["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"],
Nanosecond(0), Nanosecond(100000),
"microvolt", 1.0, 1.0, "float64", 256.0))
# "ecg" signal: 4 channels, with a small metadata dictionary.
push!(signals, Signal(rec, "ecg", "file://$(rec)/ecg.lpcm.zst", Dict("a" => "absidufbaid", "b" => "adjfhaudi"),
["a", "b", "c", "d"],
Nanosecond(0), Nanosecond(100000),
"microvolt", 0.134, 1.9875, "int32", 128.0))
# Two annotations per recording, each with a fresh UUID and a NamedTuple value.
push!(annotations, Annotation(rec, uuid4(), (x = 1, y = "2"), Nanosecond(0), Nanosecond(10)))
push!(annotations, Annotation(rec, uuid4(), (x = 231, y = "asdkfjh"), Nanosecond(0), Nanosecond(10)))
end
# Round-trip `tbl` through the Arrow format in memory: write it to an `IOBuffer`, rewind
# the buffer, and read it back as an `Arrow.Table`.
function write_table(tbl)
    io = IOBuffer()
    Arrow.write(io, tbl)
    seekstart(io)
    return Arrow.Table(io)
end
# Round-trip both fixtures through Arrow, then wrap the resulting column tables in the
# schema-checked table types.
sigs_arrow = write_table(signals)
anns_arrow = write_table(annotations)
sigs = SignalsTable(Tables.columntable(sigs_arrow));
anns = AnnotationsTable(Tables.columntable(anns_arrow));
# Time each `by_recording` variant; the trailing comments are the figures recorded by the author.
@time by_recording(sigs, anns); # 0.064109 seconds (839.00 k allocations: 74.526 MiB)
@time by_recording(anns, sigs); # 0.073439 seconds (839.00 k allocations: 74.526 MiB)
@time by_recording(sigs); # 0.031596 seconds (639.51 k allocations: 53.616 MiB)
@time by_recording(anns); # 0.031726 seconds (199.51 k allocations: 21.267 MiB)
#########################################################################################################################################################################
#########################################################################################################################################################################
#########################################################################################################################################################################
#=
This alternative approach creates thin `AbstractRow`/`AbstractColumns` wrappers that intentionally try to take on the characteristics of their underlying objects
via delegation. Unlike the previous approach above, this version's `AbstractRow` subtypes delegate as much as the `AbstractColumns` subtypes.
One might ask: At this point, what is the benefit of adding this layer at all over just passing around `Arrow.Table` and/or `NamedTuple`-based tables?
Answers:
1. Some useful schema/type checking/hinting opportunities can be surfaced in this layer.
2. This layer helps avoid type piracy/ambiguities/etc. when specializing common methods for these tables (e.g. `show`, `getindex`, etc.).
3. The composition-based approach here gives you 1 and 2 in a manner that's nicely orthogonal to implementational details of underlying
table types. For example, the caller can choose to work with `Signals(table::Arrow.Table)` or a fully-materialized
`Signals(map(collect, Tables.columntable(table::Arrow.Table)))`, or whatever fits their use-case.
=#
using UUIDs, Dates, Arrow, Tables
#####
##### Signals
#####
# A signal row implemented as a thin wrapper around an arbitrary underlying row object
# of type `R`.
struct Signal{R} <: Tables.AbstractRow
    _row::R
end
# Canonical column names and types of a signal row, expressed as a `NamedTuple` type so
# that `fieldnames`/`fieldtype`/`fieldtypes` can be used to query them.
const SIGNAL_FIELDS = NamedTuple{
    (:recording_uuid, :type, :file_uri, :file_metadata, :channel_names,
     :start_nanosecond, :stop_nanosecond, :sample_unit, :sample_resolution_in_unit,
     :sample_offset_in_unit, :sample_type, :sample_rate),
    Tuple{UUID,String,String,Union{Missing,Nothing,Dict{String,String}},Vector{String},
          Nanosecond,Nanosecond,String,Float64,Float64,String,Float64}
}
# Property and column access for the `Signal` wrapper. Every lookup is delegated to the
# wrapped `_row` and type-asserted against the corresponding entry of `SIGNAL_FIELDS`.
Base.propertynames(::Signal) = fieldnames(SIGNAL_FIELDS)
Base.getproperty(signal::Signal, name::Symbol) = getproperty(getfield(signal, :_row), name)::fieldtype(SIGNAL_FIELDS, name)
# Explicit Tables.jl row methods. Without them, positional access (`getcolumn(signal, i)`,
# and the indexing/iteration built on it) would not be forwarded to the wrapped row.
# NOTE(review): positional access assumes the wrapped row's column order matches
# `SIGNAL_FIELDS`; a mismatch surfaces as a failed type assertion — confirm with callers.
Tables.columnnames(::Signal) = fieldnames(SIGNAL_FIELDS)
Tables.getcolumn(signal::Signal, i::Int) = Tables.getcolumn(getfield(signal, :_row), i)::fieldtype(SIGNAL_FIELDS, i)
Tables.getcolumn(signal::Signal, nm::Symbol) = Tables.getcolumn(getfield(signal, :_row), nm)::fieldtype(SIGNAL_FIELDS, nm)
Tables.getcolumn(signal::Signal, ::Type{T}, i::Int, nm::Symbol) where {T} = Tables.getcolumn(getfield(signal, :_row), T, i, nm)::fieldtype(SIGNAL_FIELDS, i)
# Schema validation for `Signals`, decided purely by dispatch:
#   * `nothing` (no schema available) is accepted;
#   * a schema with exactly the `SIGNAL_FIELDS` column names, whose column types are
#     subtypes of the corresponding `SIGNAL_FIELDS` types, is accepted;
#   * any other `Tables.Schema` is rejected.
is_valid_signals_schema(::Nothing) = true
is_valid_signals_schema(
    ::Tables.Schema{fieldnames(SIGNAL_FIELDS),<:Tuple{fieldtypes(SIGNAL_FIELDS)...}},
) = true
is_valid_signals_schema(::Tables.Schema) = false
# A signals table implemented as a thin wrapper around an underlying columns object of
# type `C`.
struct Signals{C} <: Tables.AbstractColumns
    _columns::C

    # Wrap `_columns`; throws an `ArgumentError` when `is_valid_signals_schema` rejects
    # the schema reported by `Tables.schema(_columns)`.
    function Signals(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_signals_schema(schema)
            throw(ArgumentError("_table does not have appropriate Signals schema: $schema"))
        end
        return new{C}(_columns)
    end
end
# Tables.jl interface for `Signals`: every query is forwarded to the wrapped `_columns`,
# except `rows`, which wraps each underlying row in a `Signal`.
Tables.istable(s::Signals) = Tables.istable(getfield(s, :_columns))
Tables.schema(s::Signals) = Tables.schema(getfield(s, :_columns))
Tables.materializer(s::Signals) = Tables.materializer(getfield(s, :_columns))

# Row access.
Tables.rowaccess(s::Signals) = Tables.rowaccess(getfield(s, :_columns))
function Tables.rows(s::Signals)
    inner_rows = Tables.rows(getfield(s, :_columns))
    return (Signal(row) for row in inner_rows)
end

# Column access.
Tables.columnaccess(s::Signals) = Tables.columnaccess(getfield(s, :_columns))
Tables.columns(s::Signals) = Tables.columns(getfield(s, :_columns))
Tables.columnnames(s::Signals) = Tables.columnnames(getfield(s, :_columns))
Tables.getcolumn(s::Signals, i::Int) = Tables.getcolumn(getfield(s, :_columns), i)
Tables.getcolumn(s::Signals, nm::Symbol) = Tables.getcolumn(getfield(s, :_columns), nm)
function Tables.getcolumn(s::Signals, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(s, :_columns), T, i, nm)
end
#####
##### Annotations
#####
# An annotation row implemented as a thin wrapper around an underlying row object of
# type `R`; `V` is the type expected for the row's `value` column.
struct Annotation{V,R} <: Tables.AbstractRow
    _row::R
end
# Wrap `_row`, taking the value type from the declared type of `R`'s `value` field.
function Annotation(_row::R) where {R}
    V = fieldtype(R, :value)
    return Annotation{V,R}(_row)
end

# Wrap `_row` with the value type `V` chosen by the caller.
function Annotation{V}(_row::R) where {V,R}
    return Annotation{V,R}(_row)
end
# Return the `NamedTuple` type describing the column names and types of an annotation
# row whose value type is `V`.
_annotation_fields(::Type{<:Annotation{V}}) where {V} =
    NamedTuple{(:recording_uuid, :uuid, :value, :start_nanosecond, :stop_nanosecond),
               Tuple{UUID,UUID,V,Nanosecond,Nanosecond}}
# Property and column access for the `Annotation` wrapper. Every lookup is delegated to
# the wrapped `_row` and type-asserted against `_annotation_fields` for the annotation's
# own type.
Base.propertynames(::Annotation) = fieldnames(_annotation_fields(Annotation{Any}))
Base.getproperty(annotation::Annotation, name::Symbol) = getproperty(getfield(annotation, :_row), name)::fieldtype(_annotation_fields(typeof(annotation)), name)
# Explicit Tables.jl row methods. Without them, positional access
# (`getcolumn(annotation, i)`, and the indexing/iteration built on it) would not be
# forwarded to the wrapped row.
# NOTE(review): positional access assumes the wrapped row's column order matches
# `_annotation_fields`; a mismatch surfaces as a failed type assertion — confirm with callers.
Tables.columnnames(::Annotation) = fieldnames(_annotation_fields(Annotation{Any}))
Tables.getcolumn(annotation::Annotation, i::Int) = Tables.getcolumn(getfield(annotation, :_row), i)::fieldtype(_annotation_fields(typeof(annotation)), i)
Tables.getcolumn(annotation::Annotation, nm::Symbol) = Tables.getcolumn(getfield(annotation, :_row), nm)::fieldtype(_annotation_fields(typeof(annotation)), nm)
Tables.getcolumn(annotation::Annotation, ::Type{T}, i::Int, nm::Symbol) where {T} = Tables.getcolumn(getfield(annotation, :_row), T, i, nm)::fieldtype(_annotation_fields(typeof(annotation)), i)
# Schema validation for `Annotations`, decided purely by dispatch:
#   * `nothing` (no schema available) is accepted;
#   * a schema with exactly the annotation column names, whose column types are subtypes
#     of the annotation field types with the value type left as `Any`, is accepted;
#   * any other `Tables.Schema` is rejected.
is_valid_annotations_schema(::Nothing) = true
is_valid_annotations_schema(
    ::Tables.Schema{fieldnames(_annotation_fields(Annotation{Any})),
                    <:Tuple{fieldtypes(_annotation_fields(Annotation{Any}))...}},
) = true
is_valid_annotations_schema(::Tables.Schema) = false
# An annotations table implemented as a thin wrapper around an underlying columns object
# of type `C`; `V` is the element type of the `value` column.
struct Annotations{V,C} <: Tables.AbstractColumns
    _columns::C

    # Wrap `_columns`; throws an `ArgumentError` when `is_valid_annotations_schema`
    # rejects the schema reported by `Tables.schema(_columns)`.
    function Annotations(_columns::C) where {C}
        schema = Tables.schema(_columns)
        is_valid_annotations_schema(schema) || throw(ArgumentError("_table does not have appropriate Annotations schema: $schema"))
        # A `nothing` schema passes validation but carries no column types, so the value
        # type falls back to `Any`; otherwise it is the type of the third (`value`) column.
        V = schema === nothing ? Any : schema.types[3]
        return new{V,C}(_columns)
    end
end
# Tables.jl interface for `Annotations`: every query is forwarded to the wrapped
# `_columns`, except `rows`, which wraps each underlying row in an `Annotation{V}`.
Tables.istable(a::Annotations) = Tables.istable(getfield(a, :_columns))
Tables.schema(a::Annotations) = Tables.schema(getfield(a, :_columns))
Tables.materializer(a::Annotations) = Tables.materializer(getfield(a, :_columns))

# Row access.
Tables.rowaccess(a::Annotations) = Tables.rowaccess(getfield(a, :_columns))
function Tables.rows(a::Annotations{V}) where {V}
    inner_rows = Tables.rows(getfield(a, :_columns))
    return (Annotation{V}(row) for row in inner_rows)
end

# Column access.
Tables.columnaccess(a::Annotations) = Tables.columnaccess(getfield(a, :_columns))
Tables.columns(a::Annotations) = Tables.columns(getfield(a, :_columns))
Tables.columnnames(a::Annotations) = Tables.columnnames(getfield(a, :_columns))
Tables.getcolumn(a::Annotations, i::Int) = Tables.getcolumn(getfield(a, :_columns), i)
Tables.getcolumn(a::Annotations, nm::Symbol) = Tables.getcolumn(getfield(a, :_columns), nm)
function Tables.getcolumn(a::Annotations, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(a, :_columns), T, i, nm)
end
#####
##### by_recording
#####
# Group signals and annotations by recording UUID, driven by the signals table.
#
# Returns a `Dict` from recording UUID to `(signals, annotations)`, where `signals` maps
# signal type to `Signal` and `annotations` maps annotation UUID to `Annotation{V}`.
# Annotations whose recording has no signal are skipped.
function by_recording(signals::Signals, annotations::Annotations{V}) where {V}
    SignalsByType = Dict{String,Signal}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    Entry = NamedTuple{(:signals, :annotations),Tuple{SignalsByType,AnnotationsByUUID}}
    grouped = Dict{UUID,Entry}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            (signals = SignalsByType(), annotations = AnnotationsByUUID())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
# Group annotations and signals by recording UUID, driven by the annotations table.
#
# Returns a `Dict` from recording UUID to `(annotations, signals)`, where `annotations`
# maps annotation UUID to `Annotation{V}` and `signals` maps signal type to `Signal`.
# Signals whose recording has no annotation are skipped.
function by_recording(annotations::Annotations{V}, signals::Signals) where {V}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    SignalsByType = Dict{String,Signal}
    Entry = NamedTuple{(:annotations, :signals),Tuple{AnnotationsByUUID,SignalsByType}}
    grouped = Dict{UUID,Entry}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            (annotations = AnnotationsByUUID(), signals = SignalsByType())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
# Group signals by recording UUID; each recording maps signal type to `Signal`.
# A later row with the same recording and type replaces the earlier one.
function by_recording(signals::Signals)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_type = get!(grouped, sig.recording_uuid) do
            Dict{String,Signal}()
        end
        per_type[sig.type] = sig
    end
    return grouped
end
# Group annotations by recording UUID; each recording maps annotation UUID to
# `Annotation{V}`.
function by_recording(annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_uuid = get!(grouped, ann.recording_uuid) do
            Dict{UUID,Annotation{V}}()
        end
        per_uuid[ann.uuid] = ann
    end
    return grouped
end
#####
##### experiments
#####
# Benchmark fixture: build `n` synthetic recordings. Each recording contributes two
# signals ("eeg" and "ecg") and two annotations, all wrapped around NamedTuple rows.
n = 10000
signals = Signal[]
annotations = Annotation{NamedTuple{(:x, :y),Tuple{Int64,String}}}[]
for _ in 1:n
rec = uuid4()
# "eeg" signal: 26 channels, no file metadata.
push!(signals, Signal((recording_uuid=rec, type="eeg", file_uri="file://$(rec)/eeg.lpcm.zst", file_metadata=nothing,
channel_names=["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"],
start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(100000),
sample_unit="microvolt", sample_resolution_in_unit=1.0, sample_offset_in_unit=1.0,
sample_type="float64", sample_rate=256.0)))
# "ecg" signal: 4 channels, with a small metadata dictionary.
push!(signals, Signal((recording_uuid=rec, type="ecg", file_uri="file://$(rec)/ecg.lpcm.zst", file_metadata=Dict("a" => "absidufbaid", "b" => "adjfhaudi"),
channel_names=["a", "b", "c", "d"],
start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(100000),
sample_unit="microvolt", sample_resolution_in_unit=0.134, sample_offset_in_unit=1.9875,
sample_type="int32", sample_rate=128.0)))
# Two annotations per recording, each with a fresh UUID and a NamedTuple value.
push!(annotations, Annotation((recording_uuid=rec, uuid=uuid4(), value=(x = 1, y = "2"), start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(10))))
push!(annotations, Annotation((recording_uuid=rec, uuid=uuid4(), value=(x = 231, y = "asdkfjh"), start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(10))))
end
# Round-trip `tbl` through the Arrow format in memory: write it to an `IOBuffer`, rewind
# the buffer, and read it back as an `Arrow.Table`.
function write_table(tbl)
    io = IOBuffer()
    Arrow.write(io, tbl)
    seekstart(io)
    return Arrow.Table(io)
end
# Round-trip both fixtures through Arrow, then wrap the resulting column tables in the
# schema-checked wrapper types.
sigs_arrow = write_table(signals)
anns_arrow = write_table(annotations)
sigs = Signals(Tables.columntable(sigs_arrow))
anns = Annotations(Tables.columntable(anns_arrow))
# Time each `by_recording` variant; the trailing comments are the figures recorded by the author.
@time by_recording(sigs, anns); # 0.010518 seconds (140.02 k allocations: 15.338 MiB)
@time by_recording(anns, sigs); # 0.007881 seconds (140.02 k allocations: 15.338 MiB)
@time by_recording(sigs); # 0.004087 seconds (80.02 k allocations: 7.542 MiB)
@time by_recording(anns); # 0.003655 seconds (60.02 k allocations: 8.152 MiB)
##########################################################################################################################################################################
##########################################################################################################################################################################
##########################################################################################################################################################################
using UUIDs, Dates, Arrow, Tables
#####
##### Signals
#####
# A signal row implemented as a thin wrapper around an arbitrary underlying row object
# of type `R`.
struct Signal{R} <: Tables.AbstractRow
    _row::R
end
# Canonical column names and types of a signal row, expressed as a `NamedTuple` type so
# that `fieldnames`/`fieldtype`/`fieldtypes` can be used to query them.
const SIGNAL_FIELDS = NamedTuple{
    (:recording_uuid, :type, :file_path, :file_metadata, :channel_names,
     :start_nanosecond, :stop_nanosecond, :sample_unit, :sample_resolution_in_unit,
     :sample_offset_in_unit, :sample_type, :sample_rate),
    Tuple{UUID,String,String,Union{Missing,Nothing,Dict{String,String}},Vector{String},
          Nanosecond,Nanosecond,String,Float64,Float64,String,Float64}
}
# Keyword constructor: build a `Signal` backed by a `SIGNAL_FIELDS` NamedTuple.
#
# `recording_uuid` must already be a `UUID`; `file_metadata` defaults to `nothing`.
# Every other argument is converted to the type declared in `SIGNAL_FIELDS`
# (`String(...)`, `Nanosecond(...)`, `Float64(...)`, or
# `convert(Vector{String}, ...)`), so a failed conversion throws from here.
function Signal(; recording_uuid::UUID,
                type,
                file_path,
                file_metadata::Union{Missing,Nothing,Dict{String,String}}=nothing,
                channel_names,
                start_nanosecond,
                stop_nanosecond,
                sample_unit,
                sample_resolution_in_unit,
                sample_offset_in_unit,
                sample_type,
                sample_rate)
    row = (; recording_uuid,
           type=String(type),
           file_path=String(file_path),
           file_metadata,
           channel_names=convert(Vector{String}, channel_names),
           start_nanosecond=Nanosecond(start_nanosecond),
           stop_nanosecond=Nanosecond(stop_nanosecond),
           sample_unit=String(sample_unit),
           sample_resolution_in_unit=Float64(sample_resolution_in_unit),
           sample_offset_in_unit=Float64(sample_offset_in_unit),
           sample_type=String(sample_type),
           sample_rate=Float64(sample_rate))
    return Signal{SIGNAL_FIELDS}(row)
end
# Row interface for the `Signal` wrapper. Names always come from `SIGNAL_FIELDS`; values
# are read from the wrapped `_row` and type-asserted against `SIGNAL_FIELDS`.
Base.propertynames(::Signal) = fieldnames(SIGNAL_FIELDS)
function Base.getproperty(signal::Signal, name::Symbol)
    return getproperty(getfield(signal, :_row), name)::fieldtype(SIGNAL_FIELDS, name)
end

Tables.columnnames(::Signal) = fieldnames(SIGNAL_FIELDS)
function Tables.getcolumn(signal::Signal, i::Int)
    return Tables.getcolumn(getfield(signal, :_row), i)::fieldtype(SIGNAL_FIELDS, i)
end
function Tables.getcolumn(signal::Signal, nm::Symbol)
    return Tables.getcolumn(getfield(signal, :_row), nm)::fieldtype(SIGNAL_FIELDS, nm)
end
function Tables.getcolumn(signal::Signal, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(signal, :_row), T, i, nm)::fieldtype(SIGNAL_FIELDS, i)
end

# A vector of `Signal`s always reports the `SIGNAL_FIELDS` schema.
function Tables.schema(::AbstractVector{<:Signal})
    return Tables.Schema(fieldnames(SIGNAL_FIELDS), fieldtypes(SIGNAL_FIELDS))
end
# Schema validation for `Signals`, decided purely by dispatch:
#   * `nothing` (no schema available) is accepted;
#   * a schema with exactly the `SIGNAL_FIELDS` column names, whose column types are
#     subtypes of the corresponding `SIGNAL_FIELDS` types, is accepted;
#   * any other `Tables.Schema` is rejected.
is_valid_signals_schema(::Nothing) = true
is_valid_signals_schema(
    ::Tables.Schema{fieldnames(SIGNAL_FIELDS),<:Tuple{fieldtypes(SIGNAL_FIELDS)...}},
) = true
is_valid_signals_schema(::Tables.Schema) = false
# A signals table implemented as a thin wrapper around an underlying columns object of
# type `C`.
struct Signals{C} <: Tables.AbstractColumns
    _columns::C

    # Wrap `_columns`; throws an `ArgumentError` when `is_valid_signals_schema` rejects
    # the schema reported by `Tables.schema(_columns)`.
    function Signals(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_signals_schema(schema)
            throw(ArgumentError("_table does not have appropriate Signals schema: $schema"))
        end
        return new{C}(_columns)
    end
end
# Construct a `Signals` table with no rows, built from an empty vector of
# `SIGNAL_FIELDS` NamedTuples.
function Signals()
    no_rows = SIGNAL_FIELDS[]
    return Signals(Tables.columntable(no_rows))
end
# Tables.jl interface for `Signals`: every query is forwarded to the wrapped `_columns`,
# except `rows`, which wraps each underlying row in a `Signal`.
Tables.istable(s::Signals) = Tables.istable(getfield(s, :_columns))
Tables.schema(s::Signals) = Tables.schema(getfield(s, :_columns))
Tables.materializer(s::Signals) = Tables.materializer(getfield(s, :_columns))

# Row access.
Tables.rowaccess(s::Signals) = Tables.rowaccess(getfield(s, :_columns))
function Tables.rows(s::Signals)
    inner_rows = Tables.rows(getfield(s, :_columns))
    return (Signal(row) for row in inner_rows)
end

# Column access.
Tables.columnaccess(s::Signals) = Tables.columnaccess(getfield(s, :_columns))
Tables.columns(s::Signals) = Tables.columns(getfield(s, :_columns))
Tables.columnnames(s::Signals) = Tables.columnnames(getfield(s, :_columns))
Tables.getcolumn(s::Signals, i::Int) = Tables.getcolumn(getfield(s, :_columns), i)
Tables.getcolumn(s::Signals, nm::Symbol) = Tables.getcolumn(getfield(s, :_columns), nm)
function Tables.getcolumn(s::Signals, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(s, :_columns), T, i, nm)
end
#####
##### Annotations
#####
# An annotation row implemented as a thin wrapper around an underlying row object of
# type `R`; `V` is the type expected for the row's `value` column.
struct Annotation{V,R} <: Tables.AbstractRow
    _row::R
end
# Return the `NamedTuple` type describing the column names and types of an annotation
# row whose value type is `V`; `value` is the last column.
function _annotation_fields(::Type{V}) where {V}
    return NamedTuple{(:recording_uuid, :uuid, :start_nanosecond, :stop_nanosecond, :value),
                      Tuple{UUID,UUID,Nanosecond,Nanosecond,V}}
end
# Wrap `_row`, taking the value type from the declared type of `R`'s `value` field.
function Annotation(_row::R) where {R}
    V = fieldtype(R, :value)
    return Annotation{V,R}(_row)
end

# Wrap `_row` with the value type `V` chosen by the caller.
function Annotation{V}(_row::R) where {V,R}
    return Annotation{V,R}(_row)
end
# Keyword constructor with an explicit value type: build an `Annotation{V}` backed by an
# `_annotation_fields(V)` NamedTuple. Both UUIDs must already be `UUID`s; the time bounds
# are passed through `Nanosecond(...)` and `value` through `convert(V, ...)`.
function Annotation{V}(; recording_uuid::UUID, uuid::UUID, start_nanosecond, stop_nanosecond, value) where {V}
    row = (; recording_uuid, uuid,
           start_nanosecond=Nanosecond(start_nanosecond),
           stop_nanosecond=Nanosecond(stop_nanosecond),
           value=convert(V, value))
    return Annotation{V,_annotation_fields(V)}(row)
end
# Keyword constructor that takes the value type `V` from the `value` argument itself and
# forwards everything to the `Annotation{V}` keyword constructor.
Annotation(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value::V) where {V} =
    Annotation{V}(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value)
# Row interface for the `Annotation` wrapper. Names come from `_annotation_fields`;
# values are read from the wrapped `_row` and type-asserted against
# `_annotation_fields(V)`.
Base.propertynames(::Annotation) = fieldnames(_annotation_fields(Any))
function Base.getproperty(annotation::Annotation{V}, name::Symbol) where {V}
    return getproperty(getfield(annotation, :_row), name)::fieldtype(_annotation_fields(V), name)
end

Tables.columnnames(::Annotation) = fieldnames(_annotation_fields(Any))
function Tables.getcolumn(ann::Annotation{V}, i::Int) where {V}
    return Tables.getcolumn(getfield(ann, :_row), i)::fieldtype(_annotation_fields(V), i)
end
function Tables.getcolumn(ann::Annotation{V}, nm::Symbol) where {V}
    return Tables.getcolumn(getfield(ann, :_row), nm)::fieldtype(_annotation_fields(V), nm)
end
function Tables.getcolumn(ann::Annotation{V}, ::Type{T}, i::Int, nm::Symbol) where {V,T}
    return Tables.getcolumn(getfield(ann, :_row), T, i, nm)::fieldtype(_annotation_fields(V), i)
end

# A vector of `Annotation{V}`s always reports the `_annotation_fields(V)` schema.
function Tables.schema(::AbstractVector{<:Annotation{V}}) where {V}
    fields = _annotation_fields(V)
    return Tables.Schema(fieldnames(fields), fieldtypes(fields))
end
# Schema validation for `Annotations`, decided purely by dispatch:
#   * `nothing` (no schema available) is accepted;
#   * a schema with exactly the annotation column names, whose column types are subtypes
#     of the annotation field types with the value type left as `Any`, is accepted;
#   * any other `Tables.Schema` is rejected.
is_valid_annotations_schema(::Nothing) = true
is_valid_annotations_schema(
    ::Tables.Schema{fieldnames(_annotation_fields(Any)),
                    <:Tuple{fieldtypes(_annotation_fields(Any))...}},
) = true
is_valid_annotations_schema(::Tables.Schema) = false
# An annotations table implemented as a thin wrapper around an underlying columns object
# of type `C`; `V` is the element type of the `value` column.
struct Annotations{V,C} <: Tables.AbstractColumns
    _columns::C

    # Wrap `_columns`; throws an `ArgumentError` when `is_valid_annotations_schema`
    # rejects the schema. `V` is the type of the last column, or `Any` when the table
    # reports no schema.
    function Annotations(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_annotations_schema(schema)
            throw(ArgumentError("_table does not have appropriate Annotations schema: $schema"))
        end
        V = isnothing(schema) ? Any : last(schema.types)
        return new{V,C}(_columns)
    end
end
# Construct an `Annotations` table with no rows, built from an empty vector of
# `_annotation_fields(V)` NamedTuples.
function Annotations{V}() where {V}
    no_rows = _annotation_fields(V)[]
    return Annotations(Tables.columntable(no_rows))
end
# Tables.jl interface for `Annotations`: every query is forwarded to the wrapped
# `_columns`, except `rows`, which wraps each underlying row in an `Annotation{V}`.
Tables.istable(a::Annotations) = Tables.istable(getfield(a, :_columns))
Tables.schema(a::Annotations) = Tables.schema(getfield(a, :_columns))
Tables.materializer(a::Annotations) = Tables.materializer(getfield(a, :_columns))

# Row access.
Tables.rowaccess(a::Annotations) = Tables.rowaccess(getfield(a, :_columns))
function Tables.rows(a::Annotations{V}) where {V}
    inner_rows = Tables.rows(getfield(a, :_columns))
    return (Annotation{V}(row) for row in inner_rows)
end

# Column access.
Tables.columnaccess(a::Annotations) = Tables.columnaccess(getfield(a, :_columns))
Tables.columns(a::Annotations) = Tables.columns(getfield(a, :_columns))
Tables.columnnames(a::Annotations) = Tables.columnnames(getfield(a, :_columns))
Tables.getcolumn(a::Annotations, i::Int) = Tables.getcolumn(getfield(a, :_columns), i)
Tables.getcolumn(a::Annotations, nm::Symbol) = Tables.getcolumn(getfield(a, :_columns), nm)
function Tables.getcolumn(a::Annotations, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(a, :_columns), T, i, nm)
end
#####
##### by_recording
#####
# Group signals and annotations by recording UUID, driven by the signals table.
#
# Returns a `Dict` from recording UUID to `(signals, annotations)`, where `signals` maps
# signal type to `Signal` and `annotations` maps annotation UUID to `Annotation{V}`.
# Annotations whose recording has no signal are skipped.
function by_recording(signals::Signals, annotations::Annotations{V}) where {V}
    SignalsByType = Dict{String,Signal}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    Entry = NamedTuple{(:signals, :annotations),Tuple{SignalsByType,AnnotationsByUUID}}
    grouped = Dict{UUID,Entry}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            (signals = SignalsByType(), annotations = AnnotationsByUUID())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
# Group annotations and signals by recording UUID, driven by the annotations table.
#
# Returns a `Dict` from recording UUID to `(annotations, signals)`, where `annotations`
# maps annotation UUID to `Annotation{V}` and `signals` maps signal type to `Signal`.
# Signals whose recording has no annotation are skipped.
function by_recording(annotations::Annotations{V}, signals::Signals) where {V}
    AnnotationsByUUID = Dict{UUID,Annotation{V}}
    SignalsByType = Dict{String,Signal}
    Entry = NamedTuple{(:annotations, :signals),Tuple{AnnotationsByUUID,SignalsByType}}
    grouped = Dict{UUID,Entry}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            (annotations = AnnotationsByUUID(), signals = SignalsByType())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
# Group signals by recording UUID; each recording maps signal type to `Signal`.
# A later row with the same recording and type replaces the earlier one.
function by_recording(signals::Signals)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_type = get!(grouped, sig.recording_uuid) do
            Dict{String,Signal}()
        end
        per_type[sig.type] = sig
    end
    return grouped
end
# Group annotations by recording UUID; each recording maps annotation UUID to
# `Annotation{V}`.
function by_recording(annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_uuid = get!(grouped, ann.recording_uuid) do
            Dict{UUID,Annotation{V}}()
        end
        per_uuid[ann.uuid] = ann
    end
    return grouped
end
#####
##### conversion
#####
using MsgPack
using Onda: Onda
# Convert an old-format dataset stored at `dataset_path` into a `(Signals, Annotations)`
# pair.
#
# Reads and decompresses `recordings.msgpack.zst` under `dataset_path`, requires the
# header's `onda_format_version` to satisfy `0.3 <= version < 0.5` (otherwise `error`s),
# and turns every signal and annotation of every recording into a `Signal` /
# `Annotation` row. `uuid_from_annotation` is called with each raw annotation to obtain
# that annotation's UUID; by default a fresh random UUID is generated.
#
# NOTE(review): the shapes of the unpacked header/recordings and the `Onda.samples_path`
# signature are taken as-is from the calls below — confirm against the Onda/MsgPack
# versions in use.
function convert_old_dataset(dataset_path, uuid_from_annotation = _ -> uuid4())
    compressed = read(joinpath(dataset_path, "recordings.msgpack.zst"))
    raw_header, raw_recordings = MsgPack.unpack(Onda.zstd_decompress(compressed))
    format_version = VersionNumber(raw_header["onda_format_version"])
    if !(v"0.3" <= format_version < v"0.5")
        error("unexpected dataset version: $(raw_header["onda_format_version"])")
    end
    signals = Signal[]
    annotations = Annotation{String}[]
    for (uuid_string, recording) in raw_recordings
        recording_uuid = UUID(uuid_string)
        for (type, signal) in recording["signals"]
            file_path = Onda.samples_path(dataset_path, recording_uuid, type, signal["file_extension"])
            converted = Signal(; recording_uuid, type, file_path,
                               file_metadata=signal["file_options"],
                               channel_names=signal["channel_names"],
                               start_nanosecond=signal["start_nanosecond"],
                               stop_nanosecond=signal["stop_nanosecond"],
                               sample_unit=signal["sample_unit"],
                               sample_resolution_in_unit=signal["sample_resolution_in_unit"],
                               sample_offset_in_unit=signal["sample_offset_in_unit"],
                               sample_type=signal["sample_type"],
                               sample_rate=signal["sample_rate"])
            push!(signals, converted)
        end
        for ann in recording["annotations"]
            ann_uuid = uuid_from_annotation(ann)
            converted = Annotation(; recording_uuid, uuid=ann_uuid,
                                   start_nanosecond=ann["start_nanosecond"],
                                   stop_nanosecond=ann["stop_nanosecond"],
                                   value=ann["value"])
            push!(annotations, converted)
        end
    end
    return Signals(Tables.columntable(signals)), Annotations(Tables.columntable(annotations))
end
@ericphanson
Copy link

I might be missing something obvious, but the methods here https://gist.github.com/jrevels/782acb6b25f71f14a8cee0a3dae85079#file-onda-arrow-flat-multi-table-approach-jl-L27-L32 seem to be referring to the Signal struct with a single _row field defined later in the file, not the Signal struct with a bunch of fields defined right above, right?

@jrevels
Copy link
Author

jrevels commented Jan 4, 2021

I might be missing something obvious, but the methods here https://gist.github.com/jrevels/782acb6b25f71f14a8cee0a3dae85079#file-onda-arrow-flat-multi-table-approach-jl-L27-L32 seem to be referring to the Signal struct with a single _row field defined later in the file, not the Signal struct with a bunch of fields defined right above, right?

Yeah.

This is just a dumping ground/playground for some ideas (mainly for myself to ensure that certain things were reasonable performance-wise)

In general I wouldn't even spend too much time looking at this at this point - I'd much rather get eyes on beacon-biosignals/Onda.jl#59

@ericphanson
Copy link

Ok, sounds good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment