Skip to content

Instantly share code, notes, and snippets.

@comzyh
Last active January 28, 2016 18:41
Show Gist options
  • Save comzyh/4a4fe855392a9a65c552 to your computer and use it in GitHub Desktop.
Save comzyh/4a4fe855392a9a65c552 to your computer and use it in GitHub Desktop.
ProjectOxford-video.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Comzyh
# @Date: 2016-01-29 01:12:22
# @Last Modified by: Comzyh
# @Last Modified time: 2016-01-29 02:29:15
import requests
import json
import time
# from IPython import start_ipython
class VideoAPI(object):
"""docstring for VideoAPI"""
def __init__(self, sub_key):
super(VideoAPI, self).__init__()
# Ocp-Apim-Subscription-Key
self.s = requests.session()
self.sub_key = sub_key
self.api_url = 'https://api.projectoxford.ai/video/v1.0/stabilize'
def send_url_request(self, url):
return self.s.post(url=self.api_url,
headers={
'Ocp-Apim-Subscription-Key': self.sub_key,
'Content-Type': 'application/json'},
data=json.dumps({'Url': url}))
def send_file_request(self, file_path):
video_file = open(file_path, 'rb')
return self.s.post(url=self.api_url,
headers={
'Ocp-Apim-Subscription-Key': self.sub_key,
'Content-Type': 'application/octet-stream'},
data=video_file.read())
def wait_for_succees(self, op_url):
"""
op_url: operation-location
"""
succeed = False
while not succeed:
resp = self.s.get(
op_url, headers={'Ocp-Apim-Subscription-Key': self.sub_key}).json()
if resp['status'] == 'Succeeded':
return resp['resourceLocation']
print('Not finish yet, retry after 20 seconds')
print resp
time.sleep(20)
def api_task(self, url=None, file_path=None):
if url:
resp = self.send_url_request(url=url)
elif file_path:
resp = self.send_file_request(file_path=file_path)
else:
return
if resp.status_code != 202:
print('send_video failed, status_code = %s' % resp.status_code)
print(resp.text)
return
op_url = resp.headers['operation-location']
print('Got op_url : %s' % op_url)
rl_url = self.wait_for_succees(op_url) # resourceLocation
print('Got rl_url : %s' % rl_url)
with open('video_out.mp4', 'wb') as video_out:
video_out.write(self.s.get(rl_url, headers={
'Ocp-Apim-Subscription-Key': self.sub_key}).content)
print('Task Finished')
def main():
import argparse
parser = argparse.ArgumentParser(description="some information here")
parser.add_argument("-u", "--url", type=str,
help="video url e.g. http://x.y/z.avi")
parser.add_argument("-f", "--file", type=str,
help="video path e.g. /home/a.avi")
parser.add_argument("-k", "--key", type=str,
help="Ocp-Apim-Subscription-Key", required=True)
args = parser.parse_args()
api = VideoAPI(sub_key=args.key)
api.api_task(url=args.url, file_path=args.file)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment