// Source: GitHub gist monir-zaman/886158308998c03fd107ee4b1782478d
// Author: @monir-zaman
// Created: December 31, 2019 08:07
import javax.imageio.IIOImage;
import javax.imageio.ImageIO;
import javax.imageio.ImageWriteParam;
import javax.imageio.ImageWriter;
import javax.imageio.stream.ImageOutputStream;
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.awt.image.BufferedImage;
import java.io.*;
import java.net.*;
import java.util.Arrays;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.UUID;
public class Server extends JFrame implements ActionListener {
private static final long serialVersionUID = 1L;
//RTP variables:
//----------------
DatagramSocket RTPsocket; //UDP socket the video frames are sent from; opened on local port 55556 while handling SETUP
DatagramPacket senddp; //UDP packet containing the video frames
InetAddress ClientIPAddr; //remote address of the accepted RTSP connection; destination of the RTP packets
static int RTP_dest_port = 0; //client UDP port for RTP packets, parsed from the SETUP request's client_port value
static int RTSP_dest_port = 0; //never assigned in this file (the only assignment, in main, is commented out)
//GUI:
//----------------
JLabel label; //status text shown in the window; updated with the number of the last frame sent
//Video variables:
//----------------
int imagenb = 0; //image nb of the image currently transmitted
VideoStream video; //VideoStream object used to access video frames
static int MJPEG_TYPE = 26; //RTP payload type for MJPEG video
static int FRAME_PERIOD = 41; //Frame period of the video to stream, in ms
static int VIDEO_LENGTH = 500; //length of the video in frames
Timer timer; //timer used to send the images at the video frame rate
byte[] buf; //buffer used to store the images to send to the client
int sendDelay; //the delay, in ms, between two frames sent over the wire. Starts out
//equal to FRAME_PERIOD and is increased by the CongestionController
//when congestion is detected.
//RTSP variables
//----------------
//rtsp states
final static int INIT = 0;
final static int READY = 1;
final static int PLAYING = 2;
//rtsp message types (values continue after the states so the two sets never overlap)
final static int SETUP = 3;
final static int PLAY = 4;
final static int PAUSE = 5;
final static int TEARDOWN = 6;
final static int OPTIONS = 7;
final static int DESCRIBE = 8;
static int state; //RTSP Server state == INIT or READY or PLAYING
Socket RTSPsocket; //socket used to send/receive RTSP messages
//input and output stream filters
static BufferedReader RTSPBufferedReader;
static BufferedWriter RTSPBufferedWriter;
static String VideoFileName = "movie.Mjpeg"; //video file that is streamed; fixed here, nothing in this file changes it
static String RTSPid = UUID.randomUUID().toString(); //session identifier sent in the "Session:" response header
static int RTSPSeqNb = 1; //Sequence number of RTSP messages within the session; echoed as "CSeq:" in responses
//RTCP variables
//----------------
static int RTCP_RCV_PORT = 19001; //local UDP port on which RTCP packets from the client are received
static int RTCP_PERIOD = 400; //How often to check for control events, in ms
DatagramSocket RTCPsocket; //UDP socket bound to RTCP_RCV_PORT while handling SETUP
RtcpReceiver rtcpReceiver; //polls RTCPsocket and updates congestionLevel
int congestionLevel; //0 (none) to 4 (worst), derived by RtcpReceiver from the reported fraction of lost packets
//Performance optimization and Congestion control
ImageTranslator imgTranslator; //re-encodes frames at lower JPEG quality while congestionLevel > 0
CongestionController cc; //stretches sendDelay when congestionLevel changes
final static String CRLF = "\r\n"; //line terminator of RTSP messages
//--------------------------------
//Constructor
//--------------------------------
/**
 * Builds the server window and every helper the streaming loop relies on: the frame buffer,
 * the RTP send timer, the congestion controller, the RTCP receiver and the image translator.
 * No socket is opened here.
 *
 * @param argv command-line arguments; accepted but not read by this constructor
 */
public Server(String[] argv) {
    //window title
    super("RTSP Streaming: hit on local host and watch the videos");

    //RTP send timer: fires immediately once started, then every sendDelay ms
    sendDelay = FRAME_PERIOD;
    timer = new Timer(sendDelay, this);
    timer.setInitialDelay(0);
    timer.setCoalesce(true);

    //congestion controller samples the congestion level every 600 ms
    cc = new CongestionController(600);

    //sending buffer for one video frame
    buf = new byte[20000];

    //closing the window stops both timers' work and ends the process
    final WindowAdapter closeHandler = new WindowAdapter() {
        @Override
        public void windowClosing(WindowEvent event) {
            timer.stop();
            rtcpReceiver.stopRcv();
            System.exit(0);
        }
    };
    addWindowListener(closeHandler);

    //RTCP packet receiver, polled every RTCP_PERIOD ms once started
    rtcpReceiver = new RtcpReceiver(RTCP_PERIOD);

    //GUI: a single centered status label
    label = new JLabel("RTSP Server is running", JLabel.CENTER);
    final Container content = getContentPane();
    content.add(label, BorderLayout.CENTER);

    //JPEG re-encoder, initial quality 0.8
    imgTranslator = new ImageTranslator(0.8f);
}
//------------------------------------
//main
//------------------------------------
/**
 * Runs the server: shows the window, accepts exactly one RTSP client on TCP port 8080, answers
 * OPTIONS / DESCRIBE / SETUP until a SETUP request has been handled, and then serves PLAY,
 * PAUSE and TEARDOWN until the session is torn down (which exits the JVM).
 *
 * Fixes relative to the previous version:
 * - the SETUP reply starts with the status line "RTSP/1.0 200 OK" again (a debugging string had
 *   been left in its place);
 * - log messages no longer swap client/server addresses or report the wrong state/request;
 * - the listening socket is closed even when accept() fails, and the RTCP socket is closed on
 *   TEARDOWN together with the RTSP and RTP sockets.
 *
 * @param argv command-line arguments; passed to the constructor, the port is not read from them
 * @throws Exception if a socket or the video stream cannot be opened
 */
public static void main(String argv[]) throws Exception {
    //create a Server object
    Server server = new Server(argv);
    //show GUI:
    server.pack();
    server.setVisible(true);
    server.setSize(new Dimension(400, 200));

    //RTSP listening port (fixed; the command line is not consulted)
    int RTSPport = 8080;

    //Initiate TCP connection with the client for the RTSP session.
    //Only one client is served, so the listening socket is closed right after accept().
    ServerSocket listenSocket = new ServerSocket(RTSPport);
    try {
        server.RTSPsocket = listenSocket.accept();
    } finally {
        listenSocket.close();
    }
    System.out.println("Client accepted");

    //Get Client IP address (the remote end of the accepted connection)
    server.ClientIPAddr = server.RTSPsocket.getInetAddress();
    System.out.println("The ip address of the client is : " + server.ClientIPAddr + " ");
    System.out.println("The ip address of the server is : " + server.RTSPsocket.getLocalAddress() + " ");

    //Initiate RTSPstate
    state = INIT;
    RTSPBufferedReader = new BufferedReader(new InputStreamReader(server.RTSPsocket.getInputStream()));
    RTSPBufferedWriter = new BufferedWriter(new OutputStreamWriter(server.RTSPsocket.getOutputStream()));

    //Wait for the SETUP message from the client, answering OPTIONS/DESCRIBE meanwhile
    int request_type;
    boolean done = false;
    while (!done) {
        request_type = server.parseRequest(); //blocking
        if (request_type == SETUP) {
            done = true;
            //update RTSP state
            state = READY;
            System.out.println("Got the setup request, changing state to ready");
            //Send response
            try {
                System.out.println("Sending response to the client for the SETUP request");
                RTSPBufferedWriter.write("RTSP/1.0 200 OK" + CRLF);
                System.out.println("RTSP/1.0 200 OK" + CRLF);
                RTSPBufferedWriter.write("CSeq: " + RTSPSeqNb + CRLF);
                System.out.println("CSeq: " + RTSPSeqNb + CRLF);
                RTSPBufferedWriter.write("Transport: RTP/AVP;unicast;client_port="
                        + RTP_dest_port + "-" + (RTP_dest_port + 1)
                        + ";server_port=55556-55557" + CRLF);
                RTSPBufferedWriter.write("Session: " + RTSPid + CRLF);
                System.out.println("Session: " + RTSPid + CRLF);
                RTSPBufferedWriter.flush();
                System.out.println("RTSP Server - Sent response to Client using the send response method.");
            } catch (Exception ex) {
                System.out.println("Exception caught while sending response to SETUP request: " + ex);
                System.exit(0);
            }
            //init the VideoStream object:
            server.video = new VideoStream(VideoFileName);
            //init RTP and RTCP sockets (55556 matches the server_port announced above)
            server.RTPsocket = new DatagramSocket(55556);
            server.RTCPsocket = new DatagramSocket(RTCP_RCV_PORT);
        } else if (request_type == OPTIONS) {
            //OPTIONS does not advance the session: the state stays INIT
            state = INIT;
            System.out.println("New RTSP state: INIT");
            try {
                RTSPBufferedWriter.write("RTSP/1.0 200 OK" + CRLF);
                RTSPBufferedWriter.write("CSeq: " + RTSPSeqNb + CRLF);
                RTSPBufferedWriter.write("Public: SETUP, PLAY, PAUSE, TEARDOWN" + CRLF);
                RTSPBufferedWriter.flush();
                System.out.println("RTSP Server - Sent response to Client.");
            } catch (Exception ex) {
                System.out.println("Exception caught while sending response to OPTIONS: " + ex);
                System.exit(0);
            }
        } else if (request_type == DESCRIBE) {
            try {
                RTSPBufferedWriter.write("RTSP/1.0 200 OK" + CRLF);
                RTSPBufferedWriter.write("CSeq: " + RTSPSeqNb + CRLF);
                RTSPBufferedWriter.write("Content-Base: rtsp://localhost:8080" + CRLF);
                RTSPBufferedWriter.flush();
                System.out.println("RTSP Server - Sent response to Client.");
            } catch (Exception ex) {
                System.out.println("Exception caught while sending response to DESCRIBE: " + ex);
                System.exit(0);
            }
        }
        //any other line (request_type == -1 or a request that is not valid yet) is ignored
    }

    //loop to handle RTSP requests
    while (true) {
        //parse the request
        request_type = server.parseRequest(); //blocking
        if ((request_type == PLAY) && (state == READY)) {
            //send back response
            server.sendResponse();
            //start sending frames and polling for RTCP feedback
            server.timer.start();
            server.rtcpReceiver.startRcv();
            //update state
            state = PLAYING;
            System.out.println("New RTSP state: PLAYING");
        } else if ((request_type == PAUSE) && (state == PLAYING)) {
            //send back response
            server.sendResponse();
            //stop sending frames and polling for RTCP feedback
            server.timer.stop();
            server.rtcpReceiver.stopRcv();
            //update state
            state = READY;
            System.out.println("New RTSP state: READY");
        } else if (request_type == TEARDOWN) {
            //send back response
            server.sendResponse();
            //stop timers
            server.timer.stop();
            server.rtcpReceiver.stopRcv();
            //close every socket of the session (the listening socket is already closed)
            server.RTSPsocket.close();
            server.RTPsocket.close();
            server.RTCPsocket.close();
            System.exit(0);
        }
    }
}
//------------------------
//Handler for timer
//------------------------
/**
 * Handler for the RTP send timer: reads the next video frame, re-compresses it while congestion
 * is reported, wraps it in an RTP packet and sends it to the client over UDP. Once VIDEO_LENGTH
 * frames have been sent, the send timer and the RTCP receiver are stopped instead.
 *
 * The re-compressed frame is sent straight from the array returned by the translator. It used
 * to be copied back into the fixed-size {@code buf}, which throws
 * ArrayIndexOutOfBoundsException (and so terminates the server) whenever re-encoding yields
 * more bytes than {@code buf} can hold.
 *
 * @param e the timer event; not inspected
 */
@Override
public void actionPerformed(ActionEvent e) {
    //end of the video: stop sending frames and polling for RTCP feedback
    if (imagenb >= VIDEO_LENGTH) {
        timer.stop();
        rtcpReceiver.stopRcv();
        return;
    }

    //update current imagenb
    imagenb++;
    try {
        //get next frame to send from the video, as well as its size
        int image_length = video.getnextframe(buf);

        //bytes that go on the wire: the raw frame unless congestion forces a re-encode
        byte[] payload = buf;
        if (congestionLevel > 0) {
            //one quality step of 0.2 per congestion level
            imgTranslator.setCompressionQuality(1.0f - congestionLevel * 0.2f);
            payload = imgTranslator.compress(Arrays.copyOfRange(buf, 0, image_length));
            image_length = payload.length;
        }

        //Builds an RTPpacket object containing the frame
        //NOTE(review): assumes RTPpacket reads only the first image_length bytes of the array it is given
        RTPpacket rtp_packet = new RTPpacket(MJPEG_TYPE, imagenb, imagenb * FRAME_PERIOD, payload, image_length);

        //get to total length of the full rtp packet to send
        int packet_length = rtp_packet.getlength();

        //retrieve the packet bitstream and store it in an array of bytes
        byte[] packet_bits = new byte[packet_length];
        rtp_packet.getpacket(packet_bits);

        //send the packet as a DatagramPacket over the UDP socket
        senddp = new DatagramPacket(packet_bits, packet_length, ClientIPAddr, RTP_dest_port);
        RTPsocket.send(senddp);
        System.out.println("Send frame #" + imagenb + ", Frame size: " + image_length + " (" + buf.length + ")");

        //print the header bitstream
        rtp_packet.printheader();

        //update GUI
        label.setText("Send frame #" + imagenb);
    } catch (Exception ex) {
        System.out.println("Exception caught: " + ex);
        System.exit(0);
    }
}
//Controls RTP sending rate based on traffic
//------------------------
/**
 * Periodically compares the server's congestion level with the last value it saw and, when the
 * level has changed, stretches the delay of the RTP send timer accordingly.
 */
class CongestionController implements ActionListener {
    private Timer ccTimer;
    int interval; //sampling interval, in ms
    int prevLevel; //congestion level seen at the previous adjustment

