src/jlibrtp/PktBuffer.java
changeset 214 2bf440c54ca5
parent 213 9bdff6cbd120
child 215 5db64229be69
equal deleted inserted replaced
213:9bdff6cbd120 214:2bf440c54ca5
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 /**
       
    22  * A PktBuffer stores packets either for buffering purposes,
       
    23  * or because they need to be assimilated to create a complete frame.
       
    24  * 
       
    25  * This behavior can be controlled through rtpSession.pktBufBehavior()
       
    26  * 
       
    27  * It optionally drops duplicate packets.
       
    28  * 
       
    29  * Note that newest is the most recently received, i.e. highest timeStamp
       
    30  * Next means new to old (from recently received to previously received) 
       
    31  * 
       
    32  * @author Arne Kepp
       
    33  */
       
    34 public class PktBuffer {
       
    35 	/** The RTPSession holds information common to all packetBuffers, such as max size */
       
    36 	RTPSession rtpSession;
       
    37 	/** SSRC of the the participant that this buffer is for */
       
    38 	long SSRC;
       
    39 	/** The parent participant */
       
    40 	Participant p;
       
    41 	/** The length of the buffer */
       
    42 	int length = 0;
       
    43 	/** The oldest, least recently received, packet */
       
    44 	PktBufNode oldest = null;
       
    45 	/** The newest, most recently received, packet */
       
    46 	PktBufNode newest = null;
       
    47 	
       
    48 	/** The last sequence number received */
       
    49 	int lastSeqNumber = -1;
       
    50 	/** The last timestamp */
       
    51 	long lastTimestamp = -1;
       
    52 
       
    53 	/** 
       
    54 	 * Creates a new PktBuffer, a linked list of PktBufNode
       
    55 	 * 
       
    56 	 * @param rtpSession the parent RTPSession
       
    57 	 * @param p the participant to which this packetbuffer belongs.
       
    58 	 * @param aPkt The first RTP packet, to be added to the buffer 
       
    59 	 */
       
    60 	protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
       
    61 		this.rtpSession = rtpSession;
       
    62 		this.p = p;
       
    63 		SSRC = aPkt.getSsrc();
       
    64 		PktBufNode newNode = new PktBufNode(aPkt);
       
    65 		oldest = newNode;
       
    66 		newest = newNode;
       
    67 		//lastSeqNumber = (aPkt.getSeqNumber() - 1);
       
    68 		//lastTimestamp = aPkt.getTimeStamp();
       
    69 		length = 1;
       
    70 	}
       
    71 
       
    72 	/**
       
    73 	 * Adds a packet, this happens in constant time if they arrive in order.
       
    74 	 * Optimized for the case where each pkt is a complete frame.
       
    75 	 * 
       
    76 	 * @param aPkt the packet to be added to the buffer.
       
    77 	 * @return integer, negative if operation failed (see code)
       
    78 	 */
       
    79 	protected synchronized int addPkt(RtpPkt aPkt) {
       
    80 		if(aPkt == null) {
       
    81 			System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
       
    82 			return -5;
       
    83 		}
       
    84 
       
    85 		long timeStamp = aPkt.getTimeStamp();
       
    86 		if(RTPSession.rtpDebugLevel > 7) {
       
    87 			System.out.println("-> PktBuffer.addPkt() , length:" + length + " , timeStamp of Pkt: " + Long.toString(timeStamp));
       
    88 		}
       
    89 
       
    90 		
       
    91 		PktBufNode newNode = new PktBufNode(aPkt);
       
    92 		if(aPkt.getSsrc() != SSRC) {
       
    93 			System.out.println("PktBuffer.addPkt() SSRCs don't match!");
       
    94 		}
       
    95 		
       
    96 		int retVal = 0;
       
    97 		if(this.rtpSession.pktBufBehavior > 0) {
       
    98 			retVal = bufferedAddPkt(newNode);
       
    99 		} else if(this.rtpSession.pktBufBehavior == 0) {
       
   100 			retVal = filteredAddPkt(newNode);
       
   101 		} else if(this.rtpSession.pktBufBehavior == -1) {
       
   102 			retVal = unfilteredAddPkt(newNode);
       
   103 		}
       
   104 		
       
   105 
       
   106 		if(RTPSession.rtpDebugLevel > 7) {
       
   107 			if(RTPSession.rtpDebugLevel > 10) {
       
   108 				this.debugPrint();
       
   109 			}
       
   110 			System.out.println("<- PktBuffer.addPkt() , length:" + length + " returning " + retVal);
       
   111 		}
       
   112 		return retVal;
       
   113 	}
       
   114 	
       
   115 	/**
       
   116 	 * Adds packets in the same order that they arrive,
       
   117 	 * doesn't do any filering or processing.
       
   118 	 * 
       
   119 	 * @param newNode the node to add to the packet buffer
       
   120 	 * @return 0 if everything is okay, -1 otherwise
       
   121 	 */
       
   122 	private int unfilteredAddPkt(PktBufNode newNode) {
       
   123 		if(RTPSession.rtpDebugLevel > 8) {
       
   124 			System.out.println("<->    PktBuffer.unfilteredAddPkt()");
       
   125 		}
       
   126 		//No magic, just add to the end
       
   127 		if(oldest != null) {
       
   128 			oldest.nextFrameQueueNode = newNode;
       
   129 			newNode.prevFrameQueueNode = oldest; 
       
   130 			oldest = newNode;
       
   131 		} else {
       
   132 			oldest = newNode;
       
   133 			newest = newNode;
       
   134 		}
       
   135 		return 0;
       
   136 	}
       
   137 	
       
   138 	/**
       
   139 	 * Takes care of duplicate packets
       
   140 	 * 
       
   141 	 * @param newNode the node to add to the packet buffer
       
   142 	 * @return 0 if everything is okay, -1 otherwise
       
   143 	 */
       
   144 	private int filteredAddPkt(PktBufNode newNode) {
       
   145 		if(RTPSession.rtpDebugLevel > 8) {
       
   146 			System.out.println("<->    PktBuffer.filteredAddPkt()");
       
   147 		}
       
   148 		
       
   149 		if(length == 0) {
       
   150 			// The buffer was empty, this packet is the one and only.
       
   151 			newest = newNode;
       
   152 			oldest = newNode;
       
   153 			length = 1;
       
   154 		} else {
       
   155 			// The packetbuffer is not empty.
       
   156 			if(newNode.timeStamp > newest.timeStamp || newNode.seqNum > newest.seqNum && (newNode.seqNum - newest.seqNum) < 10) {
       
   157 				// Packet came in order
       
   158 				newNode.nextFrameQueueNode = newest;
       
   159 				newest.prevFrameQueueNode = newNode;
       
   160 				newest = newNode;
       
   161 				length++;
       
   162 			} else {
       
   163 					if(RTPSession.rtpDebugLevel > 2) {
       
   164 						System.out.println("PktBuffer.filteredAddPkt Dropped a packet due to lag! " +  newNode.timeStamp + " " 
       
   165 								+ newNode.seqNum + " vs "+ oldest.timeStamp + " " + oldest.seqNum);
       
   166 					}
       
   167 					return -1;
       
   168 			}
       
   169 		}
       
   170 		
       
   171 		return 0;
       
   172 	}
       
   173 	
       
   174 	/**
       
   175 	 * Does most of the packet organization for the application.
       
   176 	 * Packets are put in order, duplicate packets or late arrivals are discarded
       
   177 	 * 
       
   178 	 * If multiple packets make up a frame, these will also be organized
       
   179 	 * by RTP timestamp and sequence number, and returned as a complete frame.
       
   180 	 * 
       
   181 	 * @param newNode the node to add to the packet buffer
       
   182 	 * @return 0 if everything is okay, -1 otherwise
       
   183 	 */
       
   184 	private int bufferedAddPkt(PktBufNode newNode) {
       
   185 		if(RTPSession.rtpDebugLevel > 8) {
       
   186 			System.out.println("<->    PktBuffer.bufferedAddPkt()");
       
   187 		}
       
   188 		if(length == 0) {
       
   189 			// The buffer was empty, this packet is the one and only.
       
   190 			newest = newNode;
       
   191 			oldest = newNode;
       
   192 		} else {
       
   193 			// The packetbuffer is not empty.
       
   194 			if(newNode.timeStamp > newest.timeStamp || newNode.seqNum > newest.seqNum) {
       
   195 				// Packet came in order
       
   196 				newNode.nextFrameQueueNode = newest;
       
   197 				newest.prevFrameQueueNode = newNode;
       
   198 				newest = newNode;
       
   199 			} else {
       
   200 				//There are packets, we need to order this one right.
       
   201 				if(! pktOnTime(newNode.timeStamp, newNode.seqNum) && rtpSession.pktBufBehavior > -1) {
       
   202 					// We got this too late, can't put it in order anymore.
       
   203 					if(RTPSession.rtpDebugLevel > 2) {
       
   204 						System.out.println("PktBuffer.addPkt Dropped a packet due to lag! " +  newNode.timeStamp + " " 
       
   205 								+ newNode.seqNum + " vs "+ oldest.timeStamp + " " + oldest.seqNum);
       
   206 					}
       
   207 					return -1;
       
   208 				}
       
   209 
       
   210 				//Need to do some real work, find out where it belongs (linear search from the back).
       
   211 				PktBufNode tmpNode = newest;
       
   212 				while(tmpNode.timeStamp > newNode.timeStamp) {
       
   213 					tmpNode = tmpNode.nextFrameQueueNode;
       
   214 				}
       
   215 				
       
   216 				if( tmpNode.timeStamp == newNode.timeStamp
       
   217 						&& rtpSession.frameReconstruction
       
   218 						&& newNode.seqNum != tmpNode.seqNum) {
       
   219 					//Packet has same timestamp, presumably belongs to frame. Need to order within frame.
       
   220 					if(RTPSession.rtpDebugLevel > 8) {
       
   221 						System.out.println("Found pkt with existing timeStamp: " + newNode.timeStamp);
       
   222 					}
       
   223 					int ret = addToFrame(tmpNode, newNode);
       
   224 					if(ret != 0) {
       
   225 						return ret;
       
   226 					}
       
   227 				} else {
       
   228 								
       
   229 					// Check that it's not a duplicate
       
   230 					if(tmpNode.timeStamp == newNode.timeStamp && newNode.seqNum == tmpNode.seqNum) {
       
   231 						if(RTPSession.rtpDebugLevel > 2) {
       
   232 							System.out.println("PktBuffer.addPkt Dropped a duplicate packet! " 
       
   233 									+  newNode.timeStamp + " " + newNode.seqNum );
       
   234 						}
       
   235 						return -1;
       
   236 					}
       
   237 				
       
   238 					// Insert into buffer
       
   239 					newNode.nextFrameQueueNode = tmpNode;
       
   240 					newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;
       
   241 
       
   242 					// Update the node behind
       
   243 					if(newNode.prevFrameQueueNode != null) {
       
   244 						newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
       
   245 					}
       
   246 					tmpNode.prevFrameQueueNode = newNode;
       
   247 
       
   248 					if(newNode.timeStamp > newest.timeStamp) {
       
   249 						newest = newNode; 
       
   250 					}
       
   251 				}
       
   252 			}
       
   253 		}
       
   254 		// Update the length of this buffer
       
   255 		length++;
       
   256 		return 0;
       
   257 	}
       
   258 	
       
   259 	/**
       
   260 	 * 
       
   261 	 * @param frameNode the node currently representing the frame in the packet buffer
       
   262 	 * @param newNode the new node to be added to the frame
       
   263 	 * @return 0 if no error, -2 if this is a duplicate packet
       
   264 	 */
       
   265 	private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
       
   266 		// Node has same timeStamp, assume pkt belongs to frame
       
   267 		
       
   268 		if(frameNode.seqNum < newNode.seqNum) {
       
   269 			// this is not the first packet in the frame
       
   270 			frameNode.pktCount++;
       
   271 			
       
   272 			// Find the right spot
       
   273 			while( frameNode.nextFrameNode != null 
       
   274 					&& frameNode.nextFrameNode.seqNum < newNode.seqNum) {
       
   275 				frameNode = frameNode.nextFrameNode;
       
   276 			}
       
   277 			
       
   278 			// Check whether packet is duplicate
       
   279 			if(frameNode.nextFrameNode != null 
       
   280 					&& frameNode.nextFrameNode.seqNum == newNode.seqNum) {
       
   281 				if(RTPSession.rtpDebugLevel > 2) {
       
   282 					System.out.println("PktBuffer.addPkt Dropped a duplicate packet!");
       
   283 				}
       
   284 				return -2;
       
   285 			}
       
   286 			
       
   287 			newNode.nextFrameNode = frameNode.nextFrameNode;
       
   288 			frameNode.nextFrameNode = newNode;
       
   289 		
       
   290 		} else {
       
   291 			// newNode has the lowest sequence number
       
   292 			newNode.nextFrameNode = frameNode;
       
   293 			newNode.pktCount = frameNode.pktCount + 1;
       
   294 			
       
   295 			//Update the queue
       
   296 			if(frameNode.nextFrameQueueNode != null) {
       
   297 				frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
       
   298 				newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
       
   299 				frameNode.nextFrameQueueNode = null;
       
   300 			}
       
   301 			if(frameNode.prevFrameQueueNode != null) {
       
   302 				frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
       
   303 				newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
       
   304 				frameNode.prevFrameQueueNode = null;
       
   305 			}
       
   306 			if(newest.timeStamp == newNode.timeStamp) {
       
   307 				newest = newNode;
       
   308 			}
       
   309 		}
       
   310 
       
   311 		return 0;
       
   312 	}
       
   313 
       
   314 	/** 
       
   315 	 * Checks the oldest frame, if there is one, sees whether it is complete.
       
   316 	 * @return Returns null if there are no complete frames available.
       
   317 	 */
       
   318 	protected synchronized DataFrame popOldestFrame() {
       
   319 		if(RTPSession.rtpDebugLevel > 7) {
       
   320 			System.out.println("-> PktBuffer.popOldestFrame()");
       
   321 		}
       
   322 		if(RTPSession.rtpDebugLevel > 10) {
       
   323 			this.debugPrint();
       
   324 		}
       
   325 		
       
   326 		if(this.rtpSession.pktBufBehavior > 0) {
       
   327 			return this.bufferedPopFrame();
       
   328 		} else {
       
   329 			return this.unbufferedPopFrame();
       
   330 		}
       
   331 	}
       
   332 	
       
   333 	/**
       
   334 	 * Will return the oldest frame without checking whether it is in
       
   335 	 * the right order, or whether we should wate for late arrivals.
       
   336 	 * 
       
   337 	 * @return the first frame on the queue, null otherwise
       
   338 	 */
       
   339 	private DataFrame unbufferedPopFrame() {
       
   340 		if(oldest != null) {
       
   341 			PktBufNode retNode = oldest;
       
   342 			
       
   343 			popFrameQueueCleanup(retNode, retNode.seqNum);
       
   344 			
       
   345 			return new DataFrame(retNode, this.p, 
       
   346 					rtpSession.appIntf.frameSize(retNode.pkt.getPayloadType()));
       
   347 		} else {
       
   348 			return null;
       
   349 		}
       
   350 	}
       
   351 	
       
   352 	/**
       
   353 	 * Only returns if the buffer is full, i.e. length exceeds
       
   354 	 * rtpSession.pktBufBehavior, or if the next packet directly
       
   355 	 * follows the previous one returned to the application.
       
   356 	 * 
       
   357 	 * @return first frame in order, null otherwise
       
   358 	 */
       
   359 	private DataFrame bufferedPopFrame() {
       
   360 		PktBufNode retNode = oldest;
       
   361 		/**
       
   362 		 * Three scenarios:
       
   363 		 * 1) There are no packets available
       
   364 		 * 2) The first packet is vailable and in order
       
   365 		 * 3) The first packet is not the next on in the sequence
       
   366 		 * 		a) We have exceeded the wait buffer
       
   367 		 * 		b) We wait
       
   368 		 */
       
   369 		//System.out.println(" Debug:" +(retNode != null) + " " + (retNode.seqNum == this.lastSeqNumber + 1)
       
   370 		//		+ " " + ( retNode.seqNum == 0 ) + " " +  (this.length > this.rtpSession.maxReorderBuffer)
       
   371 		//		+ " " + (this.lastSeqNumber < 0));
       
   372 
       
   373 		// Pop it off, null all references.
       
   374 		if( retNode != null && (retNode.seqNum == this.lastSeqNumber + 1 || retNode.seqNum == 0 
       
   375 					|| this.length > this.rtpSession.pktBufBehavior || this.lastSeqNumber < 0)) {
       
   376 			
       
   377 				
       
   378 			//if(tmpNode.pktCount == compLen) {
       
   379 			if(RTPSession.rtpDebugLevel > 7) {
       
   380 				System.out.println("<- PktBuffer.popOldestFrame() returns frame");
       
   381 			}
       
   382 			
       
   383 			DataFrame df = new DataFrame(retNode, this.p, 
       
   384 					rtpSession.appIntf.frameSize(oldest.pkt.getPayloadType()));
       
   385 			
       
   386 			//DataFrame df = new DataFrame(retNode, this.p, 1);
       
   387 			popFrameQueueCleanup(retNode, df.lastSeqNum);
       
   388 			
       
   389 			return df;
       
   390 		
       
   391 		} else {
       
   392 			// If we get here we have little to show for.
       
   393 			if(RTPSession.rtpDebugLevel > 2) {
       
   394 				System.out.println("<- PktBuffer.popOldestFrame() returns null " + retNode.seqNum + " " + this.lastSeqNumber);
       
   395 				this.debugPrint();
       
   396 			}
       
   397 			return null;
       
   398 		}
       
   399 	}
       
   400 	
       
   401 	/**
       
   402 	 * Cleans the packet buffer before returning the frame,
       
   403 	 * i.e. making sure the queue has a head etc.
       
   404 	 * 
       
   405 	 * @param retNode the node that is about to be popped
       
   406 	 * @param highestSeq the highest sequence number returned to the application
       
   407 	 */
       
   408 	private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
       
   409 		if(1 == length) {
       
   410 			//There's only one frame
       
   411 			newest = null;
       
   412 			oldest = null;
       
   413 		} else {
       
   414 			//There are more frames
       
   415 			oldest = oldest.prevFrameQueueNode;
       
   416 			oldest.nextFrameQueueNode = null;
       
   417 		}
       
   418 
       
   419 		// Update counters
       
   420 		length--;
       
   421 		
       
   422 		//Find the highest sequence number associated with this timestamp
       
   423 		this.lastSeqNumber = highestSeq;
       
   424 		this.lastTimestamp = retNode.timeStamp;
       
   425 	}
       
   426 	
       
   427 	/** 
       
   428 	 * Returns the length of the packetbuffer.
       
   429 	 * @return number of frames (complete or not) in packetbuffer.
       
   430 	 */
       
   431 	protected int getLength() {
       
   432 		return length;
       
   433 	}
       
   434 	
       
   435 	/**
       
   436 	 * Checks whether a packet is not too late, i.e. the next packet has already been returned.
       
   437 	 * @param timeStamp the RTP timestamp of the packet under consideration 
       
   438 	 * @param seqNum the sequence number of the packet under consideration
       
   439 	 * @return true if newer packets have not been handed to the application
       
   440 	 */
       
   441 	protected boolean pktOnTime(long timeStamp, int seqNum) {
       
   442 		if(this.lastSeqNumber == -1) {
       
   443 			// First packet
       
   444 			return true;
       
   445 		} else {			
       
   446 			if(seqNum >= this.lastSeqNumber) {
       
   447 				if(this.lastSeqNumber < 3 && timeStamp < this.lastTimestamp ) {
       
   448 					return false;
       
   449 				}
       
   450 			} else {
       
   451 				if(seqNum > 3 || timeStamp < this.lastTimestamp) {
       
   452 					return false;
       
   453 				}
       
   454 			}
       
   455 		}
       
   456 		return true;
       
   457 	}
       
   458 
       
   459 	/** 
       
   460 	 * Prints out the packet buffer, oldest node first (on top).
       
   461 	 */
       
   462 	protected void debugPrint() {
       
   463 		System.out.println("PktBuffer.debugPrint() : length "+length+" SSRC "+SSRC+" lastSeqNum:"+lastSeqNumber);
       
   464 		PktBufNode tmpNode = oldest;
       
   465 		int i = 0;
       
   466 		while(tmpNode != null) {
       
   467 			//String str = tmpNode.timeStamp.toString();
       
   468 			System.out.println("   " + i + " seqNum:"+tmpNode.seqNum+" timeStamp: " + tmpNode.timeStamp + " pktCount:" + tmpNode.pktCount );
       
   469 			i++;
       
   470 			tmpNode = tmpNode.prevFrameQueueNode;
       
   471 		}
       
   472 	}
       
   473 }