--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/RTCPSession.java Fri Nov 20 19:29:42 2009 +0100
@@ -0,0 +1,565 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+package jlibrtp;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+/**
+ * This class acts as an organizer for most of the information and functions
+ * pertaining to RTCP packet generation and reception
+ *
+ * @author Arne Kepp
+ *
+ */
+public class RTCPSession {
+ /** Parent session */
+ protected RTPSession rtpSession = null;
+
+ /** Unicast socket */
+ protected DatagramSocket rtcpSock = null;
+ /** Multicast socket */
+ protected MulticastSocket rtcpMCSock = null;
+ /** Multicast group */
+ protected InetAddress mcGroup = null;
+
+ /** RTCP Receiver thread */
+ protected RTCPReceiverThread recvThrd = null;
+ /** RTCP Sender thread */
+ protected RTCPSenderThread senderThrd = null;
+
+ /** Previous time a delay was calculated */
+ protected long prevTime = System.currentTimeMillis();
+ /** Delay between RTCP transmissions, in ms. Initialized in start() */
+ protected int nextDelay = -1; //
+ /**
+ * The average compound RTCP packet size, in octets, including UDP and IP
+ * headers
+ */
+ protected int avgPktSize = 200; //
+ /** Pessimistic case estimate of the current number of senders */
+ protected int senderCount = 1;
+ /** Whether next RTCP packet can be sent early */
+ protected boolean fbAllowEarly = false;
+ /** Feedback queue , index is SSRC of target */
+ protected Hashtable<Long, LinkedList<RtcpPkt>> fbQueue = null;
+ /** APP queue , index is SSRC of target */
+ protected Hashtable<Long, LinkedList<RtcpPktAPP>> appQueue = null;
+ /** Are we just starting up? */
+ protected boolean initial = true;
+ /** Is there a feedback packet waiting? SSRC of destination */
+ protected long fbWaiting = -1;
+
+ /**
+ * Constructor for unicast sessions
+ *
+ * @param parent
+ * RTPSession that started this
+ * @param rtcpSocket
+ * the socket to use for listening and sending
+ */
+ protected RTCPSession(RTPSession parent, DatagramSocket rtcpSocket) {
+ this.rtcpSock = rtcpSocket;
+ rtpSession = parent;
+ }
+
+ /**
+ * Constructor for multicast sessions
+ *
+ * @param parent
+ * parent RTPSession
+ * @param rtcpSocket
+ * parent RTPSession that started this
+ * @param multicastGroup
+ * multicast group to bind the socket to
+ */
+ protected RTCPSession(RTPSession parent, MulticastSocket rtcpSocket,
+ InetAddress multicastGroup) {
+ mcGroup = multicastGroup;
+ this.rtcpSock = rtcpSocket;
+ rtpSession = parent;
+ }
+
+ /**
+ * Starts the session, calculates delays and fires up the threads.
+ *
+ */
+ protected void start() {
+ // nextDelay = 2500 + rtpSession.random.nextInt(1000) - 500;
+ this.calculateDelay();
+ recvThrd = new RTCPReceiverThread(this, this.rtpSession);
+ senderThrd = new RTCPSenderThread(this, this.rtpSession);
+ recvThrd.start();
+ senderThrd.start();
+ }
+
+ /**
+ * Send bye packets, handled by RTCP Sender thread
+ *
+ */
+ protected void sendByes() {
+ senderThrd.sendByes();
+ }
+
+ /**
+ * Calculate the delay before the next RTCP packet can be sent
+ *
+ */
+ protected void calculateDelay() {
+ switch (rtpSession.rtcpMode) {
+ case 0:
+ calculateRegularDelay();
+ break;
+ default:
+ System.out.println("RTCPSession.calculateDelay() unknown .mode");
+ }
+ }
+
+ /**
+ * Calculates a delay value in accordance with RFC 3550
+ *
+ */
+ protected void calculateRegularDelay() {
+ long curTime = System.currentTimeMillis();
+
+ if (rtpSession.bandwidth != 0 && !this.initial
+ && rtpSession.partDb.ssrcTable.size() > 4) {
+ // RTPs mechanisms for RTCP scalability
+ int rand = rtpSession.random.nextInt(10000) - 5000; // between -500
+ // and +500
+ double randDouble = ((double) 1000 + rand) / 1000.0;
+
+ Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
+ while (enu.hasMoreElements()) {
+ Participant part = enu.nextElement();
+ if (part.lastRtpPkt > this.prevTime)
+ senderCount++;
+ }
+
+ double bw;
+ if (rtpSession.rtcpBandwidth > -1) {
+ bw = rtpSession.rtcpBandwidth;
+ } else {
+ bw = rtpSession.bandwidth * 0.05;
+ }
+ if (senderCount * 2 > rtpSession.partDb.ssrcTable.size()) {
+ if (rtpSession.lastTimestamp > this.prevTime) {
+ // We're a sender
+ double numerator = ((double) this.avgPktSize)
+ * ((double) senderCount);
+ double denominator = 0.25 * bw;
+ this.nextDelay = (int) Math.round((numerator / denominator)
+ * randDouble);
+ } else {
+ // We're a receiver
+ double numerator = ((double) this.avgPktSize)
+ * ((double) rtpSession.partDb.ssrcTable.size());
+ double denominator = 0.75 * bw;
+ this.nextDelay = (int) Math.round((numerator / denominator)
+ * randDouble);
+ }
+ } else {
+ double numerator = ((double) this.avgPktSize)
+ * ((double) rtpSession.partDb.ssrcTable.size());
+ ;
+ double denominator = bw;
+ this.nextDelay = (int) Math
+ .round(1000.0 * (numerator / denominator))
+ * (1000 + rand);
+ }
+ } else {
+ // Not enough data to scale, use random values
+ int rand = rtpSession.random.nextInt(1000) - 500; // between -500
+ // and +500
+ if (this.initial) {
+ // 2.5 to 3.5 seconds, randomly
+ this.nextDelay = 3000 + rand;
+ this.initial = false;
+ } else {
+ // 4.5 to 5.5 seconds, randomly
+ this.nextDelay = 5500 + rand;
+ }
+
+ }
+
+ // preflight check
+ if (this.nextDelay < 1000) {
+ int rand = rtpSession.random.nextInt(1000) - 500; // between -500
+ // and +500
+ System.out
+ .println("RTCPSession.calculateDelay() nextDelay was too short ("
+ + this.nextDelay
+ + "ms), setting to "
+ + (this.nextDelay = 2000 + rand));
+ }
+ this.prevTime = curTime;
+ }
+
+ /**
+ * Update the average packet size
+ *
+ * @param length
+ * of latest packet
+ */
+ synchronized protected void updateAvgPacket(int length) {
+ double tempAvg = (double) this.avgPktSize;
+ tempAvg = (15 * tempAvg + ((double) length)) / 16;
+ this.avgPktSize = (int) tempAvg;
+ }
+
+ /**
+ * Adds an RTCP APP (application) packet to the queue
+ *
+ * @param targetSsrc
+ * the SSRC of the recipient
+ * @param aPkt
+ */
+ synchronized protected void addToAppQueue(long targetSsrc, RtcpPktAPP aPkt) {
+ aPkt.time = System.currentTimeMillis();
+
+ if (this.appQueue == null)
+ this.appQueue = new Hashtable<Long, LinkedList<RtcpPktAPP>>();
+
+ LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
+ if (ll == null) {
+ // No list, create and add
+ ll = new LinkedList<RtcpPktAPP>();
+ this.appQueue.put(targetSsrc, ll);
+ }
+
+ ll.add(aPkt);
+ }
+
+ /**
+ * Adds an RTCP APP (application) packet to the queue
+ *
+ * @param targetSsrc
+ * the SSRC of the recipient
+ * @return array of RTCP Application packets
+ */
+ synchronized protected RtcpPktAPP[] getFromAppQueue(long targetSsrc) {
+ if (this.appQueue == null)
+ return null;
+
+ LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
+ if (ll == null || ll.isEmpty()) {
+ return null;
+ } else {
+ RtcpPktAPP[] ret = new RtcpPktAPP[ll.size()];
+ ListIterator<RtcpPktAPP> li = ll.listIterator();
+ int i = 0;
+ while (li.hasNext()) {
+ ret[i] = li.next();
+ i++;
+ }
+ return ret;
+ }
+ }
+
+ /**
+ * Cleans the TCP APP (application) packet queues of any packets that are
+ * too old, defined as 60 seconds since insertion.
+ *
+ * @param ssrc
+ * The SSRC of the user who has left, negative value -> general
+ * cleanup
+ */
+ synchronized protected void cleanAppQueue(long ssrc) {
+ if (this.appQueue == null)
+ return;
+
+ if (ssrc > 0) {
+ this.appQueue.remove(ssrc);
+ } else {
+ Enumeration<LinkedList<RtcpPktAPP>> enu = this.appQueue.elements();
+ long curTime = System.currentTimeMillis();
+
+ while (enu.hasMoreElements()) {
+ ListIterator<RtcpPktAPP> li = enu.nextElement().listIterator();
+ while (li.hasNext()) {
+ RtcpPkt aPkt = li.next();
+ // Remove after 60 seconds
+ if (curTime - aPkt.time > 60000) {
+ li.remove();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Check the feedback queue for similar packets and adds the new packet if
+ * it is not redundant
+ *
+ * @param aPkt
+ * @return 0 if the packet was added, 1 if it was dropped
+ */
+ synchronized protected int addToFbQueue(long targetSsrc, RtcpPkt aPkt) {
+ if (this.fbQueue == null)
+ this.fbQueue = new Hashtable<Long, LinkedList<RtcpPkt>>();
+
+ LinkedList<RtcpPkt> ll = this.fbQueue.get(targetSsrc);
+ if (ll == null) {
+ // No list, create and add
+ ll = new LinkedList<RtcpPkt>();
+ ll.add(aPkt);
+ this.fbQueue.put(targetSsrc, ll);
+ } else {
+ // Check for matching packets, else add to end
+ ListIterator<RtcpPkt> li = ll.listIterator();
+ while (li.hasNext()) {
+ RtcpPkt tmp = li.next();
+ if (equivalent(tmp, aPkt))
+ return -1;
+ }
+ ll.addLast(aPkt);
+ }
+ return 0;
+ }
+
+ /**
+ * Checks whether there are ny feedback packets waiting to be sent.
+ *
+ * @param ssrc
+ * of the participant we are notifying
+ * @return all relevant feedback packets, or null
+ */
+ synchronized protected RtcpPkt[] getFromFbQueue(long ssrc) {
+ if (this.fbQueue == null)
+ return null;
+
+ LinkedList<RtcpPkt> ll = this.fbQueue.get(ssrc);
+
+ if (ll == null)
+ return null;
+
+ ListIterator<RtcpPkt> li = ll.listIterator();
+ if (li.hasNext()) {
+ long curTime = System.currentTimeMillis();
+ long maxDelay = curTime - rtpSession.fbMaxDelay;
+ long keepDelay = curTime - 2000;
+ int count = 0;
+
+ // TODO below the indeces should be collected instead of looping
+ // twice
+
+ // Clean out what we dont want and count what we want
+ while (li.hasNext()) {
+ RtcpPkt aPkt = li.next();
+ if (aPkt.received) {
+ // This is a packet received, we keep these for
+ // 2000ms to avoid redundant feedback
+ if (aPkt.time < keepDelay)
+ li.remove();
+ } else {
+ // This is a packet we havent sent yet
+ if (aPkt.time < maxDelay) {
+ li.remove();
+ } else {
+ count++;
+ }
+ }
+ }
+
+ // Gather what we want to return
+ if (count != 0) {
+ li = ll.listIterator();
+ RtcpPkt[] ret = new RtcpPkt[count];
+
+ while (count > 0) {
+ RtcpPkt aPkt = li.next();
+ if (!aPkt.received) {
+ ret[ret.length - count] = aPkt;
+ count--;
+ }
+ }
+ return ret;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Cleans the feeback queue of any packets that have expired, ie feedback
+ * packet that are no longer relevant.
+ *
+ * @param ssrc
+ * The SSRC of the user who has left, negative value -> general
+ * cleanup
+ */
+ synchronized protected void cleanFbQueue(long ssrc) {
+ if (this.fbQueue == null)
+ return;
+
+ if (ssrc > 0) {
+ this.fbQueue.remove(ssrc);
+ } else {
+ Enumeration<LinkedList<RtcpPkt>> enu = this.fbQueue.elements();
+ long curTime = System.currentTimeMillis();
+ long maxDelay = curTime - rtpSession.fbMaxDelay;
+ long keepDelay = curTime - 2000;
+
+ while (enu.hasMoreElements()) {
+ ListIterator<RtcpPkt> li = enu.nextElement().listIterator();
+ while (li.hasNext()) {
+ RtcpPkt aPkt = li.next();
+ if (aPkt.received) {
+ // This is a packet received, we keep these for
+ // 2000ms to avoid redundant feedback
+ if (aPkt.time < keepDelay)
+ li.remove();
+ } else {
+ // This is a packet we havent sent yet
+ if (aPkt.time < maxDelay)
+ li.remove();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Check whether the conditions are satisfied to send a feedbkac packet
+ * immediately.
+ *
+ * @return true if they are, false otherwise
+ */
+ protected boolean fbSendImmediately() {
+ if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbEarlyThreshold
+ && rtpSession.partDb.receivers.size() > this.rtpSession.fbEarlyThreshold)
+ return false;
+
+ return true;
+ }
+
+ /**
+ * Check whether the conditions are satisfied to send a feedbkac packet
+ * immediately.
+ *
+ * @return true if they are, false otherwise
+ */
+ protected boolean fbSendEarly() {
+ if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbRegularThreshold
+ && rtpSession.partDb.receivers.size() > this.rtpSession.fbRegularThreshold)
+ return false;
+
+ return true;
+ }
+
+ /**
+ * Wake the sender thread because of this ssrc
+ *
+ * @param ssrc
+ * that has feedback waiting.
+ */
+ protected void wakeSenderThread(long ssrc) {
+ this.fbWaiting = ssrc;
+ this.senderThrd.interrupt();
+
+ // Give it a chance to catch up
+ try {
+ Thread.sleep(0, 1);
+ } catch (Exception e) {
+ }
+ ;
+ }
+
+ /**
+ * Compares two packets to check whether they are equivalent feedback
+ * messages, to avoid sending the same feedback to a host twice.
+ *
+ * Expect false negatives, but not false positives.
+ *
+ * @param one
+ * packet
+ * @param two
+ * packet
+ * @return true if they are equivalent, false otherwise
+ */
+ private boolean equivalent(RtcpPkt one, RtcpPkt two) {
+ // Cheap checks
+ if (one.packetType != two.packetType)
+ return false;
+
+ if (one.itemCount != two.itemCount)
+ return false;
+
+ if (one.packetType == 205) {
+ // RTP Feedback, i.e. a NACK
+ RtcpPktRTPFB pktone = (RtcpPktRTPFB) one;
+ RtcpPktRTPFB pkttwo = (RtcpPktRTPFB) two;
+
+ if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
+ return false;
+
+ if (Arrays.equals(pktone.BLP, pkttwo.BLP)
+ && Arrays.equals(pktone.BLP, pkttwo.BLP))
+ return true;
+
+ return true;
+ } else if (one.packetType == 206) {
+ RtcpPktPSFB pktone = (RtcpPktPSFB) one;
+ RtcpPktPSFB pkttwo = (RtcpPktPSFB) two;
+
+ if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
+ return false;
+
+ switch (one.itemCount) {
+ case 1: // Picture Loss Indication
+ return true;
+
+ case 2: // Slice Loss Indication
+ // This will not work if the slice loss indicators are in
+ // different order
+ if (pktone.sliFirst.length == pkttwo.sliFirst.length
+ && Arrays.equals(pktone.sliFirst, pkttwo.sliFirst)
+ && Arrays.equals(pktone.sliNumber, pkttwo.sliNumber)
+ && Arrays.equals(pktone.sliPictureId,
+ pkttwo.sliPictureId))
+ return true;
+ break;
+ case 3: // Reference Picture Selection Indication
+ if (Arrays.equals(pktone.rpsiBitString, pkttwo.rpsiBitString))
+ return true;
+ break;
+ case 15: // Application Layer Feedback Messages
+ // This will not work if the padding scheme is different
+ if (pktone.sliFirst.length == pkttwo.sliFirst.length
+ && Arrays.equals(pktone.alfBitString,
+ pkttwo.alfBitString))
+ return true;
+ break;
+ default:
+
+ }
+ return true;
+ } else {
+ System.out
+ .println("!!!! RTCPSession.equivalentPackets() encountered unexpected packet type!");
+ }
+ return false;
+ }
+}