    /**
     * Creates the controller and immediately starts its sampling timer.
     *
     * @param interval time between two samples, in ms
     */
    public CongestionController(int interval) {
        this.interval = interval;
        ccTimer = new Timer(interval, this);
        ccTimer.start();
    }

    /** Timer callback: recomputes the send delay if the congestion level changed. */
    public void actionPerformed(ActionEvent e) {
        //nothing to do while the level is stable
        if (prevLevel == congestionLevel) {
            return;
        }
        //each congestion level adds 10% of the frame period (truncated to whole ms)
        int extraPerLevel = (int) (FRAME_PERIOD * 0.1);
        sendDelay = FRAME_PERIOD + congestionLevel * extraPerLevel;
        timer.setDelay(sendDelay);
        prevLevel = congestionLevel;
        System.out.println("Send delay changed to: " + sendDelay);
    }
}
//------------------------
//Listener for RTCP packets sent from client
//------------------------
/**
 * Listener for RTCP packets sent from the client. A Swing timer polls the RTCP socket every
 * {@code interval} ms; each received packet is turned into a congestion level between 0 and 4.
 *
 * The poll runs inside a Swing timer callback, so a receive() that waits indefinitely would
 * also hold up the RTP send timer and the window. A short receive timeout is therefore set
 * before every poll; when it expires the existing "Nothing to read" branch is taken
 * (SocketTimeoutException is an InterruptedIOException).
 */
class RtcpReceiver implements ActionListener {
    //upper bound, in ms, on how long a single poll may wait for a packet
    private static final int RECEIVE_TIMEOUT_MS = 5;

    private Timer rtcpTimer;
    private byte[] rtcpBuf;
    int interval;

    /**
     * @param interval time between two polls of the RTCP socket, in ms
     */
    public RtcpReceiver(int interval) {
        //set timer with interval for receiving packets
        this.interval = interval;
        rtcpTimer = new Timer(interval, this);
        rtcpTimer.setInitialDelay(0);
        rtcpTimer.setCoalesce(true);
        //allocate buffer for receiving RTCP packets
        rtcpBuf = new byte[512];
    }

    /** Timer callback: reads at most one RTCP packet and updates the congestion level. */
    @Override
    public void actionPerformed(ActionEvent e) {
        //Construct a DatagramPacket to receive data from the UDP socket
        DatagramPacket dp = new DatagramPacket(rtcpBuf, rtcpBuf.length);
        float fractionLost;
        try {
            //bounded wait: give up quickly when no packet is queued
            RTCPsocket.setSoTimeout(RECEIVE_TIMEOUT_MS);
            RTCPsocket.receive(dp);
            RTCPpacket rtcpPkt = new RTCPpacket(dp.getData(), dp.getLength());
            System.out.println("[RTCP] " + rtcpPkt);
            //set congestion level between 0 to 4
            fractionLost = rtcpPkt.fractionLost;
            if (fractionLost >= 0 && fractionLost <= 0.01) {
                congestionLevel = 0; //less than 0.01 assume negligible
            } else if (fractionLost > 0.01 && fractionLost <= 0.25) {
                congestionLevel = 1;
            } else if (fractionLost > 0.25 && fractionLost <= 0.5) {
                congestionLevel = 2;
            } else if (fractionLost > 0.5 && fractionLost <= 0.75) {
                congestionLevel = 3;
            } else {
                //above 0.75, or any value outside the ranges tested above
                congestionLevel = 4;
            }
        } catch (InterruptedIOException iioe) {
            //receive timed out: no RTCP packet arrived since the last poll
            System.out.println("Nothing to read");
        } catch (IOException ioe) {
            System.out.println("Exception caught: " + ioe);
        }
    }

    /** Starts polling the RTCP socket. */
    public void startRcv() {
        rtcpTimer.start();
    }

    /** Stops polling the RTCP socket. */
    public void stopRcv() {
        rtcpTimer.stop();
    }
}
//------------------------------------
//Translate an image to different encoding or quality
//------------------------------------
/**
 * Re-encodes an image as JPEG with an adjustable compression quality. One JPEG writer and one
 * in-memory output buffer are created up front and reused for every call to compress().
 * Any failure is printed and terminates the JVM.
 */
class ImageTranslator {
    private float compressionQuality;
    private ByteArrayOutputStream baos;
    private ImageWriter writer;
    private ImageWriteParam param;
    private ImageOutputStream ios;

    /**
     * @param cq initial compression quality handed to the JPEG writer parameters
     */
    public ImageTranslator(float cq) {
        compressionQuality = cq;
        try {
            //the writer streams into an image output stream backed by the byte buffer
            baos = new ByteArrayOutputStream();
            ios = ImageIO.createImageOutputStream(baos);

            //take the first JPEG writer that is registered
            Iterator<ImageWriter> jpegWriters = ImageIO.getImageWritersByFormatName("jpeg");
            writer = jpegWriters.next();
            writer.setOutput(ios);

            //explicit mode is needed before a quality can be set
            param = writer.getDefaultWriteParam();
            param.setCompressionMode(ImageWriteParam.MODE_EXPLICIT);
            param.setCompressionQuality(compressionQuality);
        } catch (Exception ex) {
            System.out.println("Exception caught: " + ex);
            System.exit(0);
        }
    }

    /**
     * Decodes the given image bytes and encodes them again as JPEG with the current quality.
     *
     * @param imageBytes an encoded image that ImageIO can read
     * @return the bytes produced by the JPEG writer for this image
     */
    public byte[] compress(byte[] imageBytes) {
        try {
            //drop the output of the previous call
            baos.reset();
            BufferedImage decoded = ImageIO.read(new ByteArrayInputStream(imageBytes));
            IIOImage frame = new IIOImage(decoded, null, null);
            writer.write(null, frame, param);
        } catch (Exception ex) {
            System.out.println("Exception caught: " + ex);
            System.exit(0);
        }
        return baos.toByteArray();
    }

    /**
     * Changes the quality used by later calls to compress().
     *
     * @param cq new compression quality
     */
    public void setCompressionQuality(float cq) {
        compressionQuality = cq;
        param.setCompressionQuality(compressionQuality);
    }
}
//------------------------------------
//Parse RTSP Request
//------------------------------------
String[] fullInput = null;
/**
 * Reads the next RTSP request from the client and returns its type.
 *
 * Blank lines before the request line are skipped. If the first token of the request line is a
 * known method, the header lines that follow are read until the needed headers have been seen
 * (CSeq for every request, plus Transport for SETUP) or until a blank line or end of stream is
 * reached. The CSeq value is stored in RTSPSeqNb; for SETUP the first port of the Transport
 * header's client_port parameter is stored in RTP_dest_port. Header lines that follow the last
 * needed header are left in the stream and come back as unrecognized lines (-1) on later calls.
 *
 * Fixes relative to the previous version: CSeq is now read for every request type (it was only
 * read for SETUP, so all other responses echoed a stale number); headers are matched by name
 * rather than by line position and the client port by parameter name rather than by field
 * index; end of stream is reported explicitly instead of through a NullPointerException.
 *
 * Any failure (including end of stream and a SETUP without usable Transport header) is printed
 * and terminates the JVM, as before.
 *
 * @return SETUP, PLAY, PAUSE, TEARDOWN, OPTIONS or DESCRIBE, or -1 for an unrecognized line
 */
private int parseRequest() {
    int request_type = -1;
    try {
        //skip blank lines in front of the request line
        String RequestLine;
        do {
            RequestLine = RTSPBufferedReader.readLine();
            if (RequestLine == null) {
                throw new EOFException("RTSP connection closed by the client");
            }
        } while (RequestLine.trim().isEmpty());
        System.out.println("The line read from rtsp buffered reader " + RequestLine);

        //the method is the first token of the request line
        request_type = requestTypeOf(new StringTokenizer(RequestLine).nextToken());
        if (request_type == -1) {
            //not a request line (for instance a leftover header): the caller ignores it
            return request_type;
        }

        //read headers until everything this request needs has been seen
        boolean needCSeq = true;
        boolean needTransport = (request_type == SETUP);
        while (needCSeq || needTransport) {
            String headerLine = RTSPBufferedReader.readLine();
            if (headerLine == null || headerLine.trim().isEmpty()) {
                break; //end of stream or end of the header section
            }
            System.out.println("Header line: " + headerLine);

            String cseq = headerValue(headerLine, "CSeq");
            if (cseq != null) {
                RTSPSeqNb = Integer.parseInt(cseq);
                needCSeq = false;
                continue;
            }
            String transport = headerValue(headerLine, "Transport");
            if (transport != null && needTransport) {
                RTP_dest_port = parseClientPort(transport);
                System.out.println(RTP_dest_port);
                needTransport = false;
            }
        }
        if (needTransport) {
            //without a client port there is nowhere to send the video to
            throw new IOException("SETUP request carried no Transport header");
        }
    } catch (Exception ex) {
        System.out.println("Exception caught in parseRequest: " + ex);
        System.exit(0);
    }
    return (request_type);
}

/** Maps an RTSP method name to its request-type constant, or -1 if the name is not handled. */
private static int requestTypeOf(String method) {
    if (method.equals("SETUP")) {
        return SETUP;
    } else if (method.equals("PLAY")) {
        return PLAY;
    } else if (method.equals("PAUSE")) {
        return PAUSE;
    } else if (method.equals("TEARDOWN")) {
        return TEARDOWN;
    } else if (method.equals("OPTIONS")) {
        return OPTIONS;
    } else if (method.equals("DESCRIBE")) {
        return DESCRIBE;
    }
    return -1;
}

/** Returns the trimmed value of a "name: value" line if its name matches (case-insensitively), else null. */
private static String headerValue(String line, String name) {
    int colon = line.indexOf(':');
    if (colon < 0 || !line.substring(0, colon).trim().equalsIgnoreCase(name)) {
        return null;
    }
    return line.substring(colon + 1).trim();
}

/** Extracts the first port of the client_port parameter ("client_port=a-b" or "client_port=a") of a Transport value. */
private static int parseClientPort(String transport) throws IOException {
    final String key = "client_port=";
    String[] params = transport.split(";");
    for (int i = 0; i < params.length; i++) {
        String param = params[i].trim();
        if (param.regionMatches(true, 0, key, 0, key.length())) {
            String range = param.substring(key.length());
            return Integer.parseInt(range.split("-")[0].trim());
        }
    }
    throw new IOException("Transport header has no client_port parameter: " + transport);
}
/**
 * Placeholder that is not called anywhere in this file: prints an empty line and reports
 * "no request".
 *
 * @return always -1
 */
private int parseTheBloodyRequest() {
    final int noRequest = -1;
    System.out.println();
    return noRequest;
}
/**
 * Sends the generic "200 OK" RTSP response (status line, CSeq and Session headers) to the
 * client and flushes it. A failure is printed and terminates the JVM.
 */
private void sendResponse() {
    //assemble the three response lines first, then hand them to the writer in one go
    final String reply = "RTSP/1.0 200 OK" + CRLF
            + "CSeq: " + RTSPSeqNb + CRLF
            + "Session: " + RTSPid + CRLF;
    try {
        RTSPBufferedWriter.write(reply);
        RTSPBufferedWriter.flush();
        System.out.println("RTSP Server - Sent response to Client.");
    } catch (Exception ex) {
        System.out.println("Exception caught in sendResponse: " + ex);
        System.exit(0);
    }
}
}
// (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")