--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/PktBuffer.java Fri Nov 20 19:29:42 2009 +0100
@@ -0,0 +1,533 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+package jlibrtp;
+
+import org.sipdroid.net.tools.DataFramePool;
+import org.sipdroid.net.tools.PktBufNodePool;
+
+/**
+ * A PktBuffer stores packets either for buffering purposes, or because they
+ * need to be assimilated to create a complete frame.
+ *
+ * This behavior can be controlled through rtpSession.pktBufBehavior()
+ *
+ * It optionally drops duplicate packets.
+ *
+ * Note that newest is the most recently received, i.e. highest timeStamp Next
+ * means new to old (from recently received to previously received)
+ *
+ * @author Arne Kepp
+ */
+public class PktBuffer {
+ /**
+ * The RTPSession holds information common to all packetBuffers, such as max
+ * size
+ */
+ RTPSession rtpSession;
+ /** SSRC of the the participant that this buffer is for */
+ long SSRC;
+ /** The parent participant */
+ Participant p;
+ /** The length of the buffer */
+ int length = 0;
+ /** The oldest, least recently received, packet */
+ PktBufNode oldest = null;
+ /** The newest, most recently received, packet */
+ PktBufNode newest = null;
+
+ /** The last sequence number received */
+ int lastSeqNumber = -1;
+ /** The last timestamp */
+ long lastTimestamp = -1;
+
+ /**
+ * Creates a new PktBuffer, a linked list of PktBufNode
+ *
+ * @param rtpSession
+ * the parent RTPSession
+ * @param p
+ * the participant to which this packetbuffer belongs.
+ * @param aPkt
+ * The first RTP packet, to be added to the buffer
+ */
+ protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
+ this.rtpSession = rtpSession;
+ this.p = p;
+ SSRC = aPkt.getSsrc();
+ PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
+ newNode.initPktBufNode(aPkt);
+ oldest = newNode;
+ newest = newNode;
+ // lastSeqNumber = (aPkt.getSeqNumber() - 1);
+ // lastTimestamp = aPkt.getTimeStamp();
+ length = 1;
+ }
+
+ /**
+ * Adds a packet, this happens in constant time if they arrive in order.
+ * Optimized for the case where each pkt is a complete frame.
+ *
+ * @param aPkt
+ * the packet to be added to the buffer.
+ * @return integer, negative if operation failed (see code)
+ */
+ protected synchronized int addPkt(RtpPkt aPkt) {
+ if (aPkt == null) {
+ System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
+ return -5;
+ }
+
+ long timeStamp = aPkt.getTimeStamp();
+ if (RTPSession.rtpDebugLevel > 7) {
+ System.out.println("-> PktBuffer.addPkt() , length:" + length
+ + " , timeStamp of Pkt: " + Long.toString(timeStamp));
+ }
+
+ PktBufNode newNode = PktBufNodePool.getInstance().borrowBufNode();
+ newNode.initPktBufNode(aPkt);
+ if (aPkt.getSsrc() != SSRC) {
+ System.out.println("PktBuffer.addPkt() SSRCs don't match!");
+ }
+
+ int retVal = 0;
+ if (this.rtpSession.pktBufBehavior > 0) {
+ retVal = bufferedAddPkt(newNode);
+ } else if (this.rtpSession.pktBufBehavior == 0) {
+ retVal = filteredAddPkt(newNode);
+ } else if (this.rtpSession.pktBufBehavior == -1) {
+ retVal = unfilteredAddPkt(newNode);
+ }
+
+ if (RTPSession.rtpDebugLevel > 7) {
+ if (RTPSession.rtpDebugLevel > 10) {
+ this.debugPrint();
+ }
+ System.out.println("<- PktBuffer.addPkt() , length:" + length
+ + " returning " + retVal);
+ }
+ return retVal;
+ }
+
+ /**
+ * Adds packets in the same order that they arrive, doesn't do any filering
+ * or processing.
+ *
+ * @param newNode
+ * the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int unfilteredAddPkt(PktBufNode newNode) {
+ if (RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.unfilteredAddPkt()");
+ }
+ // No magic, just add to the end
+ if (oldest != null) {
+ oldest.nextFrameQueueNode = newNode;
+ newNode.prevFrameQueueNode = oldest;
+ oldest = newNode;
+ } else {
+ oldest = newNode;
+ newest = newNode;
+ }
+ return 0;
+ }
+
+ /**
+ * Takes care of duplicate packets
+ *
+ * @param newNode
+ * the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int filteredAddPkt(PktBufNode newNode) {
+ if (RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.filteredAddPkt()");
+ }
+
+ if (length == 0) {
+ // The buffer was empty, this packet is the one and only.
+ newest = newNode;
+ oldest = newNode;
+ length = 1;
+ } else {
+ // The packetbuffer is not empty.
+ if (newNode.timeStamp > newest.timeStamp
+ || newNode.seqNum > newest.seqNum
+ && (newNode.seqNum - newest.seqNum) < 10) {
+ // Packet came in order
+ newNode.nextFrameQueueNode = newest;
+ newest.prevFrameQueueNode = newNode;
+ newest = newNode;
+ length++;
+ } else {
+ if (RTPSession.rtpDebugLevel > 2) {
+ System.out
+ .println("PktBuffer.filteredAddPkt Dropped a packet due to lag! "
+ + newNode.timeStamp
+ + " "
+ + newNode.seqNum
+ + " vs "
+ + oldest.timeStamp
+ + " "
+ + oldest.seqNum);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Does most of the packet organization for the application. Packets are put
+ * in order, duplicate packets or late arrivals are discarded
+ *
+ * If multiple packets make up a frame, these will also be organized by RTP
+ * timestamp and sequence number, and returned as a complete frame.
+ *
+ * @param newNode
+ * the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int bufferedAddPkt(PktBufNode newNode) {
+ if (RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.bufferedAddPkt()");
+ }
+ if (length == 0) {
+ // The buffer was empty, this packet is the one and only.
+ newest = newNode;
+ oldest = newNode;
+ } else {
+ // The packetbuffer is not empty.
+ if (newNode.timeStamp > newest.timeStamp
+ || newNode.seqNum > newest.seqNum) {
+ // Packet came in order
+ newNode.nextFrameQueueNode = newest;
+ newest.prevFrameQueueNode = newNode;
+ newest = newNode;
+ } else {
+ // There are packets, we need to order this one right.
+ if (!pktOnTime(newNode.timeStamp, newNode.seqNum)
+ && rtpSession.pktBufBehavior > -1) {
+ // We got this too late, can't put it in order anymore.
+ if (RTPSession.rtpDebugLevel > 2) {
+ System.out
+ .println("PktBuffer.addPkt Dropped a packet due to lag! "
+ + newNode.timeStamp
+ + " "
+ + newNode.seqNum
+ + " vs "
+ + oldest.timeStamp
+ + " "
+ + oldest.seqNum);
+ }
+ return -1;
+ }
+
+ // Need to do some real work, find out where it belongs (linear
+ // search from the back).
+ PktBufNode tmpNode = newest;
+ while (tmpNode.timeStamp > newNode.timeStamp) {
+ tmpNode = tmpNode.nextFrameQueueNode;
+ }
+
+ if (tmpNode.timeStamp == newNode.timeStamp
+ && rtpSession.frameReconstruction
+ && newNode.seqNum != tmpNode.seqNum) {
+ // Packet has same timestamp, presumably belongs to frame.
+ // Need to order within frame.
+ if (RTPSession.rtpDebugLevel > 8) {
+ System.out
+ .println("Found pkt with existing timeStamp: "
+ + newNode.timeStamp);
+ }
+ int ret = addToFrame(tmpNode, newNode);
+ if (ret != 0) {
+ return ret;
+ }
+ } else {
+
+ // Check that it's not a duplicate
+ if (tmpNode.timeStamp == newNode.timeStamp
+ && newNode.seqNum == tmpNode.seqNum) {
+ if (RTPSession.rtpDebugLevel > 2) {
+ System.out
+ .println("PktBuffer.addPkt Dropped a duplicate packet! "
+ + newNode.timeStamp
+ + " "
+ + newNode.seqNum);
+ }
+ return -1;
+ }
+
+ // Insert into buffer
+ newNode.nextFrameQueueNode = tmpNode;
+ newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;
+
+ // Update the node behind
+ if (newNode.prevFrameQueueNode != null) {
+ newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+ }
+ tmpNode.prevFrameQueueNode = newNode;
+
+ if (newNode.timeStamp > newest.timeStamp) {
+ newest = newNode;
+ }
+ }
+ }
+ }
+ // Update the length of this buffer
+ length++;
+ return 0;
+ }
+
+ /**
+ *
+ * @param frameNode
+ * the node currently representing the frame in the packet buffer
+ * @param newNode
+ * the new node to be added to the frame
+ * @return 0 if no error, -2 if this is a duplicate packet
+ */
+ private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
+ // Node has same timeStamp, assume pkt belongs to frame
+
+ if (frameNode.seqNum < newNode.seqNum) {
+ // this is not the first packet in the frame
+ frameNode.pktCount++;
+
+ // Find the right spot
+ while (frameNode.nextFrameNode != null
+ && frameNode.nextFrameNode.seqNum < newNode.seqNum) {
+ frameNode = frameNode.nextFrameNode;
+ }
+
+ // Check whether packet is duplicate
+ if (frameNode.nextFrameNode != null
+ && frameNode.nextFrameNode.seqNum == newNode.seqNum) {
+ if (RTPSession.rtpDebugLevel > 2) {
+ System.out
+ .println("PktBuffer.addPkt Dropped a duplicate packet!");
+ }
+ return -2;
+ }
+
+ newNode.nextFrameNode = frameNode.nextFrameNode;
+ frameNode.nextFrameNode = newNode;
+
+ } else {
+ // newNode has the lowest sequence number
+ newNode.nextFrameNode = frameNode;
+ newNode.pktCount = frameNode.pktCount + 1;
+
+ // Update the queue
+ if (frameNode.nextFrameQueueNode != null) {
+ frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
+ newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
+ frameNode.nextFrameQueueNode = null;
+ }
+ if (frameNode.prevFrameQueueNode != null) {
+ frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+ newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
+ frameNode.prevFrameQueueNode = null;
+ }
+ if (newest.timeStamp == newNode.timeStamp) {
+ newest = newNode;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Checks the oldest frame, if there is one, sees whether it is complete.
+ *
+ * @return Returns null if there are no complete frames available.
+ */
+ protected synchronized DataFrame popOldestFrame() {
+ if (RTPSession.rtpDebugLevel > 7) {
+ System.out.println("-> PktBuffer.popOldestFrame()");
+ }
+ if (RTPSession.rtpDebugLevel > 10) {
+ this.debugPrint();
+ }
+
+ if (this.rtpSession.pktBufBehavior > 0) {
+ return this.bufferedPopFrame();
+ } else {
+ return this.unbufferedPopFrame();
+ }
+ }
+
+ /**
+ * Will return the oldest frame without checking whether it is in the right
+ * order, or whether we should wate for late arrivals.
+ *
+ * @return the first frame on the queue, null otherwise
+ */
+ private DataFrame unbufferedPopFrame() {
+ if (oldest != null) {
+ PktBufNode retNode = oldest;
+
+ popFrameQueueCleanup(retNode, retNode.seqNum);
+ DataFrame df = DataFramePool.getInstance().borrowFrame();
+ df.initDataFrame(retNode, this.p, rtpSession.appIntf
+ .frameSize(oldest.pkt.getPayloadType()));
+ return df;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Only returns if the buffer is full, i.e. length exceeds
+ * rtpSession.pktBufBehavior, or if the next packet directly follows the
+ * previous one returned to the application.
+ *
+ * @return first frame in order, null otherwise
+ */
+ private DataFrame bufferedPopFrame() {
+ PktBufNode retNode = oldest;
+ /**
+ * Three scenarios: 1) There are no packets available 2) The first
+ * packet is vailable and in order 3) The first packet is not the next
+ * on in the sequence a) We have exceeded the wait buffer b) We wait
+ */
+ // System.out.println(" Debug:" +(retNode != null) + " " +
+ // (retNode.seqNum == this.lastSeqNumber + 1)
+ // + " " + ( retNode.seqNum == 0 ) + " " + (this.length >
+ // this.rtpSession.maxReorderBuffer)
+ // + " " + (this.lastSeqNumber < 0));
+
+ // Pop it off, null all references.
+ if (retNode != null
+ && (retNode.seqNum == this.lastSeqNumber + 1
+ || retNode.seqNum == 0
+ || this.length > this.rtpSession.pktBufBehavior || this.lastSeqNumber < 0)) {
+
+ // if(tmpNode.pktCount == compLen) {
+ if (RTPSession.rtpDebugLevel > 7) {
+ System.out
+ .println("<- PktBuffer.popOldestFrame() returns frame");
+ }
+
+ DataFrame df = DataFramePool.getInstance().borrowFrame();
+ df.initDataFrame(retNode, this.p, rtpSession.appIntf
+ .frameSize(oldest.pkt.getPayloadType()));
+
+ // DataFrame df = new DataFrame(retNode, this.p, 1);
+ popFrameQueueCleanup(retNode, df.lastSeqNum);
+
+ return df;
+
+ } else {
+ // If we get here we have little to show for.
+ if (RTPSession.rtpDebugLevel > 2) {
+ System.out
+ .println("<- PktBuffer.popOldestFrame() returns null "
+ + retNode.seqNum + " " + this.lastSeqNumber);
+ this.debugPrint();
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Cleans the packet buffer before returning the frame, i.e. making sure the
+ * queue has a head etc.
+ *
+ * @param retNode
+ * the node that is about to be popped
+ * @param highestSeq
+ * the highest sequence number returned to the application
+ */
+ private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
+ if (1 == length) {
+ // There's only one frame
+ newest = null;
+ oldest = null;
+ } else {
+ // There are more frames
+ oldest = oldest.prevFrameQueueNode;
+ oldest.nextFrameQueueNode = null;
+ }
+
+ // Update counters
+ length--;
+
+ // Find the highest sequence number associated with this timestamp
+ this.lastSeqNumber = highestSeq;
+ this.lastTimestamp = retNode.timeStamp;
+ }
+
+ /**
+ * Returns the length of the packetbuffer.
+ *
+ * @return number of frames (complete or not) in packetbuffer.
+ */
+ protected int getLength() {
+ return length;
+ }
+
+ /**
+ * Checks whether a packet is not too late, i.e. the next packet has already
+ * been returned.
+ *
+ * @param timeStamp
+ * the RTP timestamp of the packet under consideration
+ * @param seqNum
+ * the sequence number of the packet under consideration
+ * @return true if newer packets have not been handed to the application
+ */
+ protected boolean pktOnTime(long timeStamp, int seqNum) {
+ if (this.lastSeqNumber == -1) {
+ // First packet
+ return true;
+ } else {
+ if (seqNum >= this.lastSeqNumber) {
+ if (this.lastSeqNumber < 3 && timeStamp < this.lastTimestamp) {
+ return false;
+ }
+ } else {
+ if (seqNum > 3 || timeStamp < this.lastTimestamp) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Prints out the packet buffer, oldest node first (on top).
+ */
+ protected void debugPrint() {
+ System.out.println("PktBuffer.debugPrint() : length " + length
+ + " SSRC " + SSRC + " lastSeqNum:" + lastSeqNumber);
+ PktBufNode tmpNode = oldest;
+ int i = 0;
+ while (tmpNode != null) {
+ // String str = tmpNode.timeStamp.toString();
+ System.out.println(" " + i + " seqNum:" + tmpNode.seqNum
+ + " timeStamp: " + tmpNode.timeStamp + " pktCount:"
+ + tmpNode.pktCount);
+ i++;
+ tmpNode = tmpNode.prevFrameQueueNode;
+ }
+ }
+}