diff -r f5a5d9237d69 -r e8d6255306f8 src/jlibrtp/RTCPSession.java --- a/src/jlibrtp/RTCPSession.java Sat Jan 23 21:48:58 2010 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,565 +0,0 @@ -/** - * Java RTP Library (jlibrtp) - * Copyright (C) 2006 Arne Kepp - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ -package jlibrtp; - -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.MulticastSocket; -import java.util.Arrays; -import java.util.Enumeration; -import java.util.Hashtable; -import java.util.LinkedList; -import java.util.ListIterator; - -/** - * This class acts as an organizer for most of the information and functions - * pertaining to RTCP packet generation and reception - * - * @author Arne Kepp - * - */ -public class RTCPSession { - /** Parent session */ - protected RTPSession rtpSession = null; - - /** Unicast socket */ - protected DatagramSocket rtcpSock = null; - /** Multicast socket */ - protected MulticastSocket rtcpMCSock = null; - /** Multicast group */ - protected InetAddress mcGroup = null; - - /** RTCP Receiver thread */ - protected RTCPReceiverThread recvThrd = null; - /** RTCP Sender thread */ - protected RTCPSenderThread senderThrd = null; - - /** Previous time a delay was calculated */ - protected long prevTime = System.currentTimeMillis(); - /** Delay between RTCP transmissions, in ms. Initialized in start() */ - protected int nextDelay = -1; // - /** - * The average compound RTCP packet size, in octets, including UDP and IP - * headers - */ - protected int avgPktSize = 200; // - /** Pessimistic case estimate of the current number of senders */ - protected int senderCount = 1; - /** Whether next RTCP packet can be sent early */ - protected boolean fbAllowEarly = false; - /** Feedback queue , index is SSRC of target */ - protected Hashtable> fbQueue = null; - /** APP queue , index is SSRC of target */ - protected Hashtable> appQueue = null; - /** Are we just starting up? */ - protected boolean initial = true; - /** Is there a feedback packet waiting? SSRC of destination */ - protected long fbWaiting = -1; - - /** - * Constructor for unicast sessions - * - * @param parent - * RTPSession that started this - * @param rtcpSocket - * the socket to use for listening and sending - */ - protected RTCPSession(RTPSession parent, DatagramSocket rtcpSocket) { - this.rtcpSock = rtcpSocket; - rtpSession = parent; - } - - /** - * Constructor for multicast sessions - * - * @param parent - * parent RTPSession - * @param rtcpSocket - * parent RTPSession that started this - * @param multicastGroup - * multicast group to bind the socket to - */ - protected RTCPSession(RTPSession parent, MulticastSocket rtcpSocket, - InetAddress multicastGroup) { - mcGroup = multicastGroup; - this.rtcpSock = rtcpSocket; - rtpSession = parent; - } - - /** - * Starts the session, calculates delays and fires up the threads. - * - */ - protected void start() { - // nextDelay = 2500 + rtpSession.random.nextInt(1000) - 500; - this.calculateDelay(); - recvThrd = new RTCPReceiverThread(this, this.rtpSession); - senderThrd = new RTCPSenderThread(this, this.rtpSession); - recvThrd.start(); - senderThrd.start(); - } - - /** - * Send bye packets, handled by RTCP Sender thread - * - */ - protected void sendByes() { - senderThrd.sendByes(); - } - - /** - * Calculate the delay before the next RTCP packet can be sent - * - */ - protected void calculateDelay() { - switch (rtpSession.rtcpMode) { - case 0: - calculateRegularDelay(); - break; - default: - System.out.println("RTCPSession.calculateDelay() unknown .mode"); - } - } - - /** - * Calculates a delay value in accordance with RFC 3550 - * - */ - protected void calculateRegularDelay() { - long curTime = System.currentTimeMillis(); - - if (rtpSession.bandwidth != 0 && !this.initial - && rtpSession.partDb.ssrcTable.size() > 4) { - // RTPs mechanisms for RTCP scalability - int rand = rtpSession.random.nextInt(10000) - 5000; // between -500 - // and +500 - double randDouble = ((double) 1000 + rand) / 1000.0; - - Enumeration enu = rtpSession.partDb.getParticipants(); - while (enu.hasMoreElements()) { - Participant part = enu.nextElement(); - if (part.lastRtpPkt > this.prevTime) - senderCount++; - } - - double bw; - if (rtpSession.rtcpBandwidth > -1) { - bw = rtpSession.rtcpBandwidth; - } else { - bw = rtpSession.bandwidth * 0.05; - } - if (senderCount * 2 > rtpSession.partDb.ssrcTable.size()) { - if (rtpSession.lastTimestamp > this.prevTime) { - // We're a sender - double numerator = ((double) this.avgPktSize) - * ((double) senderCount); - double denominator = 0.25 * bw; - this.nextDelay = (int) Math.round((numerator / denominator) - * randDouble); - } else { - // We're a receiver - double numerator = ((double) this.avgPktSize) - * ((double) rtpSession.partDb.ssrcTable.size()); - double denominator = 0.75 * bw; - this.nextDelay = (int) Math.round((numerator / denominator) - * randDouble); - } - } else { - double numerator = ((double) this.avgPktSize) - * ((double) rtpSession.partDb.ssrcTable.size()); - ; - double denominator = bw; - this.nextDelay = (int) Math - .round(1000.0 * (numerator / denominator)) - * (1000 + rand); - } - } else { - // Not enough data to scale, use random values - int rand = rtpSession.random.nextInt(1000) - 500; // between -500 - // and +500 - if (this.initial) { - // 2.5 to 3.5 seconds, randomly - this.nextDelay = 3000 + rand; - this.initial = false; - } else { - // 4.5 to 5.5 seconds, randomly - this.nextDelay = 5500 + rand; - } - - } - - // preflight check - if (this.nextDelay < 1000) { - int rand = rtpSession.random.nextInt(1000) - 500; // between -500 - // and +500 - System.out - .println("RTCPSession.calculateDelay() nextDelay was too short (" - + this.nextDelay - + "ms), setting to " - + (this.nextDelay = 2000 + rand)); - } - this.prevTime = curTime; - } - - /** - * Update the average packet size - * - * @param length - * of latest packet - */ - synchronized protected void updateAvgPacket(int length) { - double tempAvg = (double) this.avgPktSize; - tempAvg = (15 * tempAvg + ((double) length)) / 16; - this.avgPktSize = (int) tempAvg; - } - - /** - * Adds an RTCP APP (application) packet to the queue - * - * @param targetSsrc - * the SSRC of the recipient - * @param aPkt - */ - synchronized protected void addToAppQueue(long targetSsrc, RtcpPktAPP aPkt) { - aPkt.time = System.currentTimeMillis(); - - if (this.appQueue == null) - this.appQueue = new Hashtable>(); - - LinkedList ll = this.appQueue.get(targetSsrc); - if (ll == null) { - // No list, create and add - ll = new LinkedList(); - this.appQueue.put(targetSsrc, ll); - } - - ll.add(aPkt); - } - - /** - * Adds an RTCP APP (application) packet to the queue - * - * @param targetSsrc - * the SSRC of the recipient - * @return array of RTCP Application packets - */ - synchronized protected RtcpPktAPP[] getFromAppQueue(long targetSsrc) { - if (this.appQueue == null) - return null; - - LinkedList ll = this.appQueue.get(targetSsrc); - if (ll == null || ll.isEmpty()) { - return null; - } else { - RtcpPktAPP[] ret = new RtcpPktAPP[ll.size()]; - ListIterator li = ll.listIterator(); - int i = 0; - while (li.hasNext()) { - ret[i] = li.next(); - i++; - } - return ret; - } - } - - /** - * Cleans the TCP APP (application) packet queues of any packets that are - * too old, defined as 60 seconds since insertion. - * - * @param ssrc - * The SSRC of the user who has left, negative value -> general - * cleanup - */ - synchronized protected void cleanAppQueue(long ssrc) { - if (this.appQueue == null) - return; - - if (ssrc > 0) { - this.appQueue.remove(ssrc); - } else { - Enumeration> enu = this.appQueue.elements(); - long curTime = System.currentTimeMillis(); - - while (enu.hasMoreElements()) { - ListIterator li = enu.nextElement().listIterator(); - while (li.hasNext()) { - RtcpPkt aPkt = li.next(); - // Remove after 60 seconds - if (curTime - aPkt.time > 60000) { - li.remove(); - } - } - } - } - } - - /** - * Check the feedback queue for similar packets and adds the new packet if - * it is not redundant - * - * @param aPkt - * @return 0 if the packet was added, 1 if it was dropped - */ - synchronized protected int addToFbQueue(long targetSsrc, RtcpPkt aPkt) { - if (this.fbQueue == null) - this.fbQueue = new Hashtable>(); - - LinkedList ll = this.fbQueue.get(targetSsrc); - if (ll == null) { - // No list, create and add - ll = new LinkedList(); - ll.add(aPkt); - this.fbQueue.put(targetSsrc, ll); - } else { - // Check for matching packets, else add to end - ListIterator li = ll.listIterator(); - while (li.hasNext()) { - RtcpPkt tmp = li.next(); - if (equivalent(tmp, aPkt)) - return -1; - } - ll.addLast(aPkt); - } - return 0; - } - - /** - * Checks whether there are ny feedback packets waiting to be sent. - * - * @param ssrc - * of the participant we are notifying - * @return all relevant feedback packets, or null - */ - synchronized protected RtcpPkt[] getFromFbQueue(long ssrc) { - if (this.fbQueue == null) - return null; - - LinkedList ll = this.fbQueue.get(ssrc); - - if (ll == null) - return null; - - ListIterator li = ll.listIterator(); - if (li.hasNext()) { - long curTime = System.currentTimeMillis(); - long maxDelay = curTime - rtpSession.fbMaxDelay; - long keepDelay = curTime - 2000; - int count = 0; - - // TODO below the indeces should be collected instead of looping - // twice - - // Clean out what we dont want and count what we want - while (li.hasNext()) { - RtcpPkt aPkt = li.next(); - if (aPkt.received) { - // This is a packet received, we keep these for - // 2000ms to avoid redundant feedback - if (aPkt.time < keepDelay) - li.remove(); - } else { - // This is a packet we havent sent yet - if (aPkt.time < maxDelay) { - li.remove(); - } else { - count++; - } - } - } - - // Gather what we want to return - if (count != 0) { - li = ll.listIterator(); - RtcpPkt[] ret = new RtcpPkt[count]; - - while (count > 0) { - RtcpPkt aPkt = li.next(); - if (!aPkt.received) { - ret[ret.length - count] = aPkt; - count--; - } - } - return ret; - } - } - - return null; - } - - /** - * Cleans the feeback queue of any packets that have expired, ie feedback - * packet that are no longer relevant. - * - * @param ssrc - * The SSRC of the user who has left, negative value -> general - * cleanup - */ - synchronized protected void cleanFbQueue(long ssrc) { - if (this.fbQueue == null) - return; - - if (ssrc > 0) { - this.fbQueue.remove(ssrc); - } else { - Enumeration> enu = this.fbQueue.elements(); - long curTime = System.currentTimeMillis(); - long maxDelay = curTime - rtpSession.fbMaxDelay; - long keepDelay = curTime - 2000; - - while (enu.hasMoreElements()) { - ListIterator li = enu.nextElement().listIterator(); - while (li.hasNext()) { - RtcpPkt aPkt = li.next(); - if (aPkt.received) { - // This is a packet received, we keep these for - // 2000ms to avoid redundant feedback - if (aPkt.time < keepDelay) - li.remove(); - } else { - // This is a packet we havent sent yet - if (aPkt.time < maxDelay) - li.remove(); - } - } - } - } - } - - /** - * Check whether the conditions are satisfied to send a feedbkac packet - * immediately. - * - * @return true if they are, false otherwise - */ - protected boolean fbSendImmediately() { - if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbEarlyThreshold - && rtpSession.partDb.receivers.size() > this.rtpSession.fbEarlyThreshold) - return false; - - return true; - } - - /** - * Check whether the conditions are satisfied to send a feedbkac packet - * immediately. - * - * @return true if they are, false otherwise - */ - protected boolean fbSendEarly() { - if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbRegularThreshold - && rtpSession.partDb.receivers.size() > this.rtpSession.fbRegularThreshold) - return false; - - return true; - } - - /** - * Wake the sender thread because of this ssrc - * - * @param ssrc - * that has feedback waiting. - */ - protected void wakeSenderThread(long ssrc) { - this.fbWaiting = ssrc; - this.senderThrd.interrupt(); - - // Give it a chance to catch up - try { - Thread.sleep(0, 1); - } catch (Exception e) { - } - ; - } - - /** - * Compares two packets to check whether they are equivalent feedback - * messages, to avoid sending the same feedback to a host twice. - * - * Expect false negatives, but not false positives. - * - * @param one - * packet - * @param two - * packet - * @return true if they are equivalent, false otherwise - */ - private boolean equivalent(RtcpPkt one, RtcpPkt two) { - // Cheap checks - if (one.packetType != two.packetType) - return false; - - if (one.itemCount != two.itemCount) - return false; - - if (one.packetType == 205) { - // RTP Feedback, i.e. a NACK - RtcpPktRTPFB pktone = (RtcpPktRTPFB) one; - RtcpPktRTPFB pkttwo = (RtcpPktRTPFB) two; - - if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource) - return false; - - if (Arrays.equals(pktone.BLP, pkttwo.BLP) - && Arrays.equals(pktone.BLP, pkttwo.BLP)) - return true; - - return true; - } else if (one.packetType == 206) { - RtcpPktPSFB pktone = (RtcpPktPSFB) one; - RtcpPktPSFB pkttwo = (RtcpPktPSFB) two; - - if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource) - return false; - - switch (one.itemCount) { - case 1: // Picture Loss Indication - return true; - - case 2: // Slice Loss Indication - // This will not work if the slice loss indicators are in - // different order - if (pktone.sliFirst.length == pkttwo.sliFirst.length - && Arrays.equals(pktone.sliFirst, pkttwo.sliFirst) - && Arrays.equals(pktone.sliNumber, pkttwo.sliNumber) - && Arrays.equals(pktone.sliPictureId, - pkttwo.sliPictureId)) - return true; - break; - case 3: // Reference Picture Selection Indication - if (Arrays.equals(pktone.rpsiBitString, pkttwo.rpsiBitString)) - return true; - break; - case 15: // Application Layer Feedback Messages - // This will not work if the padding scheme is different - if (pktone.sliFirst.length == pkttwo.sliFirst.length - && Arrays.equals(pktone.alfBitString, - pkttwo.alfBitString)) - return true; - break; - default: - - } - return true; - } else { - System.out - .println("!!!! RTCPSession.equivalentPackets() encountered unexpected packet type!"); - } - return false; - } -}