src/jlibrtp/RTCPSession.java
changeset 834 e8d6255306f8
parent 833 f5a5d9237d69
child 835 4e40f3481f23
equal deleted inserted replaced
833:f5a5d9237d69 834:e8d6255306f8
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.net.DatagramSocket;
       
    22 import java.net.InetAddress;
       
    23 import java.net.MulticastSocket;
       
    24 import java.util.Arrays;
       
    25 import java.util.Enumeration;
       
    26 import java.util.Hashtable;
       
    27 import java.util.LinkedList;
       
    28 import java.util.ListIterator;
       
    29 
       
    30 /**
       
    31  * This class acts as an organizer for most of the information and functions
       
    32  * pertaining to RTCP packet generation and reception
       
    33  * 
       
    34  * @author Arne Kepp
       
    35  * 
       
    36  */
       
    37 public class RTCPSession {
       
    38 	/** Parent session */
       
    39 	protected RTPSession rtpSession = null;
       
    40 
       
    41 	/** Unicast socket */
       
    42 	protected DatagramSocket rtcpSock = null;
       
    43 	/** Multicast socket */
       
    44 	protected MulticastSocket rtcpMCSock = null;
       
    45 	/** Multicast group */
       
    46 	protected InetAddress mcGroup = null;
       
    47 
       
    48 	/** RTCP Receiver thread */
       
    49 	protected RTCPReceiverThread recvThrd = null;
       
    50 	/** RTCP Sender thread */
       
    51 	protected RTCPSenderThread senderThrd = null;
       
    52 
       
    53 	/** Previous time a delay was calculated */
       
    54 	protected long prevTime = System.currentTimeMillis();
       
    55 	/** Delay between RTCP transmissions, in ms. Initialized in start() */
       
    56 	protected int nextDelay = -1; //
       
    57 	/**
       
    58 	 * The average compound RTCP packet size, in octets, including UDP and IP
       
    59 	 * headers
       
    60 	 */
       
    61 	protected int avgPktSize = 200; //
       
    62 	/** Pessimistic case estimate of the current number of senders */
       
    63 	protected int senderCount = 1;
       
    64 	/** Whether next RTCP packet can be sent early */
       
    65 	protected boolean fbAllowEarly = false;
       
    66 	/** Feedback queue , index is SSRC of target */
       
    67 	protected Hashtable<Long, LinkedList<RtcpPkt>> fbQueue = null;
       
    68 	/** APP queue , index is SSRC of target */
       
    69 	protected Hashtable<Long, LinkedList<RtcpPktAPP>> appQueue = null;
       
    70 	/** Are we just starting up? */
       
    71 	protected boolean initial = true;
       
    72 	/** Is there a feedback packet waiting? SSRC of destination */
       
    73 	protected long fbWaiting = -1;
       
    74 
       
    75 	/**
       
    76 	 * Constructor for unicast sessions
       
    77 	 * 
       
    78 	 * @param parent
       
    79 	 *            RTPSession that started this
       
    80 	 * @param rtcpSocket
       
    81 	 *            the socket to use for listening and sending
       
    82 	 */
       
    83 	protected RTCPSession(RTPSession parent, DatagramSocket rtcpSocket) {
       
    84 		this.rtcpSock = rtcpSocket;
       
    85 		rtpSession = parent;
       
    86 	}
       
    87 
       
    88 	/**
       
    89 	 * Constructor for multicast sessions
       
    90 	 * 
       
    91 	 * @param parent
       
    92 	 *            parent RTPSession
       
    93 	 * @param rtcpSocket
       
    94 	 *            parent RTPSession that started this
       
    95 	 * @param multicastGroup
       
    96 	 *            multicast group to bind the socket to
       
    97 	 */
       
    98 	protected RTCPSession(RTPSession parent, MulticastSocket rtcpSocket,
       
    99 			InetAddress multicastGroup) {
       
   100 		mcGroup = multicastGroup;
       
   101 		this.rtcpSock = rtcpSocket;
       
   102 		rtpSession = parent;
       
   103 	}
       
   104 
       
   105 	/**
       
   106 	 * Starts the session, calculates delays and fires up the threads.
       
   107 	 * 
       
   108 	 */
       
   109 	protected void start() {
       
   110 		// nextDelay = 2500 + rtpSession.random.nextInt(1000) - 500;
       
   111 		this.calculateDelay();
       
   112 		recvThrd = new RTCPReceiverThread(this, this.rtpSession);
       
   113 		senderThrd = new RTCPSenderThread(this, this.rtpSession);
       
   114 		recvThrd.start();
       
   115 		senderThrd.start();
       
   116 	}
       
   117 
       
   118 	/**
       
   119 	 * Send bye packets, handled by RTCP Sender thread
       
   120 	 * 
       
   121 	 */
       
   122 	protected void sendByes() {
       
   123 		senderThrd.sendByes();
       
   124 	}
       
   125 
       
   126 	/**
       
   127 	 * Calculate the delay before the next RTCP packet can be sent
       
   128 	 * 
       
   129 	 */
       
   130 	protected void calculateDelay() {
       
   131 		switch (rtpSession.rtcpMode) {
       
   132 		case 0:
       
   133 			calculateRegularDelay();
       
   134 			break;
       
   135 		default:
       
   136 			System.out.println("RTCPSession.calculateDelay() unknown .mode");
       
   137 		}
       
   138 	}
       
   139 
       
   140 	/**
       
   141 	 * Calculates a delay value in accordance with RFC 3550
       
   142 	 * 
       
   143 	 */
       
   144 	protected void calculateRegularDelay() {
       
   145 		long curTime = System.currentTimeMillis();
       
   146 
       
   147 		if (rtpSession.bandwidth != 0 && !this.initial
       
   148 				&& rtpSession.partDb.ssrcTable.size() > 4) {
       
   149 			// RTPs mechanisms for RTCP scalability
       
   150 			int rand = rtpSession.random.nextInt(10000) - 5000; // between -500
       
   151 																// and +500
       
   152 			double randDouble = ((double) 1000 + rand) / 1000.0;
       
   153 
       
   154 			Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
       
   155 			while (enu.hasMoreElements()) {
       
   156 				Participant part = enu.nextElement();
       
   157 				if (part.lastRtpPkt > this.prevTime)
       
   158 					senderCount++;
       
   159 			}
       
   160 
       
   161 			double bw;
       
   162 			if (rtpSession.rtcpBandwidth > -1) {
       
   163 				bw = rtpSession.rtcpBandwidth;
       
   164 			} else {
       
   165 				bw = rtpSession.bandwidth * 0.05;
       
   166 			}
       
   167 			if (senderCount * 2 > rtpSession.partDb.ssrcTable.size()) {
       
   168 				if (rtpSession.lastTimestamp > this.prevTime) {
       
   169 					// We're a sender
       
   170 					double numerator = ((double) this.avgPktSize)
       
   171 							* ((double) senderCount);
       
   172 					double denominator = 0.25 * bw;
       
   173 					this.nextDelay = (int) Math.round((numerator / denominator)
       
   174 							* randDouble);
       
   175 				} else {
       
   176 					// We're a receiver
       
   177 					double numerator = ((double) this.avgPktSize)
       
   178 							* ((double) rtpSession.partDb.ssrcTable.size());
       
   179 					double denominator = 0.75 * bw;
       
   180 					this.nextDelay = (int) Math.round((numerator / denominator)
       
   181 							* randDouble);
       
   182 				}
       
   183 			} else {
       
   184 				double numerator = ((double) this.avgPktSize)
       
   185 						* ((double) rtpSession.partDb.ssrcTable.size());
       
   186 				;
       
   187 				double denominator = bw;
       
   188 				this.nextDelay = (int) Math
       
   189 						.round(1000.0 * (numerator / denominator))
       
   190 						* (1000 + rand);
       
   191 			}
       
   192 		} else {
       
   193 			// Not enough data to scale, use random values
       
   194 			int rand = rtpSession.random.nextInt(1000) - 500; // between -500
       
   195 																// and +500
       
   196 			if (this.initial) {
       
   197 				// 2.5 to 3.5 seconds, randomly
       
   198 				this.nextDelay = 3000 + rand;
       
   199 				this.initial = false;
       
   200 			} else {
       
   201 				// 4.5 to 5.5 seconds, randomly
       
   202 				this.nextDelay = 5500 + rand;
       
   203 			}
       
   204 
       
   205 		}
       
   206 
       
   207 		// preflight check
       
   208 		if (this.nextDelay < 1000) {
       
   209 			int rand = rtpSession.random.nextInt(1000) - 500; // between -500
       
   210 																// and +500
       
   211 			System.out
       
   212 					.println("RTCPSession.calculateDelay() nextDelay was too short ("
       
   213 							+ this.nextDelay
       
   214 							+ "ms), setting to "
       
   215 							+ (this.nextDelay = 2000 + rand));
       
   216 		}
       
   217 		this.prevTime = curTime;
       
   218 	}
       
   219 
       
   220 	/**
       
   221 	 * Update the average packet size
       
   222 	 * 
       
   223 	 * @param length
       
   224 	 *            of latest packet
       
   225 	 */
       
   226 	synchronized protected void updateAvgPacket(int length) {
       
   227 		double tempAvg = (double) this.avgPktSize;
       
   228 		tempAvg = (15 * tempAvg + ((double) length)) / 16;
       
   229 		this.avgPktSize = (int) tempAvg;
       
   230 	}
       
   231 
       
   232 	/**
       
   233 	 * Adds an RTCP APP (application) packet to the queue
       
   234 	 * 
       
   235 	 * @param targetSsrc
       
   236 	 *            the SSRC of the recipient
       
   237 	 * @param aPkt
       
   238 	 */
       
   239 	synchronized protected void addToAppQueue(long targetSsrc, RtcpPktAPP aPkt) {
       
   240 		aPkt.time = System.currentTimeMillis();
       
   241 
       
   242 		if (this.appQueue == null)
       
   243 			this.appQueue = new Hashtable<Long, LinkedList<RtcpPktAPP>>();
       
   244 
       
   245 		LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
       
   246 		if (ll == null) {
       
   247 			// No list, create and add
       
   248 			ll = new LinkedList<RtcpPktAPP>();
       
   249 			this.appQueue.put(targetSsrc, ll);
       
   250 		}
       
   251 
       
   252 		ll.add(aPkt);
       
   253 	}
       
   254 
       
   255 	/**
       
   256 	 * Adds an RTCP APP (application) packet to the queue
       
   257 	 * 
       
   258 	 * @param targetSsrc
       
   259 	 *            the SSRC of the recipient
       
   260 	 * @return array of RTCP Application packets
       
   261 	 */
       
   262 	synchronized protected RtcpPktAPP[] getFromAppQueue(long targetSsrc) {
       
   263 		if (this.appQueue == null)
       
   264 			return null;
       
   265 
       
   266 		LinkedList<RtcpPktAPP> ll = this.appQueue.get(targetSsrc);
       
   267 		if (ll == null || ll.isEmpty()) {
       
   268 			return null;
       
   269 		} else {
       
   270 			RtcpPktAPP[] ret = new RtcpPktAPP[ll.size()];
       
   271 			ListIterator<RtcpPktAPP> li = ll.listIterator();
       
   272 			int i = 0;
       
   273 			while (li.hasNext()) {
       
   274 				ret[i] = li.next();
       
   275 				i++;
       
   276 			}
       
   277 			return ret;
       
   278 		}
       
   279 	}
       
   280 
       
   281 	/**
       
   282 	 * Cleans the TCP APP (application) packet queues of any packets that are
       
   283 	 * too old, defined as 60 seconds since insertion.
       
   284 	 * 
       
   285 	 * @param ssrc
       
   286 	 *            The SSRC of the user who has left, negative value -> general
       
   287 	 *            cleanup
       
   288 	 */
       
   289 	synchronized protected void cleanAppQueue(long ssrc) {
       
   290 		if (this.appQueue == null)
       
   291 			return;
       
   292 
       
   293 		if (ssrc > 0) {
       
   294 			this.appQueue.remove(ssrc);
       
   295 		} else {
       
   296 			Enumeration<LinkedList<RtcpPktAPP>> enu = this.appQueue.elements();
       
   297 			long curTime = System.currentTimeMillis();
       
   298 
       
   299 			while (enu.hasMoreElements()) {
       
   300 				ListIterator<RtcpPktAPP> li = enu.nextElement().listIterator();
       
   301 				while (li.hasNext()) {
       
   302 					RtcpPkt aPkt = li.next();
       
   303 					// Remove after 60 seconds
       
   304 					if (curTime - aPkt.time > 60000) {
       
   305 						li.remove();
       
   306 					}
       
   307 				}
       
   308 			}
       
   309 		}
       
   310 	}
       
   311 
       
   312 	/**
       
   313 	 * Check the feedback queue for similar packets and adds the new packet if
       
   314 	 * it is not redundant
       
   315 	 * 
       
   316 	 * @param aPkt
       
   317 	 * @return 0 if the packet was added, 1 if it was dropped
       
   318 	 */
       
   319 	synchronized protected int addToFbQueue(long targetSsrc, RtcpPkt aPkt) {
       
   320 		if (this.fbQueue == null)
       
   321 			this.fbQueue = new Hashtable<Long, LinkedList<RtcpPkt>>();
       
   322 
       
   323 		LinkedList<RtcpPkt> ll = this.fbQueue.get(targetSsrc);
       
   324 		if (ll == null) {
       
   325 			// No list, create and add
       
   326 			ll = new LinkedList<RtcpPkt>();
       
   327 			ll.add(aPkt);
       
   328 			this.fbQueue.put(targetSsrc, ll);
       
   329 		} else {
       
   330 			// Check for matching packets, else add to end
       
   331 			ListIterator<RtcpPkt> li = ll.listIterator();
       
   332 			while (li.hasNext()) {
       
   333 				RtcpPkt tmp = li.next();
       
   334 				if (equivalent(tmp, aPkt))
       
   335 					return -1;
       
   336 			}
       
   337 			ll.addLast(aPkt);
       
   338 		}
       
   339 		return 0;
       
   340 	}
       
   341 
       
   342 	/**
       
   343 	 * Checks whether there are ny feedback packets waiting to be sent.
       
   344 	 * 
       
   345 	 * @param ssrc
       
   346 	 *            of the participant we are notifying
       
   347 	 * @return all relevant feedback packets, or null
       
   348 	 */
       
   349 	synchronized protected RtcpPkt[] getFromFbQueue(long ssrc) {
       
   350 		if (this.fbQueue == null)
       
   351 			return null;
       
   352 
       
   353 		LinkedList<RtcpPkt> ll = this.fbQueue.get(ssrc);
       
   354 
       
   355 		if (ll == null)
       
   356 			return null;
       
   357 
       
   358 		ListIterator<RtcpPkt> li = ll.listIterator();
       
   359 		if (li.hasNext()) {
       
   360 			long curTime = System.currentTimeMillis();
       
   361 			long maxDelay = curTime - rtpSession.fbMaxDelay;
       
   362 			long keepDelay = curTime - 2000;
       
   363 			int count = 0;
       
   364 
       
   365 			// TODO below the indeces should be collected instead of looping
       
   366 			// twice
       
   367 
       
   368 			// Clean out what we dont want and count what we want
       
   369 			while (li.hasNext()) {
       
   370 				RtcpPkt aPkt = li.next();
       
   371 				if (aPkt.received) {
       
   372 					// This is a packet received, we keep these for
       
   373 					// 2000ms to avoid redundant feedback
       
   374 					if (aPkt.time < keepDelay)
       
   375 						li.remove();
       
   376 				} else {
       
   377 					// This is a packet we havent sent yet
       
   378 					if (aPkt.time < maxDelay) {
       
   379 						li.remove();
       
   380 					} else {
       
   381 						count++;
       
   382 					}
       
   383 				}
       
   384 			}
       
   385 
       
   386 			// Gather what we want to return
       
   387 			if (count != 0) {
       
   388 				li = ll.listIterator();
       
   389 				RtcpPkt[] ret = new RtcpPkt[count];
       
   390 
       
   391 				while (count > 0) {
       
   392 					RtcpPkt aPkt = li.next();
       
   393 					if (!aPkt.received) {
       
   394 						ret[ret.length - count] = aPkt;
       
   395 						count--;
       
   396 					}
       
   397 				}
       
   398 				return ret;
       
   399 			}
       
   400 		}
       
   401 
       
   402 		return null;
       
   403 	}
       
   404 
       
   405 	/**
       
   406 	 * Cleans the feeback queue of any packets that have expired, ie feedback
       
   407 	 * packet that are no longer relevant.
       
   408 	 * 
       
   409 	 * @param ssrc
       
   410 	 *            The SSRC of the user who has left, negative value -> general
       
   411 	 *            cleanup
       
   412 	 */
       
   413 	synchronized protected void cleanFbQueue(long ssrc) {
       
   414 		if (this.fbQueue == null)
       
   415 			return;
       
   416 
       
   417 		if (ssrc > 0) {
       
   418 			this.fbQueue.remove(ssrc);
       
   419 		} else {
       
   420 			Enumeration<LinkedList<RtcpPkt>> enu = this.fbQueue.elements();
       
   421 			long curTime = System.currentTimeMillis();
       
   422 			long maxDelay = curTime - rtpSession.fbMaxDelay;
       
   423 			long keepDelay = curTime - 2000;
       
   424 
       
   425 			while (enu.hasMoreElements()) {
       
   426 				ListIterator<RtcpPkt> li = enu.nextElement().listIterator();
       
   427 				while (li.hasNext()) {
       
   428 					RtcpPkt aPkt = li.next();
       
   429 					if (aPkt.received) {
       
   430 						// This is a packet received, we keep these for
       
   431 						// 2000ms to avoid redundant feedback
       
   432 						if (aPkt.time < keepDelay)
       
   433 							li.remove();
       
   434 					} else {
       
   435 						// This is a packet we havent sent yet
       
   436 						if (aPkt.time < maxDelay)
       
   437 							li.remove();
       
   438 					}
       
   439 				}
       
   440 			}
       
   441 		}
       
   442 	}
       
   443 
       
   444 	/**
       
   445 	 * Check whether the conditions are satisfied to send a feedbkac packet
       
   446 	 * immediately.
       
   447 	 * 
       
   448 	 * @return true if they are, false otherwise
       
   449 	 */
       
   450 	protected boolean fbSendImmediately() {
       
   451 		if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbEarlyThreshold
       
   452 				&& rtpSession.partDb.receivers.size() > this.rtpSession.fbEarlyThreshold)
       
   453 			return false;
       
   454 
       
   455 		return true;
       
   456 	}
       
   457 
       
   458 	/**
       
   459 	 * Check whether the conditions are satisfied to send a feedbkac packet
       
   460 	 * immediately.
       
   461 	 * 
       
   462 	 * @return true if they are, false otherwise
       
   463 	 */
       
   464 	protected boolean fbSendEarly() {
       
   465 		if (rtpSession.partDb.ssrcTable.size() > this.rtpSession.fbRegularThreshold
       
   466 				&& rtpSession.partDb.receivers.size() > this.rtpSession.fbRegularThreshold)
       
   467 			return false;
       
   468 
       
   469 		return true;
       
   470 	}
       
   471 
       
   472 	/**
       
   473 	 * Wake the sender thread because of this ssrc
       
   474 	 * 
       
   475 	 * @param ssrc
       
   476 	 *            that has feedback waiting.
       
   477 	 */
       
   478 	protected void wakeSenderThread(long ssrc) {
       
   479 		this.fbWaiting = ssrc;
       
   480 		this.senderThrd.interrupt();
       
   481 
       
   482 		// Give it a chance to catch up
       
   483 		try {
       
   484 			Thread.sleep(0, 1);
       
   485 		} catch (Exception e) {
       
   486 		}
       
   487 		;
       
   488 	}
       
   489 
       
   490 	/**
       
   491 	 * Compares two packets to check whether they are equivalent feedback
       
   492 	 * messages, to avoid sending the same feedback to a host twice.
       
   493 	 * 
       
   494 	 * Expect false negatives, but not false positives.
       
   495 	 * 
       
   496 	 * @param one
       
   497 	 *            packet
       
   498 	 * @param two
       
   499 	 *            packet
       
   500 	 * @return true if they are equivalent, false otherwise
       
   501 	 */
       
   502 	private boolean equivalent(RtcpPkt one, RtcpPkt two) {
       
   503 		// Cheap checks
       
   504 		if (one.packetType != two.packetType)
       
   505 			return false;
       
   506 
       
   507 		if (one.itemCount != two.itemCount)
       
   508 			return false;
       
   509 
       
   510 		if (one.packetType == 205) {
       
   511 			// RTP Feedback, i.e. a NACK
       
   512 			RtcpPktRTPFB pktone = (RtcpPktRTPFB) one;
       
   513 			RtcpPktRTPFB pkttwo = (RtcpPktRTPFB) two;
       
   514 
       
   515 			if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
       
   516 				return false;
       
   517 
       
   518 			if (Arrays.equals(pktone.BLP, pkttwo.BLP)
       
   519 					&& Arrays.equals(pktone.BLP, pkttwo.BLP))
       
   520 				return true;
       
   521 
       
   522 			return true;
       
   523 		} else if (one.packetType == 206) {
       
   524 			RtcpPktPSFB pktone = (RtcpPktPSFB) one;
       
   525 			RtcpPktPSFB pkttwo = (RtcpPktPSFB) two;
       
   526 
       
   527 			if (pktone.ssrcMediaSource != pkttwo.ssrcMediaSource)
       
   528 				return false;
       
   529 
       
   530 			switch (one.itemCount) {
       
   531 			case 1: // Picture Loss Indication
       
   532 				return true;
       
   533 
       
   534 			case 2: // Slice Loss Indication
       
   535 				// This will not work if the slice loss indicators are in
       
   536 				// different order
       
   537 				if (pktone.sliFirst.length == pkttwo.sliFirst.length
       
   538 						&& Arrays.equals(pktone.sliFirst, pkttwo.sliFirst)
       
   539 						&& Arrays.equals(pktone.sliNumber, pkttwo.sliNumber)
       
   540 						&& Arrays.equals(pktone.sliPictureId,
       
   541 								pkttwo.sliPictureId))
       
   542 					return true;
       
   543 				break;
       
   544 			case 3: // Reference Picture Selection Indication
       
   545 				if (Arrays.equals(pktone.rpsiBitString, pkttwo.rpsiBitString))
       
   546 					return true;
       
   547 				break;
       
   548 			case 15: // Application Layer Feedback Messages
       
   549 				// This will not work if the padding scheme is different
       
   550 				if (pktone.sliFirst.length == pkttwo.sliFirst.length
       
   551 						&& Arrays.equals(pktone.alfBitString,
       
   552 								pkttwo.alfBitString))
       
   553 					return true;
       
   554 				break;
       
   555 			default:
       
   556 
       
   557 			}
       
   558 			return true;
       
   559 		} else {
       
   560 			System.out
       
   561 					.println("!!!! RTCPSession.equivalentPackets() encountered unexpected packet type!");
       
   562 		}
       
   563 		return false;
       
   564 	}
       
   565 }