src/jlibrtp/RTCPSession.java
changeset 13 e684f11070d5
equal deleted inserted replaced
12:c9ff263c29ad 13:e684f11070d5
       
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.util.Enumeration;
       
    22 import java.net.DatagramSocket;
       
    23 import java.net.InetAddress;
       
    24 import java.net.MulticastSocket;
       
    25 import java.util.LinkedList;
       
    26 import java.util.Hashtable;
       
    27 import java.util.ListIterator;
       
    28 import java.util.Arrays;
       
    29 
       
    30 
       
    31 /**
       
    32  * This class acts as an organizer for most of the information
       
    33  * and functions pertaining to RTCP packet generation and reception 
       
    34  * 
       
    35  * @author Arne Kepp
       
    36  *
       
    37  */
       
    38 public class RTCPSession {
       
    39 	/** Parent session */
       
    40 	protected RTPSession rtpSession = null;
       
    41 	
       
    42 	/** Unicast socket */
       
    43 	protected DatagramSocket rtcpSock = null;
       
    44 	/** Multicast socket */
       
    45 	protected MulticastSocket rtcpMCSock = null;
       
    46 	/** Multicast group */
       
    47 	protected InetAddress mcGroup = null;
       
    48 
       
    49 	/** RTCP Receiver thread */
       
    50 	protected RTCPReceiverThread recvThrd = null;
       
    51 	/** RTCP Sender thread */
       
    52 	protected RTCPSenderThread senderThrd = null;
       
    53 	
       
    54 	/** Previous time a delay was calculated */
       
    55 	protected long prevTime = System.currentTimeMillis();
       
    56 	/** Delay between RTCP transmissions, in ms. Initialized in start() */
       
    57 	protected int nextDelay = -1; //
       
    58 	/** The average compound RTCP packet size, in octets, including UDP and IP headers */
       
    59 	protected int avgPktSize = 200; //
       
    60 	/** Pessimistic case estimate of the current number of senders */
       
    61 	protected int senderCount = 1;
       
    62 	/** Whether next RTCP packet can be sent early */
       
    63 	protected boolean fbAllowEarly = false;
       
    64 	/** Feedback queue , index is SSRC of target */
       
    65 	protected Hashtable<Long, LinkedList<RtcpPkt>> fbQueue = null;
       
    66 	/** APP queue , index is SSRC of target */
       
    67 	protected Hashtable<Long, LinkedList<RtcpPktAPP>> appQueue = null;
       
    68 	/** Are we just starting up? */
       
    69 	protected boolean initial = true;
       
    70 	/** Is there a feedback packet waiting? SSRC of destination */
       
    71 	protected long fbWaiting = -1;
       
    72 
       
    73 	/**
       
    74 	 * Constructor for unicast sessions
       
    75 	 * 
       
    76 	 * @param parent RTPSession that started this
       
    77 	 * @param rtcpSocket the socket to use for listening and sending
       
    78 	 */
       
    79 	protected RTCPSession(RTPSession parent, DatagramSocket rtcpSocket) {
       
    80 		this.rtcpSock = rtcpSocket;
       
    81 		rtpSession = parent;
       
    82 	}
       
    83 
       
    84 	/**
       
    85 	 * Constructor for multicast sessions
       
    86 	 * 
       
    87 	 * @param parent parent RTPSession
       
    88 	 * @param rtcpSocket parent RTPSession that started this
       
    89 	 * @param multicastGroup multicast group to bind the socket to
       
    90 	 */
       
    91 	protected RTCPSession(RTPSession parent, MulticastSocket rtcpSocket, InetAddress multicastGroup) {
       
    92 		mcGroup = multicastGroup;
       
    93 		this.rtcpSock = rtcpSocket;
       
    94 		rtpSession = parent;
       
    95 	}
       
    96 
       
    97 	/**
       
    98 	 * Starts the session, calculates delays and fires up the threads.
       
    99 	 *
       
   100 	 */
       
   101 	protected void start() {
       
   102 		//nextDelay = 2500 + rtpSession.random.nextInt(1000) - 500;
       
   103 		this.calculateDelay();
       
   104 		recvThrd = new RTCPReceiverThread(this, this.rtpSession);
       
   105 		senderThrd = new RTCPSenderThread(this, this.rtpSession);
       
   106 		recvThrd.start();
       
   107 		senderThrd.start();
       
   108 	}
       
   109 
       
   110 	/**
       
   111 	 * Send bye packets, handled by RTCP Sender thread
       
   112 	 *
       
   113 	 */
       
   114 	protected void sendByes() {
       
   115 		senderThrd.sendByes();
       
   116 	}
       
   117 	
       
   118 	/**
       
   119 	 * Calculate the delay before the next RTCP packet can be sent
       
   120 	 *
       
   121 	 */
       
   122 	protected void calculateDelay() {
       
   123 		switch(rtpSession.rtcpMode) {
       
   124 		case 0: calculateRegularDelay(); break;
       
   125 		default:
       
   126 			System.out.println("RTCPSession.calculateDelay() unknown .mode");
       
   127 		}
       
   128 	}
       
   129 
       
   130 	/**
       
   131 	 * Calculates a delay value in accordance with RFC 3550
       
   132 	 *
       
   133 	 */
       
   134 	protected void calculateRegularDelay() {
       
   135 		long curTime = System.currentTimeMillis();
       
   136 		
       
   137 		if(rtpSession.bandwidth != 0 && ! this.initial && rtpSession.partDb.ssrcTable.size() > 4) {
       
   138 			// RTPs mechanisms for RTCP scalability
       
   139 			int rand = rtpSession.random.nextInt(10000) - 5000; //between -500 and +500
       
   140 			double randDouble =  ((double) 1000 + rand)/1000.0;
       
   141 			
       
   142 			
       
   143 			Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
       
   144 			while(enu.hasMoreElements()) {
       
   145 				Participant part = enu.nextElement();
       
   146 				if(part.lastRtpPkt > this.prevTime)
       
   147 					senderCount++;
       
   148 			}
       
   149 			
       
   150 			double bw;
       
   151 			if(rtpSession.rtcpBandwidth > -1) {
       
   152 				bw = rtpSession.rtcpBandwidth;
       
   153 			}else {
       
   154 				bw = rtpSession.bandwidth*0.05;
       
   155 			}
       
   156 			if(senderCount*2 > rtpSession.partDb.ssrcTable.size()) {
       
   157 				if(rtpSession.lastTimestamp > this.prevTime) {
       
   158 					//We're a sender
       
   159 					double numerator = ((double) this.avgPktSize)*((double) senderCount);
       
   160 					double denominator = 0.25*bw;
       
   161 					this.nextDelay = (int) Math.round((numerator/denominator)*randDouble);
       
   162 				} else {
       
   163 					//We're a receiver
       
   164 					double numerator = ((double) this.avgPktSize)*((double) rtpSession.partDb.ssrcTable.size());
       
   165 					double denominator = 0.75*bw;
       
   166 					this.nextDelay = (int) Math.round((numerator/denominator)*randDouble);
       
   167 				}
       
   168 			} else {
       
   169 				double numerator = ((double) this.avgPktSize)*((double) rtpSession.partDb.ssrcTable.size());;
       
   170 				double denominator = bw;
       
   171 				this.nextDelay = (int) Math.round(1000.0*(numerator/denominator)) * (1000 + rand);
       
   172 			}
       
   173 		} else {
       
   174 			// Not enough data to scale, use random values
       
   175 			int rand = rtpSession.random.nextInt(1000) - 500; //between -500 and +500
       
   176 			if(this.initial) {
       
   177 				// 2.5 to 3.5 seconds, randomly
       
   178 				this.nextDelay = 3000 + rand;
       
   179 				this.initial = false;
       
   180 			} else {
       
   181 				// 4.5 to 5.5 seconds, randomly
       
   182 				this.nextDelay = 5500 + rand;
       
   183 			}
       
   184 
       
   185 		}
       
   186 		
       
   187 		// preflight check
       
   188 		if(this.nextDelay < 1000) {
       
   189 			int rand = rtpSession.random.nextInt(1000) - 500; //between -500 and +500
       
   190 			System.out.println("RTCPSession.calculateDelay() nextDelay was too short (" 
       
   191 					+this.nextDelay+"ms), setting to "+(this.nextDelay = 2000 + rand));
       
   192 		}
       
   193 		this.prevTime = curTime;
       
   194 	}
       
   195 
       
   196 	/**
       
   197 	 * Update the average packet size
       
   198 	 * @param length of latest packet
       
   199 	 */
       
   200 	synchronized protected void updateAvgPacket(int length) {
       
   201 		double tempAvg = (double) this.avgPktSize;
       
   202 		tempAvg = (15*tempAvg + ((double) length))/16;
       
   203 		this.avgPktSize = (int) tempAvg;
       
   204 	}
       
   205 	
       
   206 	
       
   207 	/**
       
   208 	 * Adds an RTCP APP (application) packet to the queue
       
   209 	 *
       
   210 	 * @param targetSsrc the SSRC of the recipient
       
   211 	 * @param aPkt 
       
   212 	 */
       
   213 	synchronized protected void addToAppQueue(long targetSsrc, RtcpPktAPP aPkt) {
       
   214 		aPkt.time = System.currentTimeMillis();
       
   215 		
       
   216 		if(this.appQueue == null)
       
   217 			this.appQueue = new Hashtable<Long, LinkedList<RtcpPktAPP>>();
       
   218 		
       
   219 		LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
       
   220 		if(ll == null) {
       
   221 			// No list, create and add
       
   222 			ll = new LinkedList<RtcpPktAPP>();
       
   223 			this.appQueue.put(targetSsrc, ll);
       
   224 		}
       
   225 		
       
   226 		ll.add(aPkt);
       
   227 	}
       
   228 	
       
   229 	
       
   230 	/**
       
   231 	 * Adds an RTCP APP (application) packet to the queue
       
   232 	 *
       
   233 	 * @param targetSsrc the SSRC of the recipient
       
   234 	 * @return array of RTCP Application packets 
       
   235 	 */
       
   236 	synchronized protected RtcpPktAPP[] getFromAppQueue(long targetSsrc) {		
       
   237 		if(this.appQueue == null)
       
   238 			return null;
       
   239 		
       
   240 		LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
       
   241 		if(ll == null || ll.isEmpty()) {
       
   242 			return null;
       
   243 		} else {
       
   244 			RtcpPktAPP[] ret = new RtcpPktAPP[ll.size()];
       
   245 			ListIterator<RtcpPktAPP> li = ll.listIterator();
       
   246 			int i = 0;
       
   247 			while(li.hasNext()) {
       
   248 				ret[i] = li.next();
       
   249 				i++;
       
   250 			}
       
   251 			return ret;
       
   252 		}
       
   253 	}
       
   254 	
       
   255 	
       
   256 	/**
       
   257 	 * Cleans the TCP APP (application) packet queues of any packets that are
       
   258 	 * too old, defined as 60 seconds since insertion.
       
   259 	 * 
       
   260 	 * @param ssrc The SSRC of the user who has left, negative value -> general cleanup
       
   261 	 */
       
   262 	synchronized protected void cleanAppQueue(long ssrc) {
       
   263 		if(this.appQueue == null)
       
   264 			return;
       
   265 		
       
   266 		if(ssrc > 0) {
       
   267 			this.appQueue.remove(ssrc);
       
   268 		} else {
       
   269 			Enumeration<LinkedList<RtcpPktAPP>> enu = this.appQueue.elements();
       
   270 			long curTime = System.currentTimeMillis();
       
   271 
       
   272 
       
   273 			while(enu.hasMoreElements()) {
       
   274 				ListIterator<RtcpPktAPP> li = enu.nextElement().listIterator();
       
   275 				while(li.hasNext()) {
       
   276 					RtcpPkt aPkt = li.next();
       
   277 					//Remove after 60 seconds
       
   278 					if(curTime - aPkt.time > 60000) {
       
   279 						li.remove();
       
   280 					}
       
   281 				}
       
   282 			}	
       
   283 		}
       
   284 	}
       
   285 	
       
   286 	
       
   287 	
       
   288 	/**
       
   289 	 * Check the feedback queue for similar packets and adds
       
   290 	 * the new packet if it is not redundant
       
   291 	 * 
       
   292 	 * @param aPkt
       
   293 	 * @return 0 if the packet was added, 1 if it was dropped
       
   294 	 */
       
   295 	synchronized protected int addToFbQueue(long targetSsrc, RtcpPkt aPkt) {
       
   296 		if(this.fbQueue == null)
       
   297 			this.fbQueue = new Hashtable<Long, LinkedList<RtcpPkt>>();
       
   298 		
       
   299 		LinkedList<RtcpPkt> ll = this.fbQueue.get(targetSsrc);
       
   300 		if(ll == null) {
       
   301 			// No list, create and add
       
   302 			ll = new LinkedList<RtcpPkt>();
       
   303 			ll.add(aPkt);
       
   304 			this.fbQueue.put(targetSsrc, ll);
       
   305 		} else {
       
   306 			// Check for matching packets, else add to end
       
   307 			ListIterator<RtcpPkt> li = ll.listIterator();
       
   308 			while(li.hasNext()) {
       
   309 				RtcpPkt tmp = li.next();
       
   310 				if(equivalent(tmp, aPkt))
       
   311 					return -1;
       
   312 			}
       
   313 			ll.addLast(aPkt);
       
   314 		}
       
   315 		return 0;
       
   316 	}
       
   317 		
       
   318 	/**
       
   319 	 * Checks whether there are ny feedback packets waiting
       
   320 	 * to be sent.
       
   321 	 * 
       
   322 	 * @param ssrc of the participant we are notifying
       
   323 	 * @return all relevant feedback packets, or null
       
   324 	 */
       
   325 	synchronized protected RtcpPkt[] getFromFbQueue(long ssrc) {
       
   326 		if(this.fbQueue == null)
       
   327 			return null;
       
   328 		
       
   329 		LinkedList<RtcpPkt> ll = this.fbQueue.get(ssrc);
       
   330 		
       
   331 		if(ll == null)
       
   332 			return null;
       
   333 		
       
   334 		ListIterator<RtcpPkt> li = ll.listIterator();
       
   335 		if(li.hasNext()) {
       
   336 			long curTime = System.currentTimeMillis();
       
   337 			long maxDelay = curTime - rtpSession.fbMaxDelay;
       
   338 			long keepDelay =  curTime - 2000;
       
   339 			int count = 0;
       
   340 			
       
   341 			//TODO below the indeces should be collected instead of looping twice
       
   342 			
       
   343 			// Clean out what we dont want and count what we want
       
   344 			while(li.hasNext()) {
       
   345 				RtcpPkt aPkt = li.next();
       
   346 				if(aPkt.received) {
       
   347 					//This is a packet received, we keep these for
       
   348 					// 2000ms to avoid redundant feedback
       
   349 					if(aPkt.time < keepDelay)
       
   350 						li.remove();
       
   351 				} else {
       
   352 					//This is a packet we havent sent yet
       
   353 					if(aPkt.time < maxDelay) {
       
   354 						li.remove();
       
   355 					} else {
       
   356 						count++;
       
   357 					}
       
   358 				}
       
   359 			}
       
   360 			
       
   361 			// Gather what we want to return
       
   362 			if(count != 0) {
       
   363 				li = ll.listIterator();
       
   364 				RtcpPkt[] ret = new RtcpPkt[count];
       
   365 		
       
   366 				while(count > 0) {
       
   367 					RtcpPkt aPkt = li.next();
       
   368 					if(! aPkt.received) {
       
   369 						ret[ret.length - count] = aPkt; 
       
   370 						count--;
       
   371 					}
       
   372 				}
       
   373 				return ret;
       
   374 			}
       
   375 		}
       
   376 		
       
   377 		return null;
       
   378 	}
       
   379 	
       
   380 	/**
       
   381 	 * Cleans the feeback queue of any packets that have expired,
       
   382 	 * ie feedback packet that are no longer relevant.
       
   383 	 * 
       
   384 	 * @param ssrc The SSRC of the user who has left, negative value -> general cleanup
       
   385 	 */
       
   386 	synchronized protected void cleanFbQueue(long ssrc) {
       
   387 		if(this.fbQueue == null)
       
   388 			return;
       
   389 		
       
   390 		if(ssrc > 0) {
       
   391 			this.fbQueue.remove(ssrc);
       
   392 		} else { 
       
   393 			Enumeration<LinkedList<RtcpPkt>> enu = this.fbQueue.elements();
       
   394 			long curTime = System.currentTimeMillis();
       
   395 			long maxDelay = curTime - rtpSession.fbMaxDelay;
       
   396 			long keepDelay =  curTime - 2000;
       
   397 
       
   398 			while(enu.hasMoreElements()) {
       
   399 				ListIterator<RtcpPkt> li = enu.nextElement().listIterator();
       
   400 				while(li.hasNext()) {
       
   401 					RtcpPkt aPkt = li.next();
       
   402 					if(aPkt.received) {
       
   403 						//This is a packet received, we keep these for
       
   404 						// 2000ms to avoid redundant feedback
       
   405 						if(aPkt.time < keepDelay)
       
   406 							li.remove();
       
   407 					} else {
       
   408 						//This is a packet we havent sent yet
       
   409 						if(aPkt.time < maxDelay)
       
   410 							li.remove();
       
   411 					}
       
   412 				}
       
   413 			}
       
   414 		}
       
   415 	}
       
   416 	
       
   417 	/**
       
   418 	 * Check whether the conditions are satisfied to send a feedbkac packet immediately.
       
   419 	 * 
       
   420 	 * @return true if they are, false otherwise
       
   421 	 */
       
   422 	protected boolean fbSendImmediately() {
       
   423 		if(rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbEarlyThreshold 
       
   424 				&& rtpSession.partDb.receivers.size() > this.rtpSession.fbEarlyThreshold)
       
   425 			return false;
       
   426 		
       
   427 		return true;
       
   428 	}
       
   429 	
       
   430 	
       
   431 	/**
       
   432 	 * Check whether the conditions are satisfied to send a feedbkac packet immediately.
       
   433 	 * 
       
   434 	 * @return true if they are, false otherwise
       
   435 	 */
       
   436 	protected boolean fbSendEarly() {
       
   437 		if(rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbRegularThreshold 
       
   438 				&& rtpSession.partDb.receivers.size() > this.rtpSession.fbRegularThreshold)
       
   439 			return false;
       
   440 		
       
   441 		return true;
       
   442 	}
       
   443 	
       
   444 	/**
       
   445 	 * Wake the sender thread because of this ssrc
       
   446 	 * 
       
   447 	 * @param ssrc that has feedback waiting.
       
   448 	 */
       
   449 	protected void wakeSenderThread(long ssrc) {
       
   450 		this.fbWaiting = ssrc;
       
   451 		this.senderThrd.interrupt();
       
   452 		
       
   453 		// Give it a chance to catch up
       
   454 		try { Thread.sleep(0,1); } catch (Exception e){ };
       
   455 	}
       
   456 	
       
   457 	/**
       
   458 	 * Compares two packets to check whether they are equivalent feedback messages,
       
   459 	 * to avoid sending the same feedback to a host twice.
       
   460 	 * 
       
   461 	 * Expect false negatives, but not false positives.
       
   462 	 * 
       
   463 	 * @param one packet
       
   464 	 * @param two packet
       
   465 	 * @return true if they are equivalent, false otherwise 
       
   466 	 */
       
   467 	private boolean equivalent(RtcpPkt one, RtcpPkt two) {
       
   468 		// Cheap checks
       
   469 		if(one.packetType != two.packetType)
       
   470 			return false;
       
   471 		
       
   472 		if(one.itemCount != two.itemCount)
       
   473 			return false;
       
   474 		
       
   475 		if(one.packetType == 205) {
       
   476 			// RTP Feedback, i.e. a NACK
       
   477 			RtcpPktRTPFB pktone = (RtcpPktRTPFB) one;
       
   478 			RtcpPktRTPFB pkttwo = (RtcpPktRTPFB) two;
       
   479 
       
   480 			if(pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
       
   481 				return false;
       
   482 			
       
   483 			if(Arrays.equals(pktone.BLP,pkttwo.BLP) 
       
   484 					&& Arrays.equals(pktone.BLP,pkttwo.BLP))
       
   485 				return true;
       
   486 			
       
   487 			return true;
       
   488 		} else if(one.packetType == 206) {
       
   489 			RtcpPktPSFB pktone = (RtcpPktPSFB) one;
       
   490 			RtcpPktPSFB pkttwo = (RtcpPktPSFB) two;
       
   491 
       
   492 			if(pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
       
   493 				return false;
       
   494 			
       
   495 			switch(one.itemCount) {
       
   496 			case 1: // Picture Loss Indication 
       
   497 				return true;
       
   498 				
       
   499 			case 2: // Slice Loss Indication
       
   500 				// This will not work if the slice loss indicators are in different order
       
   501 				if(pktone.sliFirst.length == pkttwo.sliFirst.length
       
   502 						&& Arrays.equals(pktone.sliFirst, pkttwo.sliFirst) 
       
   503 						&& Arrays.equals(pktone.sliNumber, pkttwo.sliNumber)
       
   504 						&& Arrays.equals(pktone.sliPictureId, pkttwo.sliPictureId))
       
   505 					return true;
       
   506 				break;
       
   507 			case 3: // Reference Picture Selection Indication 
       
   508 				if(Arrays.equals(pktone.rpsiBitString, pkttwo.rpsiBitString))
       
   509 					return true;
       
   510 				break;
       
   511 			case 15: // Application Layer Feedback Messages
       
   512 				// This will not work if the padding scheme is different
       
   513 				if(pktone.sliFirst.length == pkttwo.sliFirst.length
       
   514 						&& Arrays.equals(pktone.alfBitString, pkttwo.alfBitString))
       
   515 					return true;
       
   516 				break;
       
   517 			default:
       
   518 				
       
   519 			}
       
   520 			return true;
       
   521 		} else {
       
   522 			System.out.println("!!!! RTCPSession.equivalentPackets() encountered unexpected packet type!");
       
   523 		}
       
   524 		return false;
       
   525 	}
       
   526 }
       
   527