Skip to content

Instantly share code, notes, and snippets.

@cjcolvar
Created March 11, 2013 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjcolvar/5135903 to your computer and use it in GitHub Desktop.
Save cjcolvar/5135903 to your computer and use it in GitHub Desktop.
src/main/java/org/opencastproject/distribution/hls/HLSDistributionServiceImpl.java with debug logging on retract
/**
* Copyright 2009, 2010 The Regents of the University of California
* Licensed under the Educational Community License, Version 2.0
* (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.osedu.org/licenses/ECL-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/
package org.opencastproject.distribution.hls;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpHead;
import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DistributionService;
//import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.track.TrackImpl;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.TrustedHttpClient;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.FileSupport;
import org.opencastproject.util.MimeType;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.PathSupport;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.workspace.api.Workspace;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;;
/**
* Distributes media to the local media delivery directory.
*/
public class HLSDistributionServiceImpl extends AbstractJobProducer implements DistributionService/*, DownloadDistributionService*/ {
/** Logging facility */
private static final Logger logger = LoggerFactory.getLogger(HLSDistributionServiceImpl.class);
/** List of available operations on jobs */
private enum Operation {
Distribute, Retract
}
/** Receipt type */
public static final String JOB_TYPE = "org.opencastproject.distribution.hls";
/** Default distribution directory */
public static final String DEFAULT_DISTRIBUTION_DIR = "opencast" + File.separator + "static";
/** Timeout in millis for checking distributed file request */
private static final long TIMEOUT = 10000L;
/** Interval time in millis for checking distributed file request */
private static final long INTERVAL = 300L;
/** Path to the distribution directory */
protected File distributionDirectory = null;
/** this media hls service's base URL */
protected String serviceUrl = null;
/** The remote service registry */
protected ServiceRegistry serviceRegistry = null;
/** The workspace reference */
protected Workspace workspace = null;
/** The security service */
protected SecurityService securityService = null;
/** The user directory service */
protected UserDirectoryService userDirectoryService = null;
/** The organization directory service */
protected OrganizationDirectoryService organizationDirectoryService = null;
/** The trusted HTTP client */
private TrustedHttpClient trustedHttpClient;
/** the encoding engine */
private FFmpegHLSEncoderEngine engine;
/**
* Creates a new instance of the hls distribution service.
*/
public HLSDistributionServiceImpl() {
super(JOB_TYPE);
}
/**
* Activate method for this OSGi service implementation.
*
* @param cc
* the OSGi component context
*/
protected void activate(ComponentContext cc) {
engine = new FFmpegHLSEncoderEngine();
engine.activate(cc);
serviceUrl = cc.getBundleContext().getProperty("org.opencastproject.hls.url");
if (serviceUrl == null)
throw new IllegalStateException("HLS url must be set (org.opencastproject.hls.url)");
String ccDistributionDirectory = cc.getBundleContext().getProperty("org.opencastproject.hls.directory");
if (ccDistributionDirectory == null)
throw new IllegalStateException("Distribution directory must be set (org.opencastproject.hls.directory)");
this.distributionDirectory = new File(ccDistributionDirectory);
logger.info("HLS distribution directory is {}", distributionDirectory);
}
@Override
public Job distribute(MediaPackage mediapackage, String elementId) throws DistributionException, MediaPackageException {
return distribute(mediapackage, elementId, true);
}
public Job distribute(MediaPackage mediapackage, String elementId, boolean checkAvailability)
throws DistributionException, MediaPackageException {
if (mediapackage == null)
throw new MediaPackageException("Mediapackage must be specified");
if (elementId == null)
throw new MediaPackageException("Element ID must be specified");
try {
return serviceRegistry.createJob(JOB_TYPE,
Operation.Distribute.toString(),
Arrays.asList(MediaPackageParser.getAsXml(mediapackage),
elementId,
Boolean.toString(checkAvailability)));
} catch (ServiceRegistryException e) {
throw new DistributionException("Unable to create a job", e);
}
}
/**
* Distributes the mediapackage's element to the location that is returned by the concrete implementation. In
* addition, a representation of the distributed element is added to the mediapackage.
*
* @see org.opencastproject.distribution.api.DistributionService#distribute(org.opencastproject.mediapackage.MediaPackage,
* String)
* @throws org.opencastproject.distribution.api.DistributionException
* in case of an error
*/
protected MediaPackageElement distribute(Job job, MediaPackage mediapackage, String elementId, boolean checkAvailability)
throws DistributionException {
return distributeElement(mediapackage, elementId, checkAvailability);
}
/**
* Distribute a Mediapackage element to the hls distribution service.
*
* @param mediapackage
* The media package that contains the element to distribute.
* @param elementId
* The id of the element that should be distributed contained within the media package.
* @param checkAvailability
* Check the availability of the distributed element via http.
* @return A reference to the MediaPackageElement that has been distributed.
* @throws DistributionException
* Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
* cannot be copied or another unexpected exception occurs.
*/
public MediaPackageElement distributeElement(MediaPackage mediapackage, String elementId, boolean checkAvailability)
throws DistributionException {
if (mediapackage == null)
throw new IllegalArgumentException("Mediapackage must be specified");
if (elementId == null)
throw new IllegalArgumentException("Element ID must be specified");
String mediaPackageId = mediapackage.getIdentifier().compact();
MediaPackageElement element = mediapackage.getElementById(elementId);
// Make sure the element exists
if (mediapackage.getElementById(elementId) == null)
throw new IllegalStateException("No element " + elementId + " found in mediapackage");
if (!accepts(element)) {
logger.debug("Skipping {} {} for distribution to the streaming server", element.getElementType().toString()
.toLowerCase(), element.getIdentifier());
return null;
}
try {
File source;
try {
source = workspace.get(element.getURI());
} catch (NotFoundException e) {
throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
} catch (IOException e) {
throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
}
File destination = getDistributionFile(mediapackage, element);
// Make the directories to place the files into
try {
FileUtils.forceMkdir(destination.getParentFile());
} catch (IOException e) {
throw new DistributionException("Unable to create " + destination.getParentFile(), e);
}
logger.debug("Distributing {} to {}", elementId, destination);
// Do the HLS segmentation and m3u8 playlist generation
final Map<String, String> properties = new HashMap<String, String>();
properties.put("profile.hls.http.name", "hls");
properties.put("profile.hls.http.input", "visual");
properties.put("profile.hls.http.output", "visual");
properties.put("profile.hls.http.suffix", ".m3u8");
properties.put("profile.hls.http.mimetype", "application/x-mpegURL");
properties.put("profile.hls.http.ffmpeg.command", "-i #{in.video.path} -codec copy -map 0 -bsf h264_mp4toannexb -f segment -segment_list #{outputdir}/#{outputname}#{out.suffix} -segment_time 10 #{outputdir}/#{outputname}-%03d.ts");
EncodingProfile profile = createEncodingProfile("profile.hls.http", ".m3u8", properties);
final Map<String, String> commandLineOpts = new HashMap<String, String>();
commandLineOpts.put("outputdir", destination.getAbsoluteFile().getParent());
commandLineOpts.put("outputname", FilenameUtils.getBaseName(destination.getName()));
try {
engine = engine == null ? new FFmpegHLSEncoderEngine() : engine;
File playlistFile = engine.encode(source, profile, commandLineOpts).getOrElseNull();
} catch (Exception e) {
throw new DistributionException("Unable to generare HLS segments and playlists for " + source + " in " + destination.getAbsoluteFile().getParent(), e);
}
// Create a representation of the distributed file in the mediapackage
MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
try {
distributedElement.setURI(getDistributionUri(mediaPackageId, element));
} catch (URISyntaxException e) {
throw new DistributionException("Distributed element produces an invalid URI", e);
}
distributedElement.setMimeType(MimeType.mimeType("application", "x-mpegURL"));
distributedElement.setIdentifier(null);
logger.info("Finished distribution of {}", element);
URI uri = distributedElement.getURI();
long now = 0L;
while (checkAvailability) {
HttpResponse response = trustedHttpClient.execute(new HttpHead(uri));
if (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK)
break;
if (now < TIMEOUT) {
try {
Thread.sleep(INTERVAL);
now += INTERVAL;
continue;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
logger.warn("Status code of distributed file {}: {}", uri, response.getStatusLine().getStatusCode());
throw new DistributionException("Unable to load distributed file " + uri.toString());
}
return distributedElement;
} catch (Exception e) {
logger.warn("Error distributing " + element, e);
if (e instanceof DistributionException) {
throw (DistributionException) e;
} else {
throw new DistributionException(e);
}
}
}
/**
* Creates EncodingProfile.
*
* @param name
* @param suffix
* @param properties
* @return
*/
private EncodingProfile createEncodingProfile(final String name, final String suffix,
final Map<String, String> properties) {
EncodingProfile profile = new EncodingProfile() {
@Override
public boolean isApplicableTo(MediaType type) {
return false;
}
@Override
public boolean hasExtensions() {
return false;
}
@Override
public String getSuffix() {
return suffix;
}
@Override
public Object getSource() {
return null;
}
@Override
public MediaType getOutputType() {
return null;
}
@Override
public String getName() {
return name;
}
@Override
public String getMimeType() {
return null;
}
@Override
public String getIdentifier() {
return name;
}
@Override
public Map<String, String> getExtensions() {
return properties;
}
@Override
public String getExtension(String key) {
return properties.get(getIdentifier() + "." + key);
}
@Override
public MediaType getApplicableMediaType() {
return null;
}
};
return profile;
}
@Override
public Job retract(MediaPackage mediaPackage, String elementId) throws DistributionException {
if (mediaPackage == null)
throw new IllegalArgumentException("Mediapackage must be specified");
if (elementId == null)
throw new IllegalArgumentException("Element ID must be specified");
try {
List<String> arguments = new ArrayList<String>();
arguments.add(MediaPackageParser.getAsXml(mediaPackage));
arguments.add(elementId);
return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(), arguments);
} catch (ServiceRegistryException e) {
throw new DistributionException("Unable to create a job", e);
}
}
/**
* Retract a media package element from the distribution channel. The retracted element must not necessarily be the
* one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated and then
* in turn be matched against each element of the package. This way you are able to retract elements by providing the
* "original" element here.
*
* @param job
* the associated job
* @param mediapackage
* the mediapackage
* @param elementId
* the element identifier
* @return the retracted element or <code>null</code> if the element was not retracted
* @throws org.opencastproject.distribution.api.DistributionException
* in case of an error
*/
protected MediaPackageElement retract(Job job, MediaPackage mediapackage, String elementId)
throws DistributionException {
logger.debug("Running retract on mediapackage " + mediapackage.toString() + " with element " + elementId);
if (mediapackage == null) {
logger.debug("Mediapackage must be specified");
throw new IllegalArgumentException("Mediapackage must be specified");
}
if (elementId == null) {
logger.debug("Element ID must be specified");
throw new IllegalArgumentException("Element ID must be specified");
}
// Make sure the element exists
MediaPackageElement element = mediapackage.getElementById(elementId);
if (element == null) {
logger.debug("No element " + elementId + " found in mediapackage");
throw new IllegalStateException("No element " + elementId + " found in mediapackage");
}
// Find the element that has been created as part of the distribution process
String mediaPackageId = mediapackage.getIdentifier().compact();
URI distributedURI = null;
MediaPackageElement distributedElement = null;
try {
distributedURI = getDistributionUri(mediaPackageId, element);
logger.debug("Attempting to find element to retract: " + element.toString() + " at " + distributedURI);
for (MediaPackageElement e : mediapackage.getElements()) {
if (distributedURI.equals(e.getURI())) {
distributedElement = e;
break;
}
}
} catch (URISyntaxException e) {
throw new DistributionException("Retracted element produces an invalid URI", e);
}
logger.debug("Found element " + distributedElement.toString());
// Has this element been distributed?
if (distributedElement == null) {
logger.debug("Could not find element {}", element);
return null;
}
try {
File mediapackageDir = getMediaPackageDirectory(mediaPackageId);
File elementDir = getDistributionFile(mediapackage, element);
logger.info("Retracting element {} from {}", distributedElement, elementDir);
// Does the file exist? If not, the current element has not been distributed to this channel
// or has been removed otherwise
if (!elementDir.exists()) {
logger.warn("Unable to delete element from {}", elementDir);
return distributedElement;
}
// Try to remove the file and - if possible - the parent folder
FileUtils.forceDelete(elementDir.getParentFile());
if (mediapackageDir.list().length == 0) {
FileSupport.delete(mediapackageDir);
}
logger.info("Finished rectracting element {} of media package {}", elementId, mediaPackageId);
return distributedElement;
} catch (Exception e) {
logger.warn("Error retracting element " + elementId + " of mediapackage " + mediaPackageId, e);
if (e instanceof DistributionException) {
throw (DistributionException) e;
} else {
throw new DistributionException(e);
}
}
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
*/
@Override
protected String process(Job job) throws Exception {
Operation op = null;
String operation = job.getOperation();
List<String> arguments = job.getArguments();
try {
op = Operation.valueOf(operation);
MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(0));
String elementId = arguments.get(1);
switch (op) {
case Distribute:
Boolean checkAvailability = Boolean.parseBoolean(arguments.get(2));
MediaPackageElement distributedElement = distribute(job, mediapackage, elementId, checkAvailability);
return (distributedElement != null) ? MediaPackageElementParser.getAsXml(distributedElement) : null;
case Retract:
MediaPackageElement retractedElement = retract(job, mediapackage, elementId);
return (retractedElement != null) ? MediaPackageElementParser.getAsXml(retractedElement) : null;
default:
throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
}
} catch (IllegalArgumentException e) {
throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
} catch (IndexOutOfBoundsException e) {
throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
} catch (Exception e) {
throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
}
}
/**
* Gets the destination file to copy the contents of a mediapackage element.
*
* @param mediaPackage
* the media package
* @param element
* The mediapackage element being distributed
* @return The file to copy the content to
*/
protected File getDistributionFile(MediaPackage mediaPackage, MediaPackageElement element) {
String elementId = element.getIdentifier();
String fileName = FilenameUtils.getName(element.getURI().toString()) + ".m3u8";
String directoryName = distributionDirectory.getAbsolutePath();
String destinationFileName = PathSupport.concat(new String[] { directoryName,
mediaPackage.getIdentifier().compact(), elementId, fileName });
return new File(destinationFileName);
}
/**
* Gets the URI for the element to be distributed.
*
* @param mediaPackageId
* the mediapackage identifier
* @param element
* The mediapackage element being distributed
* @return The resulting URI after distribution
* @throws URISyntaxException
* if the concrete implementation tries to create a malformed uri
*/
protected URI getDistributionUri(String mediaPackageId, MediaPackageElement element) throws URISyntaxException {
String elementId = element.getIdentifier();
String fileName = FilenameUtils.getName(element.getURI().toString()) + ".m3u8";
String destinationURI = UrlSupport.concat(serviceUrl, mediaPackageId, elementId, fileName);
return new URI(destinationURI);
}
/**
* Gets the directory containing the distributed files for this mediapackage.
*
* @param mediaPackageId
* the mediapackage ID
* @return the filesystem directory
*/
protected File getMediaPackageDirectory(String mediaPackageId) {
return new File(distributionDirectory, mediaPackageId);
}
protected boolean accepts(MediaPackageElement element){
// HLS streaming servers only deal with tracks
if (!MediaPackageElement.Type.Track.equals(element.getElementType())) {
return false;
}
// The HLS ffmpeg profile only copies content so the input track must be h.264 or AAC
TrackImpl track = (TrackImpl )element;
if (track.getVideo().isEmpty() && track.getAudio().isEmpty()) {
return false;
} else if (!track.getVideo().isEmpty() && !Arrays.asList("H.264", "AVC").contains(track.getVideo().get(0).getFormat())) {
return false;
} else if (!track.getAudio().isEmpty() && !track.getAudio().get(0).getFormat().equals("AAC")) {
return false;
}
return true;
}
/**
* Callback for the OSGi environment to set the workspace reference.
*
* @param workspace
* the workspace
*/
protected void setWorkspace(Workspace workspace) {
this.workspace = workspace;
}
/**
* Callback for the OSGi environment to set the service registry reference.
*
* @param serviceRegistry
* the service registry
*/
protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
*/
@Override
protected ServiceRegistry getServiceRegistry() {
return serviceRegistry;
}
/**
* Callback for setting the security service.
*
* @param securityService
* the securityService to set
*/
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
/**
* Callback for setting the trusted HTTP client.
*
* @param trustedHttpClient
* the trusted HTTP client to set
*/
public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
this.trustedHttpClient = trustedHttpClient;
}
/**
* Callback for setting the user directory service.
*
* @param userDirectoryService
* the userDirectoryService to set
*/
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
this.userDirectoryService = userDirectoryService;
}
/**
* Sets a reference to the organization directory service.
*
* @param organizationDirectory
* the organization directory
*/
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
this.organizationDirectoryService = organizationDirectory;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
*/
@Override
protected SecurityService getSecurityService() {
return securityService;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
*/
@Override
protected UserDirectoryService getUserDirectoryService() {
return userDirectoryService;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
*/
@Override
protected OrganizationDirectoryService getOrganizationDirectoryService() {
return organizationDirectoryService;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment