src/jlibrtp/PktBuffer.java
changeset 823 2036ebfaccda
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/PktBuffer.java	Fri Nov 20 19:29:42 2009 +0100
@@ -0,0 +1,533 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+package jlibrtp;
+
+import org.sipdroid.net.tools.DataFramePool;
+import org.sipdroid.net.tools.PktBufNodePool;
+
+/**
+ * A PktBuffer stores packets either for buffering purposes, or because they
+ * need to be assimilated to create a complete frame.
+ * 
+ * This behavior can be controlled through rtpSession.pktBufBehavior()
+ * 
+ * It optionally drops duplicate packets.
+ * 
+ * Note that newest is the most recently received, i.e. highest timeStamp Next
+ * means new to old (from recently received to previously received)
+ * 
+ * @author Arne Kepp
+ */
+public class PktBuffer {
+	/**
+	 * The RTPSession holds information common to all packetBuffers, such as max
+	 * size
+	 */
+	RTPSession rtpSession;
+	/** SSRC of the the participant that this buffer is for */
+	long SSRC;
+	/** The parent participant */
+	Participant p;
+	/** The length of the buffer */
+	int length = 0;
+	/** The oldest, least recently received, packet */
+	PktBufNode oldest = null;
+	/** The newest, most recently received, packet */
+	PktBufNode newest = null;
+
+	/** The last sequence number received */
+	int lastSeqNumber = -1;
+	/** The last timestamp */
+	long lastTimestamp = -1;
+
+	/**
+	 * Creates a new PktBuffer, a linked list of PktBufNode
+	 * 
+	 * @param rtpSession
+	 *            the parent RTPSession
+	 * @param p
+	 *            the participant to which this packetbuffer belongs.
+	 * @param aPkt
+	 *            The first RTP packet, to be added to the buffer
+	 */
+	protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
+		this.rtpSession = rtpSession;
+		this.p = p;
+		SSRC = aPkt.getSsrc();
+		PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
+		newNode.initPktBufNode(aPkt);
+		oldest = newNode;
+		newest = newNode;
+		// lastSeqNumber = (aPkt.getSeqNumber() - 1);
+		// lastTimestamp = aPkt.getTimeStamp();
+		length = 1;
+	}
+
+	/**
+	 * Adds a packet, this happens in constant time if they arrive in order.
+	 * Optimized for the case where each pkt is a complete frame.
+	 * 
+	 * @param aPkt
+	 *            the packet to be added to the buffer.
+	 * @return integer, negative if operation failed (see code)
+	 */
+	protected synchronized int addPkt(RtpPkt aPkt) {
+		if (aPkt == null) {
+			System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
+			return -5;
+		}
+
+		long timeStamp = aPkt.getTimeStamp();
+		if (RTPSession.rtpDebugLevel > 7) {
+			System.out.println("-> PktBuffer.addPkt() , length:" + length
+					+ " , timeStamp of Pkt: " + Long.toString(timeStamp));
+		}
+
+		PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
+		newNode.initPktBufNode(aPkt);
+		if (aPkt.getSsrc() != SSRC) {
+			System.out.println("PktBuffer.addPkt() SSRCs don't match!");
+		}
+
+		int retVal = 0;
+		if (this.rtpSession.pktBufBehavior > 0) {
+			retVal = bufferedAddPkt(newNode);
+		} else if (this.rtpSession.pktBufBehavior == 0) {
+			retVal = filteredAddPkt(newNode);
+		} else if (this.rtpSession.pktBufBehavior == -1) {
+			retVal = unfilteredAddPkt(newNode);
+		}
+
+		if (RTPSession.rtpDebugLevel > 7) {
+			if (RTPSession.rtpDebugLevel > 10) {
+				this.debugPrint();
+			}
+			System.out.println("<- PktBuffer.addPkt() , length:" + length
+					+ " returning " + retVal);
+		}
+		return retVal;
+	}
+
+	/**
+	 * Adds packets in the same order that they arrive, doesn't do any filering
+	 * or processing.
+	 * 
+	 * @param newNode
+	 *            the node to add to the packet buffer
+	 * @return 0 if everything is okay, -1 otherwise
+	 */
+	private int unfilteredAddPkt(PktBufNode newNode) {
+		if (RTPSession.rtpDebugLevel > 8) {
+			System.out.println("<->    PktBuffer.unfilteredAddPkt()");
+		}
+		// No magic, just add to the end
+		if (oldest != null) {
+			oldest.nextFrameQueueNode = newNode;
+			newNode.prevFrameQueueNode = oldest;
+			oldest = newNode;
+		} else {
+			oldest = newNode;
+			newest = newNode;
+		}
+		return 0;
+	}
+
+	/**
+	 * Takes care of duplicate packets
+	 * 
+	 * @param newNode
+	 *            the node to add to the packet buffer
+	 * @return 0 if everything is okay, -1 otherwise
+	 */
+	private int filteredAddPkt(PktBufNode newNode) {
+		if (RTPSession.rtpDebugLevel > 8) {
+			System.out.println("<->    PktBuffer.filteredAddPkt()");
+		}
+
+		if (length == 0) {
+			// The buffer was empty, this packet is the one and only.
+			newest = newNode;
+			oldest = newNode;
+			length = 1;
+		} else {
+			// The packetbuffer is not empty.
+			if (newNode.timeStamp > newest.timeStamp
+					|| newNode.seqNum > newest.seqNum
+					&& (newNode.seqNum - newest.seqNum) < 10) {
+				// Packet came in order
+				newNode.nextFrameQueueNode = newest;
+				newest.prevFrameQueueNode = newNode;
+				newest = newNode;
+				length++;
+			} else {
+				if (RTPSession.rtpDebugLevel > 2) {
+					System.out
+							.println("PktBuffer.filteredAddPkt Dropped a packet due to lag! "
+									+ newNode.timeStamp
+									+ " "
+									+ newNode.seqNum
+									+ " vs "
+									+ oldest.timeStamp
+									+ " "
+									+ oldest.seqNum);
+				}
+				return -1;
+			}
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Does most of the packet organization for the application. Packets are put
+	 * in order, duplicate packets or late arrivals are discarded
+	 * 
+	 * If multiple packets make up a frame, these will also be organized by RTP
+	 * timestamp and sequence number, and returned as a complete frame.
+	 * 
+	 * @param newNode
+	 *            the node to add to the packet buffer
+	 * @return 0 if everything is okay, -1 otherwise
+	 */
+	private int bufferedAddPkt(PktBufNode newNode) {
+		if (RTPSession.rtpDebugLevel > 8) {
+			System.out.println("<->    PktBuffer.bufferedAddPkt()");
+		}
+		if (length == 0) {
+			// The buffer was empty, this packet is the one and only.
+			newest = newNode;
+			oldest = newNode;
+		} else {
+			// The packetbuffer is not empty.
+			if (newNode.timeStamp > newest.timeStamp
+					|| newNode.seqNum > newest.seqNum) {
+				// Packet came in order
+				newNode.nextFrameQueueNode = newest;
+				newest.prevFrameQueueNode = newNode;
+				newest = newNode;
+			} else {
+				// There are packets, we need to order this one right.
+				if (!pktOnTime(newNode.timeStamp, newNode.seqNum)
+						&& rtpSession.pktBufBehavior > -1) {
+					// We got this too late, can't put it in order anymore.
+					if (RTPSession.rtpDebugLevel > 2) {
+						System.out
+								.println("PktBuffer.addPkt Dropped a packet due to lag! "
+										+ newNode.timeStamp
+										+ " "
+										+ newNode.seqNum
+										+ " vs "
+										+ oldest.timeStamp
+										+ " "
+										+ oldest.seqNum);
+					}
+					return -1;
+				}
+
+				// Need to do some real work, find out where it belongs (linear
+				// search from the back).
+				PktBufNode tmpNode = newest;
+				while (tmpNode.timeStamp > newNode.timeStamp) {
+					tmpNode = tmpNode.nextFrameQueueNode;
+				}
+
+				if (tmpNode.timeStamp == newNode.timeStamp
+						&& rtpSession.frameReconstruction
+						&& newNode.seqNum != tmpNode.seqNum) {
+					// Packet has same timestamp, presumably belongs to frame.
+					// Need to order within frame.
+					if (RTPSession.rtpDebugLevel > 8) {
+						System.out
+								.println("Found pkt with existing timeStamp: "
+										+ newNode.timeStamp);
+					}
+					int ret = addToFrame(tmpNode, newNode);
+					if (ret != 0) {
+						return ret;
+					}
+				} else {
+
+					// Check that it's not a duplicate
+					if (tmpNode.timeStamp == newNode.timeStamp
+							&& newNode.seqNum == tmpNode.seqNum) {
+						if (RTPSession.rtpDebugLevel > 2) {
+							System.out
+									.println("PktBuffer.addPkt Dropped a duplicate packet! "
+											+ newNode.timeStamp
+											+ " "
+											+ newNode.seqNum);
+						}
+						return -1;
+					}
+
+					// Insert into buffer
+					newNode.nextFrameQueueNode = tmpNode;
+					newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;
+
+					// Update the node behind
+					if (newNode.prevFrameQueueNode != null) {
+						newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+					}
+					tmpNode.prevFrameQueueNode = newNode;
+
+					if (newNode.timeStamp > newest.timeStamp) {
+						newest = newNode;
+					}
+				}
+			}
+		}
+		// Update the length of this buffer
+		length++;
+		return 0;
+	}
+
+	/**
+	 * 
+	 * @param frameNode
+	 *            the node currently representing the frame in the packet buffer
+	 * @param newNode
+	 *            the new node to be added to the frame
+	 * @return 0 if no error, -2 if this is a duplicate packet
+	 */
+	private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
+		// Node has same timeStamp, assume pkt belongs to frame
+
+		if (frameNode.seqNum < newNode.seqNum) {
+			// this is not the first packet in the frame
+			frameNode.pktCount++;
+
+			// Find the right spot
+			while (frameNode.nextFrameNode != null
+					&& frameNode.nextFrameNode.seqNum < newNode.seqNum) {
+				frameNode = frameNode.nextFrameNode;
+			}
+
+			// Check whether packet is duplicate
+			if (frameNode.nextFrameNode != null
+					&& frameNode.nextFrameNode.seqNum == newNode.seqNum) {
+				if (RTPSession.rtpDebugLevel > 2) {
+					System.out
+							.println("PktBuffer.addPkt Dropped a duplicate packet!");
+				}
+				return -2;
+			}
+
+			newNode.nextFrameNode = frameNode.nextFrameNode;
+			frameNode.nextFrameNode = newNode;
+
+		} else {
+			// newNode has the lowest sequence number
+			newNode.nextFrameNode = frameNode;
+			newNode.pktCount = frameNode.pktCount + 1;
+
+			// Update the queue
+			if (frameNode.nextFrameQueueNode != null) {
+				frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
+				newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
+				frameNode.nextFrameQueueNode = null;
+			}
+			if (frameNode.prevFrameQueueNode != null) {
+				frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+				newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
+				frameNode.prevFrameQueueNode = null;
+			}
+			if (newest.timeStamp == newNode.timeStamp) {
+				newest = newNode;
+			}
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Checks the oldest frame, if there is one, sees whether it is complete.
+	 * 
+	 * @return Returns null if there are no complete frames available.
+	 */
+	protected synchronized DataFrame popOldestFrame() {
+		if (RTPSession.rtpDebugLevel > 7) {
+			System.out.println("-> PktBuffer.popOldestFrame()");
+		}
+		if (RTPSession.rtpDebugLevel > 10) {
+			this.debugPrint();
+		}
+
+		if (this.rtpSession.pktBufBehavior > 0) {
+			return this.bufferedPopFrame();
+		} else {
+			return this.unbufferedPopFrame();
+		}
+	}
+
+	/**
+	 * Will return the oldest frame without checking whether it is in the right
+	 * order, or whether we should wate for late arrivals.
+	 * 
+	 * @return the first frame on the queue, null otherwise
+	 */
+	private DataFrame unbufferedPopFrame() {
+		if (oldest != null) {
+			PktBufNode retNode = oldest;
+
+			popFrameQueueCleanup(retNode, retNode.seqNum);
+			DataFrame df = DataFramePool.getInstance().borrowFrame();
+			df.initDataFrame(retNode, this.p, rtpSession.appIntf
+					.frameSize(oldest.pkt.getPayloadType()));
+			return df;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Only returns if the buffer is full, i.e. length exceeds
+	 * rtpSession.pktBufBehavior, or if the next packet directly follows the
+	 * previous one returned to the application.
+	 * 
+	 * @return first frame in order, null otherwise
+	 */
+	private DataFrame bufferedPopFrame() {
+		PktBufNode retNode = oldest;
+		/**
+		 * Three scenarios: 1) There are no packets available 2) The first
+		 * packet is vailable and in order 3) The first packet is not the next
+		 * on in the sequence a) We have exceeded the wait buffer b) We wait
+		 */
+		// System.out.println(" Debug:" +(retNode != null) + " " +
+		// (retNode.seqNum == this.lastSeqNumber + 1)
+		// + " " + ( retNode.seqNum == 0 ) + " " + (this.length >
+		// this.rtpSession.maxReorderBuffer)
+		// + " " + (this.lastSeqNumber < 0));
+
+		// Pop it off, null all references.
+		if (retNode != null
+				&& (retNode.seqNum == this.lastSeqNumber + 1
+						|| retNode.seqNum == 0
+						|| this.length > this.rtpSession.pktBufBehavior || this.lastSeqNumber < 0)) {
+
+			// if(tmpNode.pktCount == compLen) {
+			if (RTPSession.rtpDebugLevel > 7) {
+				System.out
+						.println("<- PktBuffer.popOldestFrame() returns frame");
+			}
+
+			DataFrame df = DataFramePool.getInstance().borrowFrame();
+			df.initDataFrame(retNode, this.p, rtpSession.appIntf
+					.frameSize(oldest.pkt.getPayloadType()));
+
+			// DataFrame df = new DataFrame(retNode, this.p, 1);
+			popFrameQueueCleanup(retNode, df.lastSeqNum);
+
+			return df;
+
+		} else {
+			// If we get here we have little to show for.
+			if (RTPSession.rtpDebugLevel > 2) {
+				System.out
+						.println("<- PktBuffer.popOldestFrame() returns null "
+								+ retNode.seqNum + " " + this.lastSeqNumber);
+				this.debugPrint();
+			}
+			return null;
+		}
+	}
+
+	/**
+	 * Cleans the packet buffer before returning the frame, i.e. making sure the
+	 * queue has a head etc.
+	 * 
+	 * @param retNode
+	 *            the node that is about to be popped
+	 * @param highestSeq
+	 *            the highest sequence number returned to the application
+	 */
+	private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
+		if (1 == length) {
+			// There's only one frame
+			newest = null;
+			oldest = null;
+		} else {
+			// There are more frames
+			oldest = oldest.prevFrameQueueNode;
+			oldest.nextFrameQueueNode = null;
+		}
+
+		// Update counters
+		length--;
+
+		// Find the highest sequence number associated with this timestamp
+		this.lastSeqNumber = highestSeq;
+		this.lastTimestamp = retNode.timeStamp;
+	}
+
+	/**
+	 * Returns the length of the packetbuffer.
+	 * 
+	 * @return number of frames (complete or not) in packetbuffer.
+	 */
+	protected int getLength() {
+		return length;
+	}
+
+	/**
+	 * Checks whether a packet is not too late, i.e. the next packet has already
+	 * been returned.
+	 * 
+	 * @param timeStamp
+	 *            the RTP timestamp of the packet under consideration
+	 * @param seqNum
+	 *            the sequence number of the packet under consideration
+	 * @return true if newer packets have not been handed to the application
+	 */
+	protected boolean pktOnTime(long timeStamp, int seqNum) {
+		if (this.lastSeqNumber == -1) {
+			// First packet
+			return true;
+		} else {
+			if (seqNum >= this.lastSeqNumber) {
+				if (this.lastSeqNumber < 3 && timeStamp < this.lastTimestamp) {
+					return false;
+				}
+			} else {
+				if (seqNum > 3 || timeStamp < this.lastTimestamp) {
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Prints out the packet buffer, oldest node first (on top).
+	 */
+	protected void debugPrint() {
+		System.out.println("PktBuffer.debugPrint() : length " + length
+				+ " SSRC " + SSRC + " lastSeqNum:" + lastSeqNumber);
+		PktBufNode tmpNode = oldest;
+		int i = 0;
+		while (tmpNode != null) {
+			// String str = tmpNode.timeStamp.toString();
+			System.out.println("   " + i + " seqNum:" + tmpNode.seqNum
+					+ " timeStamp: " + tmpNode.timeStamp + " pktCount:"
+					+ tmpNode.pktCount);
+			i++;
+			tmpNode = tmpNode.prevFrameQueueNode;
+		}
+	}
+}