Skip to content

Instantly share code, notes, and snippets.

@pukpr
Created October 24, 2025 01:15
Show Gist options
  • Select an option

  • Save pukpr/01e95f5f98818080fac806db98898194 to your computer and use it in GitHub Desktop.

Select an option

Save pukpr/01e95f5f98818080fac806db98898194 to your computer and use it in GitHub Desktop.
Descent optimized LTE annual time-series, Julia
{"PeriodsPhase":[13.599781479999999,7.510001926736354,18.679732370000025,7.195787246118572,20.998850680000004,9.392972953999996,6.001334823,10.345732619999998,9.26752208,6.276528506318169,6.158543069000001,15.797059490000018,5.047887753499271,7.347247053426061,1.7632983608814414,6.514613283064518,7.845674634],"Hold":0.0015847482104409999,"Initial":0.019102685632566375,"Imp_Stride":2,"LTE_Freq":182.73199150000377,"Periods":[27.2122,27.3216,27.5545,13.63339513,13.69114014,13.6061,13.6608,13.71877789,6795.985773,1616.215172,2120.513853,13.77725,3232.430344,9.132931547,9.108450374,9.120674533,27.0926041],"LTE_Amp":1.2282362293246383,"DeltaTime":0.0,"Imp_Amp":38.41771511999972,"AliasedPhase":[12.936492162500015,10.819234671777158,17.28825828537955,9.83624739277385,24.275678314901782,15.228945466500011,8.477116474739738,14.315824278046344,9.824004490952355,6.960279582531864,4.2964023424706665,19.297607654892147,3.967560759570126,22.113385778146025,8.444613035937136,4.885017823159738,10.475305999774953,6.494317291038417,3.0881620393847937],"PeriodsAmp":[0.17714231540222516,0.08189697928004706,0.13238039696161463,0.04706265577458111,-0.14482368970122333,0.08780151439705092,0.11616705001314438,0.20049841467787538,0.03202143578523913,0.0574208142173757,0.18107078047142022,0.09022998496453707,0.02460468099520239,0.010025570705731726,0.06111584446427712,0.008852999851114348,-0.24656794210398358],"Year":365.2520198,"final_state":{"D_prev":0.04294},"Aliased":[0.422362756,0.368617497,0.255621397,0.209019747,0.322015847,0.155274488,0.262765007,0.375761106,0.05374526,0.225992198,0.172246939,0.488757205,0.112996099,0.00714361,0.10034691,0.04660165,0.481613596,2.0,1.0],"AliasedAmp":[-0.15405205352142462,0.06542305446896095,0.04551494783164873,-0.19193662470162862,0.13093727472374558,-0.3613449225821874,0.1299604812328014,0.09260812623852477,-0.5198863964938901,0.2748577231816317,-0.3717139997574524,0.0559454702262442,-0.21525846021194778,0.18071554533781803,-0.28786313953817655,-0.6000879094095063,-0.08536735549574857,-0.07770643619000436,0.14563311110308497],"LTE_Phase":-2.2583129142239473}
#!/usr/bin/env julia
# ts_lte.jl
#
# Julia translation of the provided Python time-series modeling script.
# Features:
# - read two-column CSV (time, value) with CSV.jl / DataFrames.jl
# - read/write JSON sidecar (filename.csv.p)
# - normalize RMS of observed series
# - stateful model_step_algorithm (impulses, sample-and-hold, LTE + aliases)
# - run_loop_time_series to iterate timestamps calling the step function
# - metrics: residuals, MSE, variance of squared errors, Pearson r
# - compute_metrics_region to exclude a middle time interval (for CV)
# - a simple_descent optimizer that scans/discrete-samples Imp_Stride and
# otherwise tries additive steps, switching to relative steps when no
# acceptance occurs (keeps behavior similar to the Python original)
#
# Dependencies (add with `import Pkg; Pkg.add("Package")` if missing):
# CSV, DataFrames, JSON, Statistics, Random, Plots (optional for --plot)
#
# Usage (similar to Python script):
# julia ts_lte.jl data.csv [--out fitted.csv] [--plot] [--low 0.0] [--high 3000.0] [--cc]
#
using CSV
using DataFrames
using JSON
using Statistics
using Random
using LinearAlgebra
using Plots
const TWOPI = 2π
const MONITORED = [
"Imp_Stride",
"DeltaTime",
"Initial",
"Year",
"Imp_Amp",
"PeriodsAmp",
"PeriodsPhase",
"AliasedAmp",
"AliasedPhase",
"Hold",
"LTE_Amp",
"LTE_Freq",
"LTE_Phase",
]
_is_list_param(name::AbstractString) = name in (
"Periods", "PeriodsAmp", "PeriodsPhase",
"Aliased", "AliasedAmp", "AliasedPhase"
)
# ---------------- IO / helpers ----------------
function read_two_column_csv(path::AbstractString)
# CSV.jl auto-detects commas; it will also handle whitespace-separated if tidy.
# We assume the first two columns are time and value.
df = CSV.read(path, DataFrame)
if ncol(df) < 2
error("Input CSV must have at least two columns (time, value).")
end
t = Float64.(df[:, 1])
y = Float64.(df[:, 2])
# remove missing (NaN) entries
mask = .!ismissing.(t) .& .!ismissing.(y)
t = collect(skipmissing(t))[mask] # ensure plain Vector{Float64}
y = collect(skipmissing(y))[mask]
return t, y
end
function read_json_p(path::AbstractString)
if !isfile(path)
return Dict{String,Any}()
end
open(path, "r") do io
return JSON.parse(String(read(io)))
end
end
function write_json_p(path::AbstractString, data::Dict{String,Any})
open(path, "w") do io
JSON.print(io, data) # ; indent=2)
end
end
clone_series(v::AbstractVector{T}) where T = copy(v)
function normalize_rms_y(y::AbstractVector{<:Real})
yv = Float64.(y)
rms = sqrt(mean(vcat(yv...) .^ 2))
if rms == 0.0
return yv
end
return yv ./ rms
end
# ---------------- Model-step implementation ----------------
"""
model_step_algorithm(i, t_i, clone_i, state, params...)
Implements:
- impulse mask every Imp_Stride rows
- C_t = impulse * Imp_Amp * sum_k PeriodsAmp[k] * sin(2π * t * Year / Period_k + PeriodsPhase[k])
- D_t = (1-Hold)*D_prev + Hold*C_t (stateful; initialize state["D_prev"] = Initial)
- E_t = LTE_Amp * sin(D_t * LTE_Freq + LTE_Phase) + alias_sum
"""
function model_step_algorithm(i::Int, t_i::Float64, clone_i::Float64, state::Dict{String,Any},
Initial::Float64,
Year::Float64,
Imp_Stride::Int,
Imp_Amp::Float64,
Periods::Vector{Float64},
PeriodsAmp::Vector{Float64},
PeriodsPhase::Vector{Float64},
Aliased::Vector{Float64},
AliasedAmp::Vector{Float64},
AliasedPhase::Vector{Float64},
Hold::Float64,
LTE_Amp::Float64,
LTE_Freq::Float64,
LTE_Phase::Float64)
# compute sum of sinusoids (Periods)
ssum = 0.0
for (amp, per, ph) in zip(PeriodsAmp, Periods, PeriodsPhase)
perf = float(per)
if perf == 0.0
continue
end
ssum += amp * sin(TWOPI * t_i * Year / perf + ph)
end
# Note: Python used row_idx = i + 2 and checked (row_idx % 12 == Imp_Stride).
# Preserve that behavior here for parity.
row_idx = i + 2
impulse = (mod(row_idx, 12) == Imp_Stride) ? 1.0 : 0.0
C_t = impulse * Imp_Amp * ssum
# stateful D
if !haskey(state, "D_prev")
state["D_prev"] = Initial
end
D_prev = float(state["D_prev"])
D_t = (1.0 - Hold) * D_prev + Hold * C_t
state["D_prev"] = D_t
# alias sum
ssum2 = 0.0
for (amp, freq, ph) in zip(AliasedAmp, Aliased, AliasedPhase)
ssum2 += amp * sin(TWOPI * t_i * freq + ph)
end
E_t = LTE_Amp * sin(D_t * LTE_Freq + LTE_Phase) + ssum2
return float(E_t), state
end
# ---------------- Runner ----------------
function run_loop_time_series(time::AbstractVector{<:Real},
observed::AbstractVector{<:Real},
model_step_fn::Function,
params::Dict{String,Any})
# extract/normalize params with defaults
DeltaTime = get(params, "DeltaTime", 0.0)
Initial = get(params, "Initial", 0.04294)
Year = get(params, "Year", 365.242)
Imp_Stride = Int(get(params, "Imp_Stride", 12))
Imp_Amp = get(params, "Imp_Amp", 1.0)
Periods = Vector{Float64}(get(params, "Periods", [1.0]))
PeriodsAmp = Vector{Float64}(get(params, "PeriodsAmp", fill(1.0, length(Periods))))
PeriodsPhase = Vector{Float64}(get(params, "PeriodsPhase", fill(0.0, length(Periods))))
Aliased = Vector{Float64}(get(params, "Aliased", [1.0]))
AliasedAmp = Vector{Float64}(get(params, "AliasedAmp", fill(1.0, length(Aliased))))
AliasedPhase = Vector{Float64}(get(params, "AliasedPhase", fill(0.0, length(Aliased))))
Hold = get(params, "Hold", 0.5)
LTE_Amp = get(params, "LTE_Amp", 1.0)
LTE_Freq = get(params, "LTE_Freq", 1.0)
LTE_Phase = get(params, "LTE_Phase", 0.0)
N = length(time)
clone = clone_series(observed)
state = Dict{String,Any}()
model = zeros(Float64, N)
for i in 1:N
# Python used t_i = time[i] + DeltaTime * 1000.0
t_i = float(time[i]) + float(DeltaTime) * 1000.0
clone_i = float(clone[i])
v, state = model_step_fn(i-1, t_i, clone_i, state,
float(Initial), float(Year), Int(Imp_Stride), float(Imp_Amp),
Periods, PeriodsAmp, PeriodsPhase,
Aliased, AliasedAmp, AliasedPhase,
float(Hold), float(LTE_Amp), float(LTE_Freq), float(LTE_Phase))
model[i] = v
end
return model, state
end
# ---------------- Metrics ----------------
function compute_metrics(observed::AbstractVector{<:Real}, model::AbstractVector{<:Real})
residuals = model .- observed
mse = mean(residuals .^ 2)
var_sq_err = var(residuals .^ 2)
# Pearson r
pearson_r = try
cor(observed, model)
catch
NaN
end
pearson_p = NaN # p-value not computed here
return Dict(
"residuals" => residuals,
"mse" => float(mse),
"var_squared_error" => float(var_sq_err),
"pearson_r" => float(pearson_r),
"pearson_p" => pearson_p
)
end
function compute_metrics_scalar(observed::AbstractVector{<:Real}, model::AbstractVector{<:Real})
residuals = model .- observed
mse = mean(residuals .^ 2)
return float(mse)
end
function compute_metrics_region(time::AbstractVector{<:Real},
observed::AbstractVector{<:Real},
model::AbstractVector{<:Real},
Time_Low::Union{Nothing,Float64}=nothing,
Time_High::Union{Nothing,Float64}=nothing;
use_pearson::Bool=false)
# build include mask (true => include in metric)
t = Float64.(time)
if Time_Low === nothing && Time_High === nothing
mask = trues(length(t))
elseif Time_Low === nothing
mask = t .> float(Time_High)
elseif Time_High === nothing
mask = t .< float(Time_Low)
else
tl = float(Time_Low); th = float(Time_High)
if tl > th
tl, th = th, tl
end
mask = (t .< tl) .| (t .> th)
end
obs_sel = observed[mask]
mod_sel = model[mask]
if length(obs_sel) == 0
return NaN
end
if use_pearson
r = try
cor(obs_sel, mod_sel)
catch
NaN
end
return 1.0 - float(r)
else
return float(mean((mod_sel .- obs_sel) .^ 2))
end
end
# ---------------- Simple descent optimizer (abs -> rel switching) ----------------
function simple_descent(time, observed, model_step_fn::Function, params::Dict{String,Any};
metric_fn::Function=compute_metrics_region,
step::Float64=0.01,
max_iters::Int=100,
tol::Float64=1e-12,
verbose::Bool=false,
Time_Low::Float64=1920.0,
Time_High::Float64=1950.0)
best_params = deepcopy(params)
# ensure list params are vectors of Float64
for name in MONITORED
if _is_list_param(name) && haskey(best_params, name)
best_params[name] = Float64.(best_params[name])
end
end
model_vals, final_state = run_loop_time_series(time, observed, model_step_fn, best_params)
best_metric = float(metric_fn(time, observed, model_vals, Time_Low, Time_High))
history = Vector{Dict{String,Any}}()
if verbose
@info "[simple_descent] start metric=$(best_metric)"
end
switched_to_rel = false
for iteration in 1:max_iters
any_accepted = false
mode = switched_to_rel ? "rel" : "abs"
if verbose
@info "[simple_descent] iteration $iteration mode=$mode best_metric=$(best_metric)"
end
for name in MONITORED
if name == "Imp_Stride"
max_val = 12
trials_per_iter = 4
population = collect(0:max_val)
k = min(trials_per_iter, length(population))
idxs = randperm(length(population))[1:k]
sampled = population[idxs]
for cand in sampled
if Int(get(best_params, name, 0)) == cand
continue
end
candidate_params = deepcopy(best_params)
candidate_params[name] = cand
mvals_cand, _ = run_loop_time_series(time, observed, model_step_fn, candidate_params)
metric_cand = float(metric_fn(time, observed, mvals_cand, Time_Low, Time_High))
accepted = metric_cand + tol < best_metric
push!(history, Dict("param"=>name, "index"=>nothing, "candidate"=>cand,
"metric"=>metric_cand, "accepted"=>accepted, "method"=>"discrete_random"))
if accepted
best_params = candidate_params
best_metric = metric_cand
model_vals = mvals_cand
any_accepted = true
if verbose
@info "[stride] accepted $name = $cand -> metric=$(best_metric)"
end
end
end
continue
end
if _is_list_param(name)
if !haskey(best_params, name)
continue
end
lst = Float64.(best_params[name])
for idx in 1:length(lst)
cur = lst[idx]
for sign in (1.0, -1.0)
if !switched_to_rel
candidate = cur + sign * step
else
if abs(cur) > 0.0
candidate = cur * (1.0 + sign * (step/5.0))
else
candidate = cur + sign * step
end
end
candidate_params = deepcopy(best_params)
candidate_params[name] = copy(lst)
candidate_params[name][idx] = candidate
mvals_cand, _ = run_loop_time_series(time, observed, model_step_fn, candidate_params)
metric_cand = float(metric_fn(time, observed, mvals_cand, Time_Low, Time_High))
accepted = metric_cand + tol < best_metric
push!(history, Dict("param"=>name, "index"=>idx, "candidate"=>candidate,
"metric"=>metric_cand, "accepted"=>accepted))
if accepted
best_params = candidate_params
best_metric = metric_cand
model_vals = mvals_cand
any_accepted = true
if verbose
@info "[simple_descent] accepted $name[$idx] = $(candidate) -> metric=$(best_metric)"
end
break
end
end
end
else
if !haskey(best_params, name)
continue
end
cur = float(best_params[name])
for sign in (1.0, -1.0)
if !switched_to_rel
candidate = cur + sign * step
else
if abs(cur) > 0.0
candidate = cur * (1.0 + sign * (step/5.0))
else
candidate = cur + sign * step
end
end
candidate_params = deepcopy(best_params)
candidate_params[name] = candidate
mvals_cand, _ = run_loop_time_series(time, observed, model_step_fn, candidate_params)
metric_cand = float(metric_fn(time, observed, mvals_cand, Time_Low, Time_High))
push!(history, Dict("param"=>name, "index"=>nothing, "candidate"=>candidate,
"metric"=>metric_cand, "accepted"=>(metric_cand + tol < best_metric)))
if metric_cand + tol < best_metric
best_params = candidate_params
best_metric = metric_cand
model_vals = mvals_cand
any_accepted = true
if verbose
@info "[simple_descent] accepted $name = $(candidate) -> metric=$(best_metric)"
end
break
end
end
end
end
if !any_accepted
if !switched_to_rel
switched_to_rel = true
if verbose
@info "[simple_descent] no acceptance with absolute steps; switching to relative steps"
end
continue
else
if verbose
@info "[simple_descent] no acceptance in iteration $iteration; stopping early"
end
return Dict(
"best_params" => best_params,
"best_metric" => best_metric,
"best_model" => model_vals,
"best_state" => final_state,
"history" => history,
"iterations" => iteration
)
end
end
end
if verbose
@info "[simple_descent] reached max_iters=$max_iters, best_metric=$(best_metric)"
end
return Dict(
"best_params" => best_params,
"best_metric" => best_metric,
"best_model" => model_vals,
"best_state" => final_state,
"history" => history,
"iterations" => max_iters
)
end
# ---------------- CLI / main ----------------
function parse_args()
# very small argument parser: positional csv, optional flags --out, --plot, --low, --high, --cc
args = Dict{String,Any}()
ARGS_copy = copy(ARGS)
if length(ARGS_copy) == 0
println("Usage: julia ts_lte.jl data.csv [--out fitted.csv] [--plot] [--low 0.0] [--high 3000.0] [--cc]")
exit(1)
end
args["csv"] = ARGS_copy[1]
i = 2
args["out"] = "fitted_out.csv"
args["plot"] = false
args["low"] = 0.0
args["high"] = 3000.0
args["cc"] = false
while i <= length(ARGS_copy)
a = ARGS_copy[i]
if a == "--out" && i+1 <= length(ARGS_copy)
args["out"] = ARGS_copy[i+1]; i += 2; continue
elseif a == "--plot"
args["plot"] = true; i += 1; continue
elseif a == "--low" && i+1 <= length(ARGS_copy)
args["low"] = parse(Float64, ARGS_copy[i+1]); i += 2; continue
elseif a == "--high" && i+1 <= length(ARGS_copy)
args["high"] = parse(Float64, ARGS_copy[i+1]); i += 2; continue
elseif a == "--cc"
args["cc"] = true; i += 1; continue
else
i += 1
end
end
return args
end
function main()
args = parse_args()
csvpath = args["csv"]
outpath = args["out"]
do_plot = args["plot"]
Low = args["low"]
High = args["high"]
use_cc = args["cc"]
time, y = read_two_column_csv(csvpath)
if length(time) == 0
error("No data read from CSV.")
end
y = normalize_rms_y(y)
json_path = csvpath * ".p"
data_dict = read_json_p(json_path)
# ensure Dict{String,Any}
params = Dict{String,Any}(data_dict)
model_fn = model_step_algorithm
result = simple_descent(time, y, model_fn, params;
metric_fn=compute_metrics_region,
step=0.05, max_iters=400, tol=1e-12, verbose=true,
Time_Low=Low, Time_High=High)
if haskey(result, "best_params")
params = result["best_params"]
end
model_vals, final_state = run_loop_time_series(time, y, model_fn, params)
metrics = compute_metrics(y, model_vals)
# build output DataFrame and write CSV
df_out = DataFrame(time = time, observed = y, model = model_vals)
CSV.write(outpath, df_out)
# save updated params to JSON sidecar
write_json_p(json_path, params)
# print summary
println("Processed $(length(time)) samples from: $csvpath")
println("Output CSV: $outpath")
println("Updated JSON data file: $json_path")
println("Metrics:")
println(" MSE = $(metrics["mse"])")
println(" Variance of squared errors = $(metrics["var_squared_error"])")
println(" Pearson r = $(metrics["pearson_r"]) (p-value not computed)")
mask = (time .> Low) .& (time .< High)
p = plot()
plot!(p, time, y, label="observed", lw=1)
plot!(p, time, model_vals, label="model", lw=1, color=:red)
if any(mask)
plot!(p, time[mask], zeros(sum(mask)), label="cross-validation", lw=3, linestyle=:dash, color=:black)
end
xlabel!("Year")
ylabel!("val")
title!(p, "Model: $(basename(csvpath)) Pearson r=$(round(metrics["pearson_r"], sigdigits=4)) MSE=$(round(metrics["mse"], sigdigits=4))")
if do_plot
gui(p)
println("enter to close")
readline()
else
filename = string(args["csv"], "-", args["low"], "-", args["high"], ".png")
savefig(p, filename)
end
end
if abspath(PROGRAM_FILE) == @__FILE__
main()
end
@pukpr
Copy link
Author

pukpr commented Oct 24, 2025

julia ts_lte.jl 144d.dat --low 1930 --high 1960

eog 144d.dat-1930.0-1960.0.png

144d dat-1930 0-1960 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment