--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/PktBuffer.java Sat Mar 14 22:15:41 2009 +0100
@@ -0,0 +1,473 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+package jlibrtp;
+
+/**
+ * A PktBuffer stores packets either for buffering purposes,
+ * or because they need to be assimilated to create a complete frame.
+ *
+ * This behavior can be controlled through rtpSession.pktBufBehavior()
+ *
+ * It optionally drops duplicate packets.
+ *
+ * Note that newest is the most recently received, i.e. highest timeStamp
+ * Next means new to old (from recently received to previously received)
+ *
+ * @author Arne Kepp
+ */
+public class PktBuffer {
+ /** The RTPSession holds information common to all packetBuffers, such as max size */
+ RTPSession rtpSession;
+ /** SSRC of the the participant that this buffer is for */
+ long SSRC;
+ /** The parent participant */
+ Participant p;
+ /** The length of the buffer */
+ int length = 0;
+ /** The oldest, least recently received, packet */
+ PktBufNode oldest = null;
+ /** The newest, most recently received, packet */
+ PktBufNode newest = null;
+
+ /** The last sequence number received */
+ int lastSeqNumber = -1;
+ /** The last timestamp */
+ long lastTimestamp = -1;
+
+ /**
+ * Creates a new PktBuffer, a linked list of PktBufNode
+ *
+ * @param rtpSession the parent RTPSession
+ * @param p the participant to which this packetbuffer belongs.
+ * @param aPkt The first RTP packet, to be added to the buffer
+ */
+ protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
+ this.rtpSession = rtpSession;
+ this.p = p;
+ SSRC = aPkt.getSsrc();
+ PktBufNode newNode = new PktBufNode(aPkt);
+ oldest = newNode;
+ newest = newNode;
+ //lastSeqNumber = (aPkt.getSeqNumber() - 1);
+ //lastTimestamp = aPkt.getTimeStamp();
+ length = 1;
+ }
+
+ /**
+ * Adds a packet, this happens in constant time if they arrive in order.
+ * Optimized for the case where each pkt is a complete frame.
+ *
+ * @param aPkt the packet to be added to the buffer.
+ * @return integer, negative if operation failed (see code)
+ */
+ protected synchronized int addPkt(RtpPkt aPkt) {
+ if(aPkt == null) {
+ System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
+ return -5;
+ }
+
+ long timeStamp = aPkt.getTimeStamp();
+ if(RTPSession.rtpDebugLevel > 7) {
+ System.out.println("-> PktBuffer.addPkt() , length:" + length + " , timeStamp of Pkt: " + Long.toString(timeStamp));
+ }
+
+
+ PktBufNode newNode = new PktBufNode(aPkt);
+ if(aPkt.getSsrc() != SSRC) {
+ System.out.println("PktBuffer.addPkt() SSRCs don't match!");
+ }
+
+ int retVal = 0;
+ if(this.rtpSession.pktBufBehavior > 0) {
+ retVal = bufferedAddPkt(newNode);
+ } else if(this.rtpSession.pktBufBehavior == 0) {
+ retVal = filteredAddPkt(newNode);
+ } else if(this.rtpSession.pktBufBehavior == -1) {
+ retVal = unfilteredAddPkt(newNode);
+ }
+
+
+ if(RTPSession.rtpDebugLevel > 7) {
+ if(RTPSession.rtpDebugLevel > 10) {
+ this.debugPrint();
+ }
+ System.out.println("<- PktBuffer.addPkt() , length:" + length + " returning " + retVal);
+ }
+ return retVal;
+ }
+
+ /**
+ * Adds packets in the same order that they arrive,
+ * doesn't do any filering or processing.
+ *
+ * @param newNode the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int unfilteredAddPkt(PktBufNode newNode) {
+ if(RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.unfilteredAddPkt()");
+ }
+ //No magic, just add to the end
+ if(oldest != null) {
+ oldest.nextFrameQueueNode = newNode;
+ newNode.prevFrameQueueNode = oldest;
+ oldest = newNode;
+ } else {
+ oldest = newNode;
+ newest = newNode;
+ }
+ return 0;
+ }
+
+ /**
+ * Takes care of duplicate packets
+ *
+ * @param newNode the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int filteredAddPkt(PktBufNode newNode) {
+ if(RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.filteredAddPkt()");
+ }
+
+ if(length == 0) {
+ // The buffer was empty, this packet is the one and only.
+ newest = newNode;
+ oldest = newNode;
+ length = 1;
+ } else {
+ // The packetbuffer is not empty.
+ if(newNode.timeStamp > newest.timeStamp || newNode.seqNum > newest.seqNum && (newNode.seqNum - newest.seqNum) < 10) {
+ // Packet came in order
+ newNode.nextFrameQueueNode = newest;
+ newest.prevFrameQueueNode = newNode;
+ newest = newNode;
+ length++;
+ } else {
+ if(RTPSession.rtpDebugLevel > 2) {
+ System.out.println("PktBuffer.filteredAddPkt Dropped a packet due to lag! " + newNode.timeStamp + " "
+ + newNode.seqNum + " vs "+ oldest.timeStamp + " " + oldest.seqNum);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Does most of the packet organization for the application.
+ * Packets are put in order, duplicate packets or late arrivals are discarded
+ *
+ * If multiple packets make up a frame, these will also be organized
+ * by RTP timestamp and sequence number, and returned as a complete frame.
+ *
+ * @param newNode the node to add to the packet buffer
+ * @return 0 if everything is okay, -1 otherwise
+ */
+ private int bufferedAddPkt(PktBufNode newNode) {
+ if(RTPSession.rtpDebugLevel > 8) {
+ System.out.println("<-> PktBuffer.bufferedAddPkt()");
+ }
+ if(length == 0) {
+ // The buffer was empty, this packet is the one and only.
+ newest = newNode;
+ oldest = newNode;
+ } else {
+ // The packetbuffer is not empty.
+ if(newNode.timeStamp > newest.timeStamp || newNode.seqNum > newest.seqNum) {
+ // Packet came in order
+ newNode.nextFrameQueueNode = newest;
+ newest.prevFrameQueueNode = newNode;
+ newest = newNode;
+ } else {
+ //There are packets, we need to order this one right.
+ if(! pktOnTime(newNode.timeStamp, newNode.seqNum) && rtpSession.pktBufBehavior > -1) {
+ // We got this too late, can't put it in order anymore.
+ if(RTPSession.rtpDebugLevel > 2) {
+ System.out.println("PktBuffer.addPkt Dropped a packet due to lag! " + newNode.timeStamp + " "
+ + newNode.seqNum + " vs "+ oldest.timeStamp + " " + oldest.seqNum);
+ }
+ return -1;
+ }
+
+ //Need to do some real work, find out where it belongs (linear search from the back).
+ PktBufNode tmpNode = newest;
+ while(tmpNode.timeStamp > newNode.timeStamp) {
+ tmpNode = tmpNode.nextFrameQueueNode;
+ }
+
+ if( tmpNode.timeStamp == newNode.timeStamp
+ && rtpSession.frameReconstruction
+ && newNode.seqNum != tmpNode.seqNum) {
+ //Packet has same timestamp, presumably belongs to frame. Need to order within frame.
+ if(RTPSession.rtpDebugLevel > 8) {
+ System.out.println("Found pkt with existing timeStamp: " + newNode.timeStamp);
+ }
+ int ret = addToFrame(tmpNode, newNode);
+ if(ret != 0) {
+ return ret;
+ }
+ } else {
+
+ // Check that it's not a duplicate
+ if(tmpNode.timeStamp == newNode.timeStamp && newNode.seqNum == tmpNode.seqNum) {
+ if(RTPSession.rtpDebugLevel > 2) {
+ System.out.println("PktBuffer.addPkt Dropped a duplicate packet! "
+ + newNode.timeStamp + " " + newNode.seqNum );
+ }
+ return -1;
+ }
+
+ // Insert into buffer
+ newNode.nextFrameQueueNode = tmpNode;
+ newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;
+
+ // Update the node behind
+ if(newNode.prevFrameQueueNode != null) {
+ newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+ }
+ tmpNode.prevFrameQueueNode = newNode;
+
+ if(newNode.timeStamp > newest.timeStamp) {
+ newest = newNode;
+ }
+ }
+ }
+ }
+ // Update the length of this buffer
+ length++;
+ return 0;
+ }
+
+ /**
+ *
+ * @param frameNode the node currently representing the frame in the packet buffer
+ * @param newNode the new node to be added to the frame
+ * @return 0 if no error, -2 if this is a duplicate packet
+ */
+ private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
+ // Node has same timeStamp, assume pkt belongs to frame
+
+ if(frameNode.seqNum < newNode.seqNum) {
+ // this is not the first packet in the frame
+ frameNode.pktCount++;
+
+ // Find the right spot
+ while( frameNode.nextFrameNode != null
+ && frameNode.nextFrameNode.seqNum < newNode.seqNum) {
+ frameNode = frameNode.nextFrameNode;
+ }
+
+ // Check whether packet is duplicate
+ if(frameNode.nextFrameNode != null
+ && frameNode.nextFrameNode.seqNum == newNode.seqNum) {
+ if(RTPSession.rtpDebugLevel > 2) {
+ System.out.println("PktBuffer.addPkt Dropped a duplicate packet!");
+ }
+ return -2;
+ }
+
+ newNode.nextFrameNode = frameNode.nextFrameNode;
+ frameNode.nextFrameNode = newNode;
+
+ } else {
+ // newNode has the lowest sequence number
+ newNode.nextFrameNode = frameNode;
+ newNode.pktCount = frameNode.pktCount + 1;
+
+ //Update the queue
+ if(frameNode.nextFrameQueueNode != null) {
+ frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
+ newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
+ frameNode.nextFrameQueueNode = null;
+ }
+ if(frameNode.prevFrameQueueNode != null) {
+ frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
+ newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
+ frameNode.prevFrameQueueNode = null;
+ }
+ if(newest.timeStamp == newNode.timeStamp) {
+ newest = newNode;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Checks the oldest frame, if there is one, sees whether it is complete.
+ * @return Returns null if there are no complete frames available.
+ */
+ protected synchronized DataFrame popOldestFrame() {
+ if(RTPSession.rtpDebugLevel > 7) {
+ System.out.println("-> PktBuffer.popOldestFrame()");
+ }
+ if(RTPSession.rtpDebugLevel > 10) {
+ this.debugPrint();
+ }
+
+ if(this.rtpSession.pktBufBehavior > 0) {
+ return this.bufferedPopFrame();
+ } else {
+ return this.unbufferedPopFrame();
+ }
+ }
+
+ /**
+ * Will return the oldest frame without checking whether it is in
+ * the right order, or whether we should wate for late arrivals.
+ *
+ * @return the first frame on the queue, null otherwise
+ */
+ private DataFrame unbufferedPopFrame() {
+ if(oldest != null) {
+ PktBufNode retNode = oldest;
+
+ popFrameQueueCleanup(retNode, retNode.seqNum);
+
+ return new DataFrame(retNode, this.p,
+ rtpSession.appIntf.frameSize(retNode.pkt.getPayloadType()));
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Only returns if the buffer is full, i.e. length exceeds
+ * rtpSession.pktBufBehavior, or if the next packet directly
+ * follows the previous one returned to the application.
+ *
+ * @return first frame in order, null otherwise
+ */
+ private DataFrame bufferedPopFrame() {
+ PktBufNode retNode = oldest;
+ /**
+ * Three scenarios:
+ * 1) There are no packets available
+ * 2) The first packet is vailable and in order
+ * 3) The first packet is not the next on in the sequence
+ * a) We have exceeded the wait buffer
+ * b) We wait
+ */
+ //System.out.println(" Debug:" +(retNode != null) + " " + (retNode.seqNum == this.lastSeqNumber + 1)
+ // + " " + ( retNode.seqNum == 0 ) + " " + (this.length > this.rtpSession.maxReorderBuffer)
+ // + " " + (this.lastSeqNumber < 0));
+
+ // Pop it off, null all references.
+ if( retNode != null && (retNode.seqNum == this.lastSeqNumber + 1 || retNode.seqNum == 0
+ || this.length > this.rtpSession.pktBufBehavior || this.lastSeqNumber < 0)) {
+
+
+ //if(tmpNode.pktCount == compLen) {
+ if(RTPSession.rtpDebugLevel > 7) {
+ System.out.println("<- PktBuffer.popOldestFrame() returns frame");
+ }
+
+ DataFrame df = new DataFrame(retNode, this.p,
+ rtpSession.appIntf.frameSize(oldest.pkt.getPayloadType()));
+
+ //DataFrame df = new DataFrame(retNode, this.p, 1);
+ popFrameQueueCleanup(retNode, df.lastSeqNum);
+
+ return df;
+
+ } else {
+ // If we get here we have little to show for.
+ if(RTPSession.rtpDebugLevel > 2) {
+ System.out.println("<- PktBuffer.popOldestFrame() returns null " + retNode.seqNum + " " + this.lastSeqNumber);
+ this.debugPrint();
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Cleans the packet buffer before returning the frame,
+ * i.e. making sure the queue has a head etc.
+ *
+ * @param retNode the node that is about to be popped
+ * @param highestSeq the highest sequence number returned to the application
+ */
+ private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
+ if(1 == length) {
+ //There's only one frame
+ newest = null;
+ oldest = null;
+ } else {
+ //There are more frames
+ oldest = oldest.prevFrameQueueNode;
+ oldest.nextFrameQueueNode = null;
+ }
+
+ // Update counters
+ length--;
+
+ //Find the highest sequence number associated with this timestamp
+ this.lastSeqNumber = highestSeq;
+ this.lastTimestamp = retNode.timeStamp;
+ }
+
+ /**
+ * Returns the length of the packetbuffer.
+ * @return number of frames (complete or not) in packetbuffer.
+ */
+ protected int getLength() {
+ return length;
+ }
+
+ /**
+ * Checks whether a packet is not too late, i.e. the next packet has already been returned.
+ * @param timeStamp the RTP timestamp of the packet under consideration
+ * @param seqNum the sequence number of the packet under consideration
+ * @return true if newer packets have not been handed to the application
+ */
+ protected boolean pktOnTime(long timeStamp, int seqNum) {
+ if(this.lastSeqNumber == -1) {
+ // First packet
+ return true;
+ } else {
+ if(seqNum >= this.lastSeqNumber) {
+ if(this.lastSeqNumber < 3 && timeStamp < this.lastTimestamp ) {
+ return false;
+ }
+ } else {
+ if(seqNum > 3 || timeStamp < this.lastTimestamp) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Prints out the packet buffer, oldest node first (on top).
+ */
+ protected void debugPrint() {
+ System.out.println("PktBuffer.debugPrint() : length "+length+" SSRC "+SSRC+" lastSeqNum:"+lastSeqNumber);
+ PktBufNode tmpNode = oldest;
+ int i = 0;
+ while(tmpNode != null) {
+ //String str = tmpNode.timeStamp.toString();
+ System.out.println(" " + i + " seqNum:"+tmpNode.seqNum+" timeStamp: " + tmpNode.timeStamp + " pktCount:" + tmpNode.pktCount );
+ i++;
+ tmpNode = tmpNode.prevFrameQueueNode;
+ }
+ }
+}