src/jlibrtp/PktBuffer.java
changeset 823 2036ebfaccda
equal deleted inserted replaced
536:537ddd8aa407 823:2036ebfaccda
       
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import org.sipdroid.net.tools.DataFramePool;
       
    22 import org.sipdroid.net.tools.PktBufNodePool;
       
    23 
       
    24 /**
       
    25  * A PktBuffer stores packets either for buffering purposes, or because they
       
    26  * need to be assimilated to create a complete frame.
       
    27  * 
       
    28  * This behavior can be controlled through rtpSession.pktBufBehavior()
       
    29  * 
       
    30  * It optionally drops duplicate packets.
       
    31  * 
       
    32  * Note that newest is the most recently received, i.e. highest timeStamp Next
       
    33  * means new to old (from recently received to previously received)
       
    34  * 
       
    35  * @author Arne Kepp
       
    36  */
       
    37 public class PktBuffer {
       
    38 	/**
       
    39 	 * The RTPSession holds information common to all packetBuffers, such as max
       
    40 	 * size
       
    41 	 */
       
    42 	RTPSession rtpSession;
       
    43 	/** SSRC of the the participant that this buffer is for */
       
    44 	long SSRC;
       
    45 	/** The parent participant */
       
    46 	Participant p;
       
    47 	/** The length of the buffer */
       
    48 	int length = 0;
       
    49 	/** The oldest, least recently received, packet */
       
    50 	PktBufNode oldest = null;
       
    51 	/** The newest, most recently received, packet */
       
    52 	PktBufNode newest = null;
       
    53 
       
    54 	/** The last sequence number received */
       
    55 	int lastSeqNumber = -1;
       
    56 	/** The last timestamp */
       
    57 	long lastTimestamp = -1;
       
    58 
       
    59 	/**
       
    60 	 * Creates a new PktBuffer, a linked list of PktBufNode
       
    61 	 * 
       
    62 	 * @param rtpSession
       
    63 	 *            the parent RTPSession
       
    64 	 * @param p
       
    65 	 *            the participant to which this packetbuffer belongs.
       
    66 	 * @param aPkt
       
    67 	 *            The first RTP packet, to be added to the buffer
       
    68 	 */
       
    69 	protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
       
    70 		this.rtpSession = rtpSession;
       
    71 		this.p = p;
       
    72 		SSRC = aPkt.getSsrc();
       
    73 		PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
       
    74 		newNode.initPktBufNode(aPkt);
       
    75 		oldest = newNode;
       
    76 		newest = newNode;
       
    77 		// lastSeqNumber = (aPkt.getSeqNumber() - 1);
       
    78 		// lastTimestamp = aPkt.getTimeStamp();
       
    79 		length = 1;
       
    80 	}
       
    81 
       
    82 	/**
       
    83 	 * Adds a packet, this happens in constant time if they arrive in order.
       
    84 	 * Optimized for the case where each pkt is a complete frame.
       
    85 	 * 
       
    86 	 * @param aPkt
       
    87 	 *            the packet to be added to the buffer.
       
    88 	 * @return integer, negative if operation failed (see code)
       
    89 	 */
       
    90 	protected synchronized int addPkt(RtpPkt aPkt) {
       
    91 		if (aPkt == null) {
       
    92 			System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
       
    93 			return -5;
       
    94 		}
       
    95 
       
    96 		long timeStamp = aPkt.getTimeStamp();
       
    97 		if (RTPSession.rtpDebugLevel > 7) {
       
    98 			System.out.println("-> PktBuffer.addPkt() , length:" + length
       
    99 					+ " , timeStamp of Pkt: " + Long.toString(timeStamp));
       
   100 		}
       
   101 
       
   102 		PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
       
   103 		newNode.initPktBufNode(aPkt);
       
   104 		if (aPkt.getSsrc() != SSRC) {
       
   105 			System.out.println("PktBuffer.addPkt() SSRCs don't match!");
       
   106 		}
       
   107 
       
   108 		int retVal = 0;
       
   109 		if (this.rtpSession.pktBufBehavior > 0) {
       
   110 			retVal = bufferedAddPkt(newNode);
       
   111 		} else if (this.rtpSession.pktBufBehavior == 0) {
       
   112 			retVal = filteredAddPkt(newNode);
       
   113 		} else if (this.rtpSession.pktBufBehavior == -1) {
       
   114 			retVal = unfilteredAddPkt(newNode);
       
   115 		}
       
   116 
       
   117 		if (RTPSession.rtpDebugLevel > 7) {
       
   118 			if (RTPSession.rtpDebugLevel > 10) {
       
   119 				this.debugPrint();
       
   120 			}
       
   121 			System.out.println("<- PktBuffer.addPkt() , length:" + length
       
   122 					+ " returning " + retVal);
       
   123 		}
       
   124 		return retVal;
       
   125 	}
       
   126 
       
   127 	/**
       
   128 	 * Adds packets in the same order that they arrive, doesn't do any filering
       
   129 	 * or processing.
       
   130 	 * 
       
   131 	 * @param newNode
       
   132 	 *            the node to add to the packet buffer
       
   133 	 * @return 0 if everything is okay, -1 otherwise
       
   134 	 */
       
   135 	private int unfilteredAddPkt(PktBufNode newNode) {
       
   136 		if (RTPSession.rtpDebugLevel > 8) {
       
   137 			System.out.println("<->    PktBuffer.unfilteredAddPkt()");
       
   138 		}
       
   139 		// No magic, just add to the end
       
   140 		if (oldest != null) {
       
   141 			oldest.nextFrameQueueNode = newNode;
       
   142 			newNode.prevFrameQueueNode = oldest;
       
   143 			oldest = newNode;
       
   144 		} else {
       
   145 			oldest = newNode;
       
   146 			newest = newNode;
       
   147 		}
       
   148 		return 0;
       
   149 	}
       
   150 
       
   151 	/**
       
   152 	 * Takes care of duplicate packets
       
   153 	 * 
       
   154 	 * @param newNode
       
   155 	 *            the node to add to the packet buffer
       
   156 	 * @return 0 if everything is okay, -1 otherwise
       
   157 	 */
       
   158 	private int filteredAddPkt(PktBufNode newNode) {
       
   159 		if (RTPSession.rtpDebugLevel > 8) {
       
   160 			System.out.println("<->    PktBuffer.filteredAddPkt()");
       
   161 		}
       
   162 
       
   163 		if (length == 0) {
       
   164 			// The buffer was empty, this packet is the one and only.
       
   165 			newest = newNode;
       
   166 			oldest = newNode;
       
   167 			length = 1;
       
   168 		} else {
       
   169 			// The packetbuffer is not empty.
       
   170 			if (newNode.timeStamp > newest.timeStamp
       
   171 					|| newNode.seqNum > newest.seqNum
       
   172 					&& (newNode.seqNum - newest.seqNum) < 10) {
       
   173 				// Packet came in order
       
   174 				newNode.nextFrameQueueNode = newest;
       
   175 				newest.prevFrameQueueNode = newNode;
       
   176 				newest = newNode;
       
   177 				length++;
       
   178 			} else {
       
   179 				if (RTPSession.rtpDebugLevel > 2) {
       
   180 					System.out
       
   181 							.println("PktBuffer.filteredAddPkt Dropped a packet due to lag! "
       
   182 									+ newNode.timeStamp
       
   183 									+ " "
       
   184 									+ newNode.seqNum
       
   185 									+ " vs "
       
   186 									+ oldest.timeStamp
       
   187 									+ " "
       
   188 									+ oldest.seqNum);
       
   189 				}
       
   190 				return -1;
       
   191 			}
       
   192 		}
       
   193 
       
   194 		return 0;
       
   195 	}
       
   196 
       
   197 	/**
       
   198 	 * Does most of the packet organization for the application. Packets are put
       
   199 	 * in order, duplicate packets or late arrivals are discarded
       
   200 	 * 
       
   201 	 * If multiple packets make up a frame, these will also be organized by RTP
       
   202 	 * timestamp and sequence number, and returned as a complete frame.
       
   203 	 * 
       
   204 	 * @param newNode
       
   205 	 *            the node to add to the packet buffer
       
   206 	 * @return 0 if everything is okay, -1 otherwise
       
   207 	 */
       
   208 	private int bufferedAddPkt(PktBufNode newNode) {
       
   209 		if (RTPSession.rtpDebugLevel > 8) {
       
   210 			System.out.println("<->    PktBuffer.bufferedAddPkt()");
       
   211 		}
       
   212 		if (length == 0) {
       
   213 			// The buffer was empty, this packet is the one and only.
       
   214 			newest = newNode;
       
   215 			oldest = newNode;
       
   216 		} else {
       
   217 			// The packetbuffer is not empty.
       
   218 			if (newNode.timeStamp > newest.timeStamp
       
   219 					|| newNode.seqNum > newest.seqNum) {
       
   220 				// Packet came in order
       
   221 				newNode.nextFrameQueueNode = newest;
       
   222 				newest.prevFrameQueueNode = newNode;
       
   223 				newest = newNode;
       
   224 			} else {
       
   225 				// There are packets, we need to order this one right.
       
   226 				if (!pktOnTime(newNode.timeStamp, newNode.seqNum)
       
   227 						&& rtpSession.pktBufBehavior > -1) {
       
   228 					// We got this too late, can't put it in order anymore.
       
   229 					if (RTPSession.rtpDebugLevel > 2) {
       
   230 						System.out
       
   231 								.println("PktBuffer.addPkt Dropped a packet due to lag! "
       
   232 										+ newNode.timeStamp
       
   233 										+ " "
       
   234 										+ newNode.seqNum
       
   235 										+ " vs "
       
   236 										+ oldest.timeStamp
       
   237 										+ " "
       
   238 										+ oldest.seqNum);
       
   239 					}
       
   240 					return -1;
       
   241 				}
       
   242 
       
   243 				// Need to do some real work, find out where it belongs (linear
       
   244 				// search from the back).
       
   245 				PktBufNode tmpNode = newest;
       
   246 				while (tmpNode.timeStamp > newNode.timeStamp) {
       
   247 					tmpNode = tmpNode.nextFrameQueueNode;
       
   248 				}
       
   249 
       
   250 				if (tmpNode.timeStamp == newNode.timeStamp
       
   251 						&& rtpSession.frameReconstruction
       
   252 						&& newNode.seqNum != tmpNode.seqNum) {
       
   253 					// Packet has same timestamp, presumably belongs to frame.
       
   254 					// Need to order within frame.
       
   255 					if (RTPSession.rtpDebugLevel > 8) {
       
   256 						System.out
       
   257 								.println("Found pkt with existing timeStamp: "
       
   258 										+ newNode.timeStamp);
       
   259 					}
       
   260 					int ret = addToFrame(tmpNode, newNode);
       
   261 					if (ret != 0) {
       
   262 						return ret;
       
   263 					}
       
   264 				} else {
       
   265 
       
   266 					// Check that it's not a duplicate
       
   267 					if (tmpNode.timeStamp == newNode.timeStamp
       
   268 							&& newNode.seqNum == tmpNode.seqNum) {
       
   269 						if (RTPSession.rtpDebugLevel > 2) {
       
   270 							System.out
       
   271 									.println("PktBuffer.addPkt Dropped a duplicate packet! "
       
   272 											+ newNode.timeStamp
       
   273 											+ " "
       
   274 											+ newNode.seqNum);
       
   275 						}
       
   276 						return -1;
       
   277 					}
       
   278 
       
   279 					// Insert into buffer
       
   280 					newNode.nextFrameQueueNode = tmpNode;
       
   281 					newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;
       
   282 
       
   283 					// Update the node behind
       
   284 					if (newNode.prevFrameQueueNode != null) {
       
   285 						newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
       
   286 					}
       
   287 					tmpNode.prevFrameQueueNode = newNode;
       
   288 
       
   289 					if (newNode.timeStamp > newest.timeStamp) {
       
   290 						newest = newNode;
       
   291 					}
       
   292 				}
       
   293 			}
       
   294 		}
       
   295 		// Update the length of this buffer
       
   296 		length++;
       
   297 		return 0;
       
   298 	}
       
   299 
       
   300 	/**
       
   301 	 * 
       
   302 	 * @param frameNode
       
   303 	 *            the node currently representing the frame in the packet buffer
       
   304 	 * @param newNode
       
   305 	 *            the new node to be added to the frame
       
   306 	 * @return 0 if no error, -2 if this is a duplicate packet
       
   307 	 */
       
   308 	private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
       
   309 		// Node has same timeStamp, assume pkt belongs to frame
       
   310 
       
   311 		if (frameNode.seqNum < newNode.seqNum) {
       
   312 			// this is not the first packet in the frame
       
   313 			frameNode.pktCount++;
       
   314 
       
   315 			// Find the right spot
       
   316 			while (frameNode.nextFrameNode != null
       
   317 					&& frameNode.nextFrameNode.seqNum < newNode.seqNum) {
       
   318 				frameNode = frameNode.nextFrameNode;
       
   319 			}
       
   320 
       
   321 			// Check whether packet is duplicate
       
   322 			if (frameNode.nextFrameNode != null
       
   323 					&& frameNode.nextFrameNode.seqNum == newNode.seqNum) {
       
   324 				if (RTPSession.rtpDebugLevel > 2) {
       
   325 					System.out
       
   326 							.println("PktBuffer.addPkt Dropped a duplicate packet!");
       
   327 				}
       
   328 				return -2;
       
   329 			}
       
   330 
       
   331 			newNode.nextFrameNode = frameNode.nextFrameNode;
       
   332 			frameNode.nextFrameNode = newNode;
       
   333 
       
   334 		} else {
       
   335 			// newNode has the lowest sequence number
       
   336 			newNode.nextFrameNode = frameNode;
       
   337 			newNode.pktCount = frameNode.pktCount + 1;
       
   338 
       
   339 			// Update the queue
       
   340 			if (frameNode.nextFrameQueueNode != null) {
       
   341 				frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
       
   342 				newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
       
   343 				frameNode.nextFrameQueueNode = null;
       
   344 			}
       
   345 			if (frameNode.prevFrameQueueNode != null) {
       
   346 				frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
       
   347 				newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
       
   348 				frameNode.prevFrameQueueNode = null;
       
   349 			}
       
   350 			if (newest.timeStamp == newNode.timeStamp) {
       
   351 				newest = newNode;
       
   352 			}
       
   353 		}
       
   354 
       
   355 		return 0;
       
   356 	}
       
   357 
       
   358 	/**
       
   359 	 * Checks the oldest frame, if there is one, sees whether it is complete.
       
   360 	 * 
       
   361 	 * @return Returns null if there are no complete frames available.
       
   362 	 */
       
   363 	protected synchronized DataFrame popOldestFrame() {
       
   364 		if (RTPSession.rtpDebugLevel > 7) {
       
   365 			System.out.println("-> PktBuffer.popOldestFrame()");
       
   366 		}
       
   367 		if (RTPSession.rtpDebugLevel > 10) {
       
   368 			this.debugPrint();
       
   369 		}
       
   370 
       
   371 		if (this.rtpSession.pktBufBehavior > 0) {
       
   372 			return this.bufferedPopFrame();
       
   373 		} else {
       
   374 			return this.unbufferedPopFrame();
       
   375 		}
       
   376 	}
       
   377 
       
   378 	/**
       
   379 	 * Will return the oldest frame without checking whether it is in the right
       
   380 	 * order, or whether we should wate for late arrivals.
       
   381 	 * 
       
   382 	 * @return the first frame on the queue, null otherwise
       
   383 	 */
       
   384 	private DataFrame unbufferedPopFrame() {
       
   385 		if (oldest != null) {
       
   386 			PktBufNode retNode = oldest;
       
   387 
       
   388 			popFrameQueueCleanup(retNode, retNode.seqNum);
       
   389 			DataFrame df = DataFramePool.getInstance().borrowFrame();
       
   390 			df.initDataFrame(retNode, this.p, rtpSession.appIntf
       
   391 					.frameSize(oldest.pkt.getPayloadType()));
       
   392 			return df;
       
   393 		} else {
       
   394 			return null;
       
   395 		}
       
   396 	}
       
   397 
       
   398 	/**
       
   399 	 * Only returns if the buffer is full, i.e. length exceeds
       
   400 	 * rtpSession.pktBufBehavior, or if the next packet directly follows the
       
   401 	 * previous one returned to the application.
       
   402 	 * 
       
   403 	 * @return first frame in order, null otherwise
       
   404 	 */
       
   405 	private DataFrame bufferedPopFrame() {
       
   406 		PktBufNode retNode = oldest;
       
   407 		/**
       
   408 		 * Three scenarios: 1) There are no packets available 2) The first
       
   409 		 * packet is vailable and in order 3) The first packet is not the next
       
   410 		 * on in the sequence a) We have exceeded the wait buffer b) We wait
       
   411 		 */
       
   412 		// System.out.println(" Debug:" +(retNode != null) + " " +
       
   413 		// (retNode.seqNum == this.lastSeqNumber + 1)
       
   414 		// + " " + ( retNode.seqNum == 0 ) + " " + (this.length >
       
   415 		// this.rtpSession.maxReorderBuffer)
       
   416 		// + " " + (this.lastSeqNumber < 0));
       
   417 
       
   418 		// Pop it off, null all references.
       
   419 		if (retNode != null
       
   420 				&& (retNode.seqNum == this.lastSeqNumber + 1
       
   421 						|| retNode.seqNum == 0
       
   422 						|| this.length > this.rtpSession.pktBufBehavior || this.lastSeqNumber < 0)) {
       
   423 
       
   424 			// if(tmpNode.pktCount == compLen) {
       
   425 			if (RTPSession.rtpDebugLevel > 7) {
       
   426 				System.out
       
   427 						.println("<- PktBuffer.popOldestFrame() returns frame");
       
   428 			}
       
   429 
       
   430 			DataFrame df = DataFramePool.getInstance().borrowFrame();
       
   431 			df.initDataFrame(retNode, this.p, rtpSession.appIntf
       
   432 					.frameSize(oldest.pkt.getPayloadType()));
       
   433 
       
   434 			// DataFrame df = new DataFrame(retNode, this.p, 1);
       
   435 			popFrameQueueCleanup(retNode, df.lastSeqNum);
       
   436 
       
   437 			return df;
       
   438 
       
   439 		} else {
       
   440 			// If we get here we have little to show for.
       
   441 			if (RTPSession.rtpDebugLevel > 2) {
       
   442 				System.out
       
   443 						.println("<- PktBuffer.popOldestFrame() returns null "
       
   444 								+ retNode.seqNum + " " + this.lastSeqNumber);
       
   445 				this.debugPrint();
       
   446 			}
       
   447 			return null;
       
   448 		}
       
   449 	}
       
   450 
       
   451 	/**
       
   452 	 * Cleans the packet buffer before returning the frame, i.e. making sure the
       
   453 	 * queue has a head etc.
       
   454 	 * 
       
   455 	 * @param retNode
       
   456 	 *            the node that is about to be popped
       
   457 	 * @param highestSeq
       
   458 	 *            the highest sequence number returned to the application
       
   459 	 */
       
   460 	private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
       
   461 		if (1 == length) {
       
   462 			// There's only one frame
       
   463 			newest = null;
       
   464 			oldest = null;
       
   465 		} else {
       
   466 			// There are more frames
       
   467 			oldest = oldest.prevFrameQueueNode;
       
   468 			oldest.nextFrameQueueNode = null;
       
   469 		}
       
   470 
       
   471 		// Update counters
       
   472 		length--;
       
   473 
       
   474 		// Find the highest sequence number associated with this timestamp
       
   475 		this.lastSeqNumber = highestSeq;
       
   476 		this.lastTimestamp = retNode.timeStamp;
       
   477 	}
       
   478 
       
   479 	/**
       
   480 	 * Returns the length of the packetbuffer.
       
   481 	 * 
       
   482 	 * @return number of frames (complete or not) in packetbuffer.
       
   483 	 */
       
   484 	protected int getLength() {
       
   485 		return length;
       
   486 	}
       
   487 
       
   488 	/**
       
   489 	 * Checks whether a packet is not too late, i.e. the next packet has already
       
   490 	 * been returned.
       
   491 	 * 
       
   492 	 * @param timeStamp
       
   493 	 *            the RTP timestamp of the packet under consideration
       
   494 	 * @param seqNum
       
   495 	 *            the sequence number of the packet under consideration
       
   496 	 * @return true if newer packets have not been handed to the application
       
   497 	 */
       
   498 	protected boolean pktOnTime(long timeStamp, int seqNum) {
       
   499 		if (this.lastSeqNumber == -1) {
       
   500 			// First packet
       
   501 			return true;
       
   502 		} else {
       
   503 			if (seqNum >= this.lastSeqNumber) {
       
   504 				if (this.lastSeqNumber < 3 && timeStamp < this.lastTimestamp) {
       
   505 					return false;
       
   506 				}
       
   507 			} else {
       
   508 				if (seqNum > 3 || timeStamp < this.lastTimestamp) {
       
   509 					return false;
       
   510 				}
       
   511 			}
       
   512 		}
       
   513 		return true;
       
   514 	}
       
   515 
       
   516 	/**
       
   517 	 * Prints out the packet buffer, oldest node first (on top).
       
   518 	 */
       
   519 	protected void debugPrint() {
       
   520 		System.out.println("PktBuffer.debugPrint() : length " + length
       
   521 				+ " SSRC " + SSRC + " lastSeqNum:" + lastSeqNumber);
       
   522 		PktBufNode tmpNode = oldest;
       
   523 		int i = 0;
       
   524 		while (tmpNode != null) {
       
   525 			// String str = tmpNode.timeStamp.toString();
       
   526 			System.out.println("   " + i + " seqNum:" + tmpNode.seqNum
       
   527 					+ " timeStamp: " + tmpNode.timeStamp + " pktCount:"
       
   528 					+ tmpNode.pktCount);
       
   529 			i++;
       
   530 			tmpNode = tmpNode.prevFrameQueueNode;
       
   531 		}
       
   532 	}
       
   533 }