Skip to content

Instantly share code, notes, and snippets.

@zhehaowang
Created October 26, 2015 05:05
Show Gist options
  • Save zhehaowang/91887426f38f891109b4 to your computer and use it in GitHub Desktop.
Save zhehaowang/91887426f38f891109b4 to your computer and use it in GitHub Desktop.
Sample EDL parser and timed NDN publisher for FoS NP
import re
import json
import time
import getopt
import sys
import logging
import random
from pyndn import Name, Data, Interest, Exclude, KeyLocator
from pyndn.threadsafe_face import ThreadsafeFace
from pyndn.security import KeyChain
from pyndn.security.identity.file_private_key_storage import FilePrivateKeyStorage
from pyndn.security.identity.basic_identity_storage import BasicIdentityStorage
from pyndn.security.identity.identity_manager import IdentityManager
from pyndn.security.policy.config_policy_manager import ConfigPolicyManager
from pyndn.util.common import Common
from pyndn.util import MemoryContentCache, Blob
try:
import asyncio
except ImportError:
import trollius as asyncio
class NaiveEDLParserAndPublisher(object):
    """
    Parse the events of an EDL file and publish each one as signed NDN data.

    Parsed events are kept in ``self._events`` (event ID -> dict of string
    fields). ``startPublishing`` adds one data packet per event to a
    MemoryContentCache registered under ``/test/edl``.
    """

    # Subtracted from the seconds field of an event's ``dst_start_time`` to get
    # the delay, measured from the start of publishing, before it is published.
    # NOTE(review): only the seconds field is used; the hours, minutes and
    # frames fields are ignored - confirm this is intended.
    _PUBLISH_LEAD_SECONDS = 3

    def __init__(self):
        self.prepareLogging()

        self._loop = asyncio.get_event_loop()
        self._face = ThreadsafeFace(self._loop)

        # Use the system default key chain and certificate name to sign commands.
        self._keyChain = KeyChain()
        self._keyChain.setFace(self._face)
        self._certificateName = self._keyChain.getDefaultCertificateName()
        self._face.setCommandSigningInfo(self._keyChain, self._certificateName)

        self._memoryContentCache = MemoryContentCache(self._face)
        self._events = dict()
        self._running = False

    def parse(self, fileName):
        """
        Read an EDL file and store its events in ``self._events``.

        A line starting with whitespace (e.g. a blank line) announces an
        event: the next line whose first field is an integer is taken as the
        event line. Lines following an event line, up to the next line that
        starts with whitespace, are collected in that event's ``payload`` list.

        Event line fields, reference: http://www.edlmax.com/maxguide.html
        ``<id> <reel> <channel> <trans> [<extra>] <src in> <src out> <dst in> <dst out>``
        The optional extra field is assumed present when the line has more
        than 8 fields.

        :param str fileName: Path of the EDL file to read.
        """
        awaitingEvent = False
        lastEventID = -1
        with open(fileName, 'r') as edlFile:
            for line in edlFile:
                if awaitingEvent:
                    components = line.split()
                    if not components:
                        # Several blank lines in a row: keep waiting for the
                        # event line instead of indexing an empty list.
                        continue
                    try:
                        eventID = int(components[0])
                    except ValueError:
                        print("Cannot cast " + components[0] + " to eventID")
                        continue

                    # With more than 8 fields there is one extra field between
                    # the transition and the four time codes; skip over it.
                    timeComponentsIdx = 5 if len(components) > 8 else 4
                    if len(components) < timeComponentsIdx + 4:
                        print("Too few fields in event line: " + line.strip())
                        continue

                    # Build the dict directly rather than formatting a JSON
                    # string, so quotes or backslashes in a field are kept
                    # verbatim instead of breaking the parse.
                    self._events[eventID] = {
                        "event_id": str(eventID),
                        "reel_name": components[1],
                        "channel": components[2],
                        "trans": components[3],
                        "src_start_time": components[timeComponentsIdx],
                        "src_end_time": components[timeComponentsIdx + 1],
                        "dst_start_time": components[timeComponentsIdx + 2],
                        "dst_end_time": components[timeComponentsIdx + 3],
                    }
                    awaitingEvent = False
                    lastEventID = eventID
                elif line == '' or re.match(r'\s', line) is not None:
                    awaitingEvent = True
                elif lastEventID > 0:
                    self._events[lastEventID].setdefault('payload', []).append(line)

    def startPublishing(self):
        """
        Publish all parsed events, in ascending event ID order, on a timer.

        Each event is published no earlier than its offset (see
        ``_publishOffset``) after this call. The waiting is scheduled on the
        event loop with ``call_later``, so the loop stays free to run other
        callbacks between publications.

        Does nothing when there are no events or publishing already started.

        TODO: This makes the assumption that dst_start_time are sequential and
        do not overlap, which does not seem to be our case, example #5 in
        sample edl.

        :return: A future that completes (with None) after the last event is
          published, or fails with the exception raised while publishing. It
          can be passed to ``loop.run_until_complete``.
        """
        finished = asyncio.Future(loop=self._loop)
        if not self._events or self._running:
            finished.set_result(None)
            return finished

        self._memoryContentCache.registerPrefix(
            Name('/test/edl'), self.onRegisterFailed, self.onDataNotFound)
        # Marked as running right away so a second call cannot register the
        # prefix and publish the same events again while this one is active.
        self._running = True

        startTime = self._loop.time()
        eventIDs = sorted(self._events)

        def publishNext(index):
            # Publish eventIDs[index] if it is due, otherwise re-schedule.
            if finished.done():
                return
            try:
                if index >= len(eventIDs):
                    finished.set_result(None)
                    return
                eventID = eventIDs[index]
                elapsed = self._loop.time() - startTime
                delay = self._publishOffset(eventID) - elapsed
                if delay > 0:
                    self._loop.call_later(delay, publishNext, index)
                    return
                self._publishEvent(eventID)
                # Go back through the loop between two publications.
                self._loop.call_soon(publishNext, index + 1)
            except Exception as error:
                finished.set_exception(error)

        self._loop.call_soon(publishNext, 0)
        return finished

    def _publishOffset(self, eventID):
        """Return the publish delay in seconds for the event, from its dst_start_time."""
        timeStrs = self._events[eventID]['dst_start_time'].split(':')
        return int(timeStrs[2]) - self._PUBLISH_LEAD_SECONDS

    def _publishEvent(self, eventID):
        """Sign the event's JSON as /test/edl/<eventID> and add it to the content cache."""
        data = Data(Name("/test/edl/" + str(eventID)))
        data.setContent(json.dumps(self._events[eventID]))
        data.getMetaInfo().setFreshnessPeriod(50000)
        self._keyChain.sign(data, self._certificateName)
        self._memoryContentCache.add(data)
        if __debug__:
            print('Added ' + data.getName().toUri())

    def onRegisterFailed(self, prefix):
        """Callback for a failed prefix registration; raises RuntimeError."""
        raise RuntimeError("Register failed for prefix", prefix.toUri())

    def onDataNotFound(self, prefix, interest, face, interestFilterId, filter):
        """Callback for an interest the content cache cannot answer; prints its name."""
        print('Data not found for interest: ' + interest.getName().toUri())

    #############################
    # Logging
    #############################
    def prepareLogging(self):
        """Create this object's logger and attach a console handler at INFO level."""
        self.log = logging.getLogger(str(self.__class__))
        self.log.setLevel(logging.DEBUG)
        logFormat = "%(asctime)-15s %(name)-20s %(funcName)-20s (%(levelname)-8s):\n\t%(message)s"
        self._console = logging.StreamHandler()
        self._console.setFormatter(logging.Formatter(logFormat))
        self._console.setLevel(logging.INFO)
        # without this, a lot of ThreadsafeFace errors get swallowed up.
        # The event loop library logs under its own package name, so this
        # covers both "asyncio" and the "trollius" fallback.
        logging.getLogger(asyncio.__name__).addHandler(self._console)
        self.log.addHandler(self._console)

    def setLogLevel(self, level):
        """
        Set the log level that will be output to standard error
        :param level: A log level constant defined in the logging module (e.g. logging.INFO)
        """
        self._console.setLevel(level)

    def getLogger(self):
        """
        :return: The logger associated with this node
        :rtype: logging.Logger
        """
        return self.log
if __name__ == '__main__':
    # Parse the sample EDL, publish its events, then keep the event loop
    # alive so the published data stays available.
    publisher = NaiveEDLParserAndPublisher()
    publisher.parse('sequence-0-revised.edl')
    eventLoop = publisher._loop
    eventLoop.run_until_complete(publisher.startPublishing())
    eventLoop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment