|
import bodyParser from 'body-parser'; |
|
import express from 'express' |
|
import * as _ from 'lodash'; |
|
import * as IPFS from 'ipfs-client'; |
|
import Queue from 'promise-queue'; |
|
import xmlp from 'fast-xml-parser'; |
|
import { readFileSync } from 'fs'; |
|
|
|
const app = express() |
|
const port = 3000 |
|
const ttl = process.env.IPFS_TTL || "30s"; |
|
|
|
//need to refactor for /stream/:key/:prefix route. |
|
//still needed for old default routing. |
|
const ipfsPrefix = process.env.IPFS_PREFIX || "dashtest"; |
|
const ipnsKey = process.env.IPFS_KEY || "livestream"; |
|
const timeOffset = process.env.OFFSET || (45 * 1000); |
|
|
|
const ipfs = IPFS.create({ |
|
http: process.env.IPFS_API || '/ip4/127.0.0.1/tcp/5001' |
|
}); |
|
|
|
//for Windows, concurrent needs to be 6 instead of 10 |
|
//as that's all it seems to be able to handle. |
|
const queue = new Queue(10, Infinity); |
|
|
|
const m3u8Hits = {}; |
|
const chunkReps = {}; |
|
const playerDeployed = {}; |
|
const xmlc = { attributeNamePrefix: "$$", ignoreAttributes: false }; |
|
const XMLP = new xmlp.XMLParser(xmlc); |
|
const XMLB = new xmlp.XMLBuilder(xmlc); |
|
|
|
console.log("prefix", ipfsPrefix); |
|
|
|
function handleIncoming(req, res, next) { |
|
//ignore anything not put or post. |
|
if (!/(put|post)/i.test(req.method)) return next(); |
|
|
|
if(/\.mpd/i.test(req.path) && !!!playerDeployed[req.originalUrl]) { |
|
ipfs.files.write(`/${req.params?.ipfsPrefix || ipfsPrefix}/player.html`, readFileSync('./test-mpd.html'), { truncate: true, create: true, parents: true }); |
|
} |
|
|
|
//don't need to publish the mpd all the time. |
|
if (/\.(m3u8|mpd)/.test(req.path)) { |
|
m3u8Hits[req.path] || (m3u8Hits[req.path] = 0); |
|
m3u8Hits[req.path]++; |
|
if (m3u8Hits[req.path] % 5 == 0) return res.send({ skipped: 1 }); |
|
} |
|
|
|
//console.log("ACCEPTING", req.method, req.path); |
|
|
|
let [, repId] = req.path.match(/chunk-stream_([0-9]+)-/i) || [, 'other']; |
|
chunkReps[repId] || (chunkReps[repId] = [repId, 0]); |
|
chunkReps[repId][1]++; |
|
queue.add(() => new Promise((resol) => { |
|
console.log("PROCESSING", req.method, req.path); |
|
let text = req.body; |
|
//lie so we have enough time to upload and let IPNS do it's thing. |
|
if (/\.mpd/i.test(req.path)) { |
|
try { |
|
let pdoc = XMLP.parse(req.body.toString('utf8')); |
|
|
|
let newtimeAT = (new Date(Date.parse(pdoc.MPD.$$availabilityStartTime) + timeOffset)).toISOString(); |
|
console.log("TIME", pdoc.MPD.$$availabilityStartTime, newtimeAT) |
|
pdoc.MPD.$$availabilityStartTime = newtimeAT; |
|
|
|
let newtimePT = (new Date(Date.parse(pdoc.MPD.$$publishTime) + timeOffset)).toISOString(); |
|
pdoc.MPD.$$publishTime = newtimePT; |
|
|
|
let bdoc = XMLB.build(pdoc); |
|
|
|
//we need a better way! ...? |
|
let tmptext = bdoc.toString('utf8'); |
|
[1, 2, 3, 4].forEach(() => { |
|
tmptext = tmptext.replaceAll(/ ([a-z]+)(\/>|>| )/gi, " $1=\"true\"$2"); |
|
}); |
|
text = Buffer.from(tmptext, 'utf8'); |
|
|
|
} catch (ex) { |
|
console.error("!!!!!", ex); |
|
} |
|
|
|
} |
|
|
|
//write/overwrite the mpd files. |
|
ipfs.files.write(`/${req.params?.ipfsPrefix || ipfsPrefix}${req.path}`, text, { offset: 0, length: text.length, truncate: true, create: true, parents: true }).then(() => { |
|
console.log("COMPLETED", req.method, req.path, queue.getQueueLength(), queue.getPendingLength()); |
|
res.send({ done: 1, length: req.body.length }); |
|
|
|
chunkReps[repId][1]--; |
|
//update the ipns pointer. |
|
//probably unnecessary to be done every time now that we lie about the start. |
|
return ipfs.files.stat(`/${req.params?.ipfsPrefix || ipfsPrefix}`, { hash: 1 }).then(o => ipfs.name.publish(o.cid, { key: req.params?.ipnsKey || ipnsKey, ttl, lifetime: ttl }), console.error.bind(null, "fstat111")).catch(console.error.bind(null, "pub111")); |
|
}, (err) => res.send({ done: 2 }) && console.error(req.method, req.path, err)).finally(resol); |
|
})) |
|
} |
|
|
|
//untested |
|
app.use("/stream/:ipnsKey/:ipfsPrefix", function (req, res) { |
|
if (!/(put|post)/i.test(req.method)) |
|
return res.send({ hi: "world", path: req.path, key: req.params?.IPFS_KEY, prefix: req.params?.ipfsPrefix, url: req.url, ourl: req.originalUrl}); |
|
next(); |
|
}, bodyParser.raw({ limit: '50mb', type: () => true }), handleIncoming); |
|
|
|
app.use(bodyParser.raw({ limit: '50mb', type: () => true }), handleIncoming); |
|
|
|
app.listen(port, () => { |
|
console.log(`Example app listening on port ${port}`) |
|
}) |