Skip to content

Instantly share code, notes, and snippets.

@istolga
Last active August 6, 2020 17:21
Show Gist options
  • Save istolga/a536be09e0bea010cf8ff837baf36fd7 to your computer and use it in GitHub Desktop.
Save istolga/a536be09e0bea010cf8ff837baf36fd7 to your computer and use it in GitHub Desktop.
create rtp stream dynamically for Janus Gateway
package com.crowdoptic.server.webrtc;
import com.crowdoptic.server.util.ConfigProperties;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.http.HttpEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.util.Collections;
import java.util.List;
/**
* Created by Olga on 9/12/16.
*/
public class WebRtcStreamManager {
private static final Logger LOGGER = LogManager.getLogger(WebRtcStreamManager.class);
private boolean isWebRtcEnabled = ConfigProperties.getInstance().getPropertyBoolean("vdash.webrtc.enabled", false);
private static RestTemplate restTemplate;
private static String janusServerUrl;
private static int janusHttpPort = 8088;
public static RestTemplate getRestTemplate() {
return restTemplate;
}
public static void setRestTemplate(RestTemplate restTemplate) {
WebRtcStreamManager.restTemplate = restTemplate;
}
public static String getJanusServerUrl() {
return janusServerUrl;
}
public static void setJanusServerUrl(String janusServerUrl) {
WebRtcStreamManager.janusServerUrl = janusServerUrl;
LOGGER.info("set janus server url to: " + janusServerUrl);
}
public WebRtcStreamManager() {
if (StringUtils.isBlank(janusServerUrl)) {
janusServerUrl = "http://127.0.0.1:" + janusHttpPort + "/janus";
setJanusServerUrl(janusServerUrl);
}
}
public long createStreamOnWebRtcServer(String streamId, int localAudioPort, int localVideoPort) {
if (!isWebRtcEnabled) {
LOGGER.info("WebRtc is not enabled, return -1 for its id and skip it");
return -1L;
}
long sessionId = getSessionId();
if (sessionId > 0) {
LOGGER.info("got session id: " + sessionId);
long handlerId = getPluginHandlerId(sessionId);
if (handlerId > 0) {
LOGGER.info("got handler id: " + handlerId);
List<JanusStreamingListItem> janusStreamingListItems = getStreamList(sessionId, handlerId);
LOGGER.info("got stream list, size: " + janusStreamingListItems.size());
long nextId = getMaxId(janusStreamingListItems) + 1L;
LOGGER.info("start creating stream with id: " + nextId);
boolean streamCreated = createStream(sessionId, handlerId, streamId, nextId, localAudioPort,
localVideoPort);
if (streamCreated) {
return nextId;
}
}
}
return -1L;
}
public boolean removeStreamOnWebRtcServer(long webRtcStreamId) {
if (!isWebRtcEnabled) {
LOGGER.info("WebRtc is not enabled, return -1 for its id and skip it");
return false;
}
long sessionId = getSessionId();
if (sessionId > 0) {
LOGGER.info("got session id: " + sessionId);
long handlerId = getPluginHandlerId(sessionId);
if (handlerId > 0) {
LOGGER.info("got handler id: " + handlerId);
return removeStream(sessionId, handlerId, webRtcStreamId);
}
}
return false;
}
private long getSessionId() {
LOGGER.info("in get session id");
JanusRequest createSessionRequest = new JanusRequest();
createSessionRequest.setJanusAction("create");
createSessionRequest.setTransactionUID(RandomStringUtils.randomAlphanumeric(12));
HttpEntity<JanusRequest> httpCreateRequest = new HttpEntity<>(createSessionRequest);
long sessionId = -1L;
try {
LOGGER.info("calling: " + janusServerUrl);
JanusGeneralResponse createResponse = restTemplate.postForObject(janusServerUrl, httpCreateRequest, JanusGeneralResponse.class);
if (createResponse != null && !createResponse.isError()) {
LOGGER.info("success in janus get session id call: " + createResponse);
if (createResponse.getData() != null) {
sessionId = createResponse.getData().getId();
}
} else {
LOGGER.error("Error in get session id call" + createResponse);
}
} catch (Throwable e) {
LOGGER.error("Throwable while calling create session for janus", e);
}
return sessionId;
}
private long getPluginHandlerId(long sessionId) {
JanusRequest createSessionRequest = new JanusRequest();
createSessionRequest.setJanusAction("attach");
createSessionRequest.setPlugin("janus.plugin.streaming");
createSessionRequest.setTransactionUID(RandomStringUtils.randomAlphanumeric(12));
HttpEntity<JanusRequest> httpCreateRequest = new HttpEntity<>(createSessionRequest);
long handlerId = -1L;
String url = janusServerUrl + "/" + sessionId;
try {
LOGGER.info("calling: " + url);
JanusGeneralResponse response = restTemplate.postForObject(url, httpCreateRequest, JanusGeneralResponse.class);
if (response != null && !response.isError()) {
LOGGER.info("success in janus attach plugin call: " + response);
if (response.getData() != null) {
handlerId = response.getData().getId();
}
} else {
LOGGER.error("Error in janus attach plugin call: " + response);
}
} catch (Throwable e) {
LOGGER.error("Throwable while calling create session for janus", e);
}
return handlerId;
}
private boolean createStream(long sessionId, long handlerId, String streamName, long streamId, int localAudioPort,
int localVideoPort) {
JanusRequest janusRequest = new JanusRequest();
janusRequest.setJanusAction("message");
janusRequest.setTransactionUID(RandomStringUtils.randomAlphanumeric(12));
JanusCreateStreamRequestBody janusCreateStreamRequestBody = new JanusCreateStreamRequestBody();
janusCreateStreamRequestBody.setId(streamId);
janusCreateStreamRequestBody.setDescription(streamName);
janusCreateStreamRequestBody.setName(streamName);
janusCreateStreamRequestBody.setVideoport(localVideoPort);
if (localAudioPort > 0) {
janusCreateStreamRequestBody.setAudio(true);
janusCreateStreamRequestBody.setAudioport(localAudioPort);
}
janusRequest.setBody(janusCreateStreamRequestBody);
HttpEntity<JanusRequest> httpRequest = new HttpEntity<>(janusRequest);
String url = janusServerUrl + "/" + sessionId + "/" + handlerId;
try {
LOGGER.info("calling: " + url);
JanusGeneralResponse response = restTemplate.postForObject(url, httpRequest, JanusGeneralResponse.class);
if (response != null && !response.isError()) {
LOGGER.info("success in janus create call: " + response);
return true;
} else {
LOGGER.error("Error in janus create call: " + response);
}
} catch (Throwable e) {
LOGGER.error("Throwable while calling create stream for janus", e);
}
return false;
}
private boolean removeStream(long sessionId, long handlerId, long streamId) {
LOGGER.info("removing stream using id: " + streamId);
JanusRequest janusRequest = new JanusRequest();
janusRequest.setJanusAction("message");
janusRequest.setTransactionUID(RandomStringUtils.randomAlphanumeric(12));
JanusRemoveStreamRequestBody janusStreamRequestBody = new JanusRemoveStreamRequestBody();
janusStreamRequestBody.setId(streamId);
janusRequest.setBody(janusStreamRequestBody);
HttpEntity<JanusRequest> httpRequest = new HttpEntity<>(janusRequest);
String url = janusServerUrl + "/" + sessionId + "/" + handlerId;
try {
LOGGER.info("calling: " + url);
JanusGeneralResponse response = restTemplate.postForObject(url, httpRequest, JanusGeneralResponse.class);
if (response != null && !response.isError()) {
LOGGER.info("success in janus remove call: " + response);
return true;
} else {
LOGGER.error("Error in janus remove call: " + response);
}
} catch (Throwable e) {
LOGGER.error("Throwable while calling remove stream for janus", e);
}
return false;
}
private long getMaxId(List<JanusStreamingListItem> items) {
long maxId = 0L;
for (JanusStreamingListItem i : items) {
if (i.getId() > maxId) {
maxId = i.getId();
}
}
return maxId;
}
private List<JanusStreamingListItem> getStreamList(long sessionId, long handlerId) {
List<JanusStreamingListItem> janusStreamingListItems = Collections.emptyList();
JanusRequest janusRequest = new JanusRequest();
janusRequest.setJanusAction("message");
janusRequest.setTransactionUID(RandomStringUtils.randomAlphanumeric(12));
janusRequest.setBody(new JanusRequestBody("list"));
HttpEntity<JanusRequest> httpRequest = new HttpEntity<>(janusRequest);
String url = janusServerUrl + "/" + sessionId + "/" + handlerId;
try {
LOGGER.info("calling: " + url);
JanusGeneralResponse response = restTemplate.postForObject(url, httpRequest, JanusGeneralResponse.class);
if (response != null && !response.isError()) {
LOGGER.info("success in janus stream list call: " + response);
if (response.getPlugindata() != null && response.getPlugindata().getData() != null) {
janusStreamingListItems = response.getPlugindata().getData().getList();
}
} else {
LOGGER.error("Error in janus stream list call" + response);
}
} catch (Throwable e) {
LOGGER.error("Throwable while calling stream list for janus", e);
}
return janusStreamingListItems;
}
public static void main(String[] args) {
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
RestTemplate restTemplate = new RestTemplate(factory);
WebRtcStreamManager.setJanusServerUrl("https://<server_url>:8089/janus");
WebRtcStreamManager.setRestTemplate(restTemplate);
WebRtcStreamManager webRtcStreamManager = new WebRtcStreamManager();
webRtcStreamManager.createStreamOnWebRtcServer("123435", 5012, 5013);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment