etat-du-trafic-en-temps-reel dataset ingestion for Warp 10
const axios = require('axios'); | |
const dayjs = require('dayjs'); | |
const cron = require('node-cron'); | |
const token = 'WRITE TOKEN'; | |
const read_token = 'READ TOKEN'; | |
const warp10 = 'http://localhost:8080/api/v0'; | |
let nhits = Number.MAX_VALUE; | |
const pagination = 200; | |
function getRecords(currentRecord) { | |
console.log('currentRecord', Math.min(currentRecord, nhits), '/', nhits); | |
axios.get(`https://data.rennesmetropole.fr/api/records/1.0/search/?dataset=etat-du-trafic-en-temps-reel&rows=${pagination}&start=${currentRecord}`) | |
.then(response => { | |
nhits = response.data.nhits; | |
let updt = '' | |
let meta = {} | |
response.data.records.forEach(r => { | |
const date = dayjs(r.fields.datetime).valueOf() * 1000; | |
const loc = r.fields.geo_point_2d; | |
let mv = []; | |
if (r.fields.geo_shape) { | |
r.fields.geo_shape.coordinates[0].forEach((p, i) => { | |
mv.push([p[1], p[0]]); | |
}); | |
meta[r.fields.predefinedlocationreference] = { | |
denomination: encodeURIComponent(r.fields.denomination), | |
geo_shape: encodeURIComponent(r.fields.geo_shape ? JSON.stringify(mv) : '[]') | |
}; | |
} | |
if (loc) { | |
updt += `${date}/${loc[0]}:${loc[1]}/ fr.rennesmetropole.trafic{predefinedlocationreference=${r.fields.predefinedlocationreference}} [ "${r.fields.trafficstatus}" ${r.fields.averagevehiclespeed} ${r.fields.traveltime} ] | |
`; | |
} | |
}); | |
axios.post(warp10 + '/update', updt, { | |
headers: { | |
'X-Warp10-Token': token, | |
'Content-Type': 'text/plain' | |
} | |
}).then(() => { | |
const metaExec = ` | |
'${JSON.stringify(meta)}' | |
JSON-> 'meta' STORE | |
[ '${read_token}' 'fr.rennesmetropole.trafic' { } NOW -1 ] FETCH | |
<% | |
'gts' STORE | |
$gts LABELS 'predefinedlocationreference' GET 'id' STORE | |
$meta $id GET 'newAttr' STORE | |
<% $newAttr ISNULL NOT %> | |
<% | |
$gts ATTRIBUTES $newAttr APPEND 'attr' STORE | |
$gts $attr SETATTRIBUTES '${token}' META | |
%> IFT | |
%> FOREACH`; | |
axios.post(warp10 + '/exec', metaExec, { | |
headers: { | |
'X-Warp10-Token': token, | |
'Content-Type': 'text/plain' | |
} | |
}).then(() => { | |
if (currentRecord < nhits) { | |
getRecords(currentRecord + pagination); | |
} | |
}).catch((error) => { | |
console.error(error); | |
}); | |
}).catch(error => { | |
console.error(error); | |
}); | |
}).catch(error => { | |
console.error(error); | |
}); | |
} | |
cron.schedule('*/2 * * * *', () => { | |
getRecords(0); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment