diff -r c9ff263c29ad -r e684f11070d5 src/jlibrtp/RTCPSession.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jlibrtp/RTCPSession.java Sat Mar 14 22:15:41 2009 +0100 @@ -0,0 +1,527 @@ +/** + * Java RTP Library (jlibrtp) + * Copyright (C) 2006 Arne Kepp + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ +package jlibrtp; + +import java.util.Enumeration; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.MulticastSocket; +import java.util.LinkedList; +import java.util.Hashtable; +import java.util.ListIterator; +import java.util.Arrays; + + +/** + * This class acts as an organizer for most of the information + * and functions pertaining to RTCP packet generation and reception + * + * @author Arne Kepp + * + */ +public class RTCPSession { + /** Parent session */ + protected RTPSession rtpSession = null; + + /** Unicast socket */ + protected DatagramSocket rtcpSock = null; + /** Multicast socket */ + protected MulticastSocket rtcpMCSock = null; + /** Multicast group */ + protected InetAddress mcGroup = null; + + /** RTCP Receiver thread */ + protected RTCPReceiverThread recvThrd = null; + /** RTCP Sender thread */ + protected RTCPSenderThread senderThrd = null; + + /** Previous time a delay was calculated */ + protected long prevTime = System.currentTimeMillis(); + /** Delay between RTCP transmissions, in ms. Initialized in start() */ + protected int nextDelay = -1; // + /** The average compound RTCP packet size, in octets, including UDP and IP headers */ + protected int avgPktSize = 200; // + /** Pessimistic case estimate of the current number of senders */ + protected int senderCount = 1; + /** Whether next RTCP packet can be sent early */ + protected boolean fbAllowEarly = false; + /** Feedback queue , index is SSRC of target */ + protected Hashtable> fbQueue = null; + /** APP queue , index is SSRC of target */ + protected Hashtable> appQueue = null; + /** Are we just starting up? */ + protected boolean initial = true; + /** Is there a feedback packet waiting? SSRC of destination */ + protected long fbWaiting = -1; + + /** + * Constructor for unicast sessions + * + * @param parent RTPSession that started this + * @param rtcpSocket the socket to use for listening and sending + */ + protected RTCPSession(RTPSession parent, DatagramSocket rtcpSocket) { + this.rtcpSock = rtcpSocket; + rtpSession = parent; + } + + /** + * Constructor for multicast sessions + * + * @param parent parent RTPSession + * @param rtcpSocket parent RTPSession that started this + * @param multicastGroup multicast group to bind the socket to + */ + protected RTCPSession(RTPSession parent, MulticastSocket rtcpSocket, InetAddress multicastGroup) { + mcGroup = multicastGroup; + this.rtcpSock = rtcpSocket; + rtpSession = parent; + } + + /** + * Starts the session, calculates delays and fires up the threads. + * + */ + protected void start() { + //nextDelay = 2500 + rtpSession.random.nextInt(1000) - 500; + this.calculateDelay(); + recvThrd = new RTCPReceiverThread(this, this.rtpSession); + senderThrd = new RTCPSenderThread(this, this.rtpSession); + recvThrd.start(); + senderThrd.start(); + } + + /** + * Send bye packets, handled by RTCP Sender thread + * + */ + protected void sendByes() { + senderThrd.sendByes(); + } + + /** + * Calculate the delay before the next RTCP packet can be sent + * + */ + protected void calculateDelay() { + switch(rtpSession.rtcpMode) { + case 0: calculateRegularDelay(); break; + default: + System.out.println("RTCPSession.calculateDelay() unknown .mode"); + } + } + + /** + * Calculates a delay value in accordance with RFC 3550 + * + */ + protected void calculateRegularDelay() { + long curTime = System.currentTimeMillis(); + + if(rtpSession.bandwidth != 0 && ! this.initial && rtpSession.partDb.ssrcTable.size() > 4) { + // RTPs mechanisms for RTCP scalability + int rand = rtpSession.random.nextInt(10000) - 5000; //between -500 and +500 + double randDouble = ((double) 1000 + rand)/1000.0; + + + Enumeration enu = rtpSession.partDb.getParticipants(); + while(enu.hasMoreElements()) { + Participant part = enu.nextElement(); + if(part.lastRtpPkt > this.prevTime) + senderCount++; + } + + double bw; + if(rtpSession.rtcpBandwidth > -1) { + bw = rtpSession.rtcpBandwidth; + }else { + bw = rtpSession.bandwidth*0.05; + } + if(senderCount*2 > rtpSession.partDb.ssrcTable.size()) { + if(rtpSession.lastTimestamp > this.prevTime) { + //We're a sender + double numerator = ((double) this.avgPktSize)*((double) senderCount); + double denominator = 0.25*bw; + this.nextDelay = (int) Math.round((numerator/denominator)*randDouble); + } else { + //We're a receiver + double numerator = ((double) this.avgPktSize)*((double) rtpSession.partDb.ssrcTable.size()); + double denominator = 0.75*bw; + this.nextDelay = (int) Math.round((numerator/denominator)*randDouble); + } + } else { + double numerator = ((double) this.avgPktSize)*((double) rtpSession.partDb.ssrcTable.size());; + double denominator = bw; + this.nextDelay = (int) Math.round(1000.0*(numerator/denominator)) * (1000 + rand); + } + } else { + // Not enough data to scale, use random values + int rand = rtpSession.random.nextInt(1000) - 500; //between -500 and +500 + if(this.initial) { + // 2.5 to 3.5 seconds, randomly + this.nextDelay = 3000 + rand; + this.initial = false; + } else { + // 4.5 to 5.5 seconds, randomly + this.nextDelay = 5500 + rand; + } + + } + + // preflight check + if(this.nextDelay < 1000) { + int rand = rtpSession.random.nextInt(1000) - 500; //between -500 and +500 + System.out.println("RTCPSession.calculateDelay() nextDelay was too short (" + +this.nextDelay+"ms), setting to "+(this.nextDelay = 2000 + rand)); + } + this.prevTime = curTime; + } + + /** + * Update the average packet size + * @param length of latest packet + */ + synchronized protected void updateAvgPacket(int length) { + double tempAvg = (double) this.avgPktSize; + tempAvg = (15*tempAvg + ((double) length))/16; + this.avgPktSize = (int) tempAvg; + } + + + /** + * Adds an RTCP APP (application) packet to the queue + * + * @param targetSsrc the SSRC of the recipient + * @param aPkt + */ + synchronized protected void addToAppQueue(long targetSsrc, RtcpPktAPP aPkt) { + aPkt.time = System.currentTimeMillis(); + + if(this.appQueue == null) + this.appQueue = new Hashtable>(); + + LinkedList ll = this.appQueue.get(targetSsrc); + if(ll == null) { + // No list, create and add + ll = new LinkedList(); + this.appQueue.put(targetSsrc, ll); + } + + ll.add(aPkt); + } + + + /** + * Adds an RTCP APP (application) packet to the queue + * + * @param targetSsrc the SSRC of the recipient + * @return array of RTCP Application packets + */ + synchronized protected RtcpPktAPP[] getFromAppQueue(long targetSsrc) { + if(this.appQueue == null) + return null; + + LinkedList ll = this.appQueue.get(targetSsrc); + if(ll == null || ll.isEmpty()) { + return null; + } else { + RtcpPktAPP[] ret = new RtcpPktAPP[ll.size()]; + ListIterator li = ll.listIterator(); + int i = 0; + while(li.hasNext()) { + ret[i] = li.next(); + i++; + } + return ret; + } + } + + + /** + * Cleans the TCP APP (application) packet queues of any packets that are + * too old, defined as 60 seconds since insertion. + * + * @param ssrc The SSRC of the user who has left, negative value -> general cleanup + */ + synchronized protected void cleanAppQueue(long ssrc) { + if(this.appQueue == null) + return; + + if(ssrc > 0) { + this.appQueue.remove(ssrc); + } else { + Enumeration> enu = this.appQueue.elements(); + long curTime = System.currentTimeMillis(); + + + while(enu.hasMoreElements()) { + ListIterator li = enu.nextElement().listIterator(); + while(li.hasNext()) { + RtcpPkt aPkt = li.next(); + //Remove after 60 seconds + if(curTime - aPkt.time > 60000) { + li.remove(); + } + } + } + } + } + + + + /** + * Check the feedback queue for similar packets and adds + * the new packet if it is not redundant + * + * @param aPkt + * @return 0 if the packet was added, 1 if it was dropped + */ + synchronized protected int addToFbQueue(long targetSsrc, RtcpPkt aPkt) { + if(this.fbQueue == null) + this.fbQueue = new Hashtable>(); + + LinkedList ll = this.fbQueue.get(targetSsrc); + if(ll == null) { + // No list, create and add + ll = new LinkedList(); + ll.add(aPkt); + this.fbQueue.put(targetSsrc, ll); + } else { + // Check for matching packets, else add to end + ListIterator li = ll.listIterator(); + while(li.hasNext()) { + RtcpPkt tmp = li.next(); + if(equivalent(tmp, aPkt)) + return -1; + } + ll.addLast(aPkt); + } + return 0; + } + + /** + * Checks whether there are ny feedback packets waiting + * to be sent. + * + * @param ssrc of the participant we are notifying + * @return all relevant feedback packets, or null + */ + synchronized protected RtcpPkt[] getFromFbQueue(long ssrc) { + if(this.fbQueue == null) + return null; + + LinkedList ll = this.fbQueue.get(ssrc); + + if(ll == null) + return null; + + ListIterator li = ll.listIterator(); + if(li.hasNext()) { + long curTime = System.currentTimeMillis(); + long maxDelay = curTime - rtpSession.fbMaxDelay; + long keepDelay = curTime - 2000; + int count = 0; + + //TODO below the indeces should be collected instead of looping twice + + // Clean out what we dont want and count what we want + while(li.hasNext()) { + RtcpPkt aPkt = li.next(); + if(aPkt.received) { + //This is a packet received, we keep these for + // 2000ms to avoid redundant feedback + if(aPkt.time < keepDelay) + li.remove(); + } else { + //This is a packet we havent sent yet + if(aPkt.time < maxDelay) { + li.remove(); + } else { + count++; + } + } + } + + // Gather what we want to return + if(count != 0) { + li = ll.listIterator(); + RtcpPkt[] ret = new RtcpPkt[count]; + + while(count > 0) { + RtcpPkt aPkt = li.next(); + if(! aPkt.received) { + ret[ret.length - count] = aPkt; + count--; + } + } + return ret; + } + } + + return null; + } + + /** + * Cleans the feeback queue of any packets that have expired, + * ie feedback packet that are no longer relevant. + * + * @param ssrc The SSRC of the user who has left, negative value -> general cleanup + */ + synchronized protected void cleanFbQueue(long ssrc) { + if(this.fbQueue == null) + return; + + if(ssrc > 0) { + this.fbQueue.remove(ssrc); + } else { + Enumeration> enu = this.fbQueue.elements(); + long curTime = System.currentTimeMillis(); + long maxDelay = curTime - rtpSession.fbMaxDelay; + long keepDelay = curTime - 2000; + + while(enu.hasMoreElements()) { + ListIterator li = enu.nextElement().listIterator(); + while(li.hasNext()) { + RtcpPkt aPkt = li.next(); + if(aPkt.received) { + //This is a packet received, we keep these for + // 2000ms to avoid redundant feedback + if(aPkt.time < keepDelay) + li.remove(); + } else { + //This is a packet we havent sent yet + if(aPkt.time < maxDelay) + li.remove(); + } + } + } + } + } + + /** + * Check whether the conditions are satisfied to send a feedbkac packet immediately. + * + * @return true if they are, false otherwise + */ + protected boolean fbSendImmediately() { + if(rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbEarlyThreshold + && rtpSession.partDb.receivers.size() > this.rtpSession.fbEarlyThreshold) + return false; + + return true; + } + + + /** + * Check whether the conditions are satisfied to send a feedbkac packet immediately. + * + * @return true if they are, false otherwise + */ + protected boolean fbSendEarly() { + if(rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbRegularThreshold + && rtpSession.partDb.receivers.size() > this.rtpSession.fbRegularThreshold) + return false; + + return true; + } + + /** + * Wake the sender thread because of this ssrc + * + * @param ssrc that has feedback waiting. + */ + protected void wakeSenderThread(long ssrc) { + this.fbWaiting = ssrc; + this.senderThrd.interrupt(); + + // Give it a chance to catch up + try { Thread.sleep(0,1); } catch (Exception e){ }; + } + + /** + * Compares two packets to check whether they are equivalent feedback messages, + * to avoid sending the same feedback to a host twice. + * + * Expect false negatives, but not false positives. + * + * @param one packet + * @param two packet + * @return true if they are equivalent, false otherwise + */ + private boolean equivalent(RtcpPkt one, RtcpPkt two) { + // Cheap checks + if(one.packetType != two.packetType) + return false; + + if(one.itemCount != two.itemCount) + return false; + + if(one.packetType == 205) { + // RTP Feedback, i.e. a NACK + RtcpPktRTPFB pktone = (RtcpPktRTPFB) one; + RtcpPktRTPFB pkttwo = (RtcpPktRTPFB) two; + + if(pktone.ssrcMediaSource != pkttwo.ssrcMediaSource) + return false; + + if(Arrays.equals(pktone.BLP,pkttwo.BLP) + && Arrays.equals(pktone.BLP,pkttwo.BLP)) + return true; + + return true; + } else if(one.packetType == 206) { + RtcpPktPSFB pktone = (RtcpPktPSFB) one; + RtcpPktPSFB pkttwo = (RtcpPktPSFB) two; + + if(pktone.ssrcMediaSource != pkttwo.ssrcMediaSource) + return false; + + switch(one.itemCount) { + case 1: // Picture Loss Indication + return true; + + case 2: // Slice Loss Indication + // This will not work if the slice loss indicators are in different order + if(pktone.sliFirst.length == pkttwo.sliFirst.length + && Arrays.equals(pktone.sliFirst, pkttwo.sliFirst) + && Arrays.equals(pktone.sliNumber, pkttwo.sliNumber) + && Arrays.equals(pktone.sliPictureId, pkttwo.sliPictureId)) + return true; + break; + case 3: // Reference Picture Selection Indication + if(Arrays.equals(pktone.rpsiBitString, pkttwo.rpsiBitString)) + return true; + break; + case 15: // Application Layer Feedback Messages + // This will not work if the padding scheme is different + if(pktone.sliFirst.length == pkttwo.sliFirst.length + && Arrays.equals(pktone.alfBitString, pkttwo.alfBitString)) + return true; + break; + default: + + } + return true; + } else { + System.out.println("!!!! RTCPSession.equivalentPackets() encountered unexpected packet type!"); + } + return false; + } +} +