Skip to content

Instantly share code, notes, and snippets.

/dshow.py

Created Aug 31, 2012
Embed
What would you like to do?
directshow capture filter chain (python)
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from comtypes import client, GUID
import interfaces as dsifs
from uuids import ids, rids
qedit = client.GetModule("qedit.dll")
quartz = client.GetModule("quartz.dll")
MFVideoARMode_PreservePicture = 0x1
# Helper functions.
def get_name(byte_array):
name = "".join(map(chr, byte_array))
try:
name = name[:name.index('\x00')]
except ValueError:
pass
return name
def get_pin(afilter, pin_name):
pin_enum = afilter.EnumPins()
while True:
cur_pin, fetched = pin_enum.Next(1)
if fetched:
pin_info = cur_pin.QueryPinInfo()
cur_pin_name = get_name(pin_info.achName)
if pin_name in cur_pin_name:
return cur_pin
else:
cur_pin.Release()
break
print("Pin '%s' not found!" % pin_name)
return None
def build_graph():
graph = client.CreateObject(ids.CLSID_FilterGraph,
interface=qedit.IFilterGraph)
# Provided with a graph, create a builder
builder = graph.QueryInterface(qedit.IGraphBuilder)
capture = client.CreateObject(ids.CLSID_CaptureGraphBuilder2,
interface=dsifs.ICaptureGraphBuilder2)
# Filter chain xbar -> tuner -> a/v decoders -> a/v renderers.
##
# Crossbar
# Fetch the first one and set it up to Composite input.
dev_enum_xbar = client.CreateObject(ids.CLSID_SystemDeviceEnum,
interface=dsifs.ICreateDevEnum)
xbar_enum = dev_enum_xbar.CreateClassEnumerator(
GUID(ids.CLSID_VideoCrossbarCategory),
dwFlags = 0)
xbar_enum.Reset()
xbar_moniker, success = xbar_enum.RemoteNext(1)
if not success:
raise IndexError("Couldn't find crossbar device.")
xbar_bind = xbar_moniker.RemoteBindToObject(None, None,
dsifs.IBaseFilter._iid_)
xbar_filter = xbar_bind.QueryInterface(dsifs.IBaseFilter)
# Setup
xbar = xbar_filter.QueryInterface(dsifs.IAMCrossbar)
outputs_count, inputs_count = xbar.get_PinCounts()
for i in range(0, inputs_count):
related_pin_idx, pin_type = xbar.get_CrossbarPinInfo(True, i)
if pin_type == dsifs.PhysConn_Video_Composite:
input_pin_index = i
related_input_pin_idx = related_pin_idx
break
for i in range(0, outputs_count):
related_pin_idx, pin_type = xbar.get_CrossbarPinInfo(False, i)
if pin_type == dsifs.PhysConn_Video_VideoDecoder:
output_pin_index = i
related_output_pin_idx = related_pin_idx
break
# Route video pins
if xbar.CanRoute(output_pin_index, input_pin_index) == 0:
if xbar.Route(output_pin_index, input_pin_index) == 0:
routed_to = xbar.get_IsRoutedTo(output_pin_index)
if routed_to != input_pin_index:
raise Exception("Couldn't route crossbar pins!.")
# Route related (audio) pins
if xbar.CanRoute(related_output_pin_idx, related_input_pin_idx) == 0:
if xbar.Route(related_output_pin_idx, related_input_pin_idx) == 0:
routed_to = xbar.get_IsRoutedTo(related_output_pin_idx)
if routed_to != related_input_pin_idx:
raise Exception("Couldn't route related crossbar pins!.")
graph.AddFilter(xbar_filter, "Crossbar")
##
# Capture (TV Tuner) device
# Fetch the first one and set it up to NTSC.
dev_enum_cap = client.CreateObject(ids.CLSID_SystemDeviceEnum,
interface=dsifs.ICreateDevEnum)
cap_enum = dev_enum_cap.CreateClassEnumerator(
GUID(ids.CLSID_VideoInputDeviceCategory),
dwFlags = 0)
cap_enum.Reset()
cap_moniker, success = cap_enum.RemoteNext(1)
if not success:
raise IndexError("Couldn't find capture device!.")
cap_bind = cap_moniker.RemoteBindToObject(None, None,
dsifs.IBaseFilter._iid_)
cap_filter = cap_bind.QueryInterface(interface=dsifs.IBaseFilter)
cap = cap_filter.QueryInterface(interface=dsifs.IAMAnalogVideoDecoder)
cap.put_TVFormat(dsifs.AnalogVideo_NTSC_M)
graph.AddFilter(cap_filter, "Video Capture")
##
# Connect crossbar to capture.
graph.ConnectDirect(get_pin(xbar_filter, "0: Video Decoder Out"),
get_pin(cap_filter, "Analog Video In"), None)
graph.ConnectDirect(get_pin(xbar_filter, "1: Audio Decoder Out"),
get_pin(cap_filter, "Analog Audio In"), None)
##
# ffdshow filters
# video
ffds_vidraw = client.CreateObject(ids.CLSID_FFDshowRawVideoFilter,
interface=dsifs.IBaseFilter)
graph.AddFilter(ffds_vidraw, "ffdshow raw video filter")
# audio
ffds_auddec = client.CreateObject(ids.CLSID_FFDshowAudioDecoder,
interface=dsifs.IBaseFilter)
graph.AddFilter(ffds_auddec, "ffdshow Audio Decoder")
# link to capture filters.
graph.ConnectDirect(get_pin(cap_filter, "Capture"),
get_pin(ffds_vidraw, "In"), None)
graph.ConnectDirect(get_pin(cap_filter, "Audio Out"),
get_pin(ffds_auddec, "In"), None)
##
# Renderers.
# audio
dsound_renderer = client.CreateObject(ids.CLSID_DSoundRender,
interface=dsifs.IBaseFilter)
graph.AddFilter(dsound_renderer, "Default DirectSound Device")
# video
enhanced_video = client.CreateObject(ids.CLSID_EnhancedVideoRenderer,
interface=dsifs.IBaseFilter)
graph.AddFilter(enhanced_video, "Enhanced Video Renderer")
# link to ffdshow filters.
graph.ConnectDirect(get_pin(ffds_vidraw, "Out"),
get_pin(enhanced_video, "EVR Input0"), None)
graph.ConnectDirect(get_pin(ffds_auddec, "Out"),
get_pin(dsound_renderer, "Audio Input pin (rendered)"), None)
# All done, return the EVR Video Display Control and Graph Media Control
evr_service = enhanced_video.QueryInterface(dsifs.IMFGetService)
vdc_service = evr_service.GetService(GUID(ids.MR_VIDEO_RENDER_SERVICE),
dsifs.IMFVideoDisplayControl._iid_)
evr_videocontrol = vdc_service.QueryInterface(dsifs.IMFVideoDisplayControl)
media_control = graph.QueryInterface(quartz.IMediaControl)
return (media_control, evr_videocontrol)
if __name__ == "__main__":
gr = client.CreateObject(ids.CLSID_FilterGraph,
interface=qedit.IFilterGraph)
print(build_graph(gr))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment