src/jlibrtp/PktBuffer.java
author nikita@jibe-desktop
Fri, 20 Nov 2009 19:29:42 +0100
changeset 823 2036ebfaccda
permissions -rw-r--r--
debut de la gestion de l'audio, faut tester avec des pcs distincts

/**
 * Java RTP Library (jlibrtp)
 * Copyright (C) 2006 Arne Kepp
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package jlibrtp;

import org.sipdroid.net.tools.DataFramePool;
import org.sipdroid.net.tools.PktBufNodePool;

/**
 * A PktBuffer stores packets either for buffering purposes, or because they
 * need to be assimilated to create a complete frame.
 * 
 * This behavior can be controlled through rtpSession.pktBufBehavior()
 * 
 * It optionally drops duplicate packets.
 * 
 * Note that "newest" is the most recently received packet, i.e. the one with
 * the highest timeStamp. "Next" links run from new to old (from recently
 * received to previously received).
 * 
 * @author Arne Kepp
 */
public class PktBuffer {
	/**
	 * The owning RTPSession. This class reads its pktBufBehavior (selects the
	 * add/pop strategy and the reorder depth), frameReconstruction and appIntf
	 * members.
	 */
	RTPSession rtpSession;
	/**
	 * SSRC of the participant that this buffer is for; taken from the packet
	 * passed to the constructor.
	 */
	long SSRC;
	/** The parent participant, handed to every DataFrame built on pop */
	Participant p;
	/**
	 * Entry counter of the buffer. popFrameQueueCleanup() compares it against 1
	 * to decide whether the queue becomes empty, and decrements it on each pop.
	 */
	int length = 0;
	/**
	 * The oldest, least recently received, packet. This is the node that the
	 * pop methods hand out next.
	 */
	PktBufNode oldest = null;
	/** The newest, most recently received, packet */
	PktBufNode newest = null;

	/**
	 * The sequence number recorded by the most recent pop
	 * (popFrameQueueCleanup()); stays -1 until something has been popped.
	 */
	int lastSeqNumber = -1;
	/**
	 * The timestamp of the node removed by the most recent pop; stays -1 until
	 * something has been popped.
	 */
	long lastTimestamp = -1;

	/**
	 * Builds a packet buffer (a linked queue of PktBufNode) that is seeded with
	 * one packet.
	 * 
	 * The buffer adopts the SSRC of that packet. lastSeqNumber and
	 * lastTimestamp are deliberately left at their initial value of -1.
	 * 
	 * @param rtpSession
	 *            the parent RTPSession
	 * @param p
	 *            the participant to which this packetbuffer belongs.
	 * @param aPkt
	 *            the first RTP packet, which becomes the only entry of the
	 *            buffer
	 */
	protected PktBuffer(RTPSession rtpSession, Participant p, RtpPkt aPkt) {
		this.rtpSession = rtpSession;
		this.p = p;
		this.SSRC = aPkt.getSsrc();

		// Wrap the packet in a pooled node; with a single entry that node is
		// both ends of the queue.
		PktBufNode firstNode = PktBufNodePool.getInstance().borrowBufNode();
		firstNode.initPktBufNode(aPkt);
		this.newest = firstNode;
		this.oldest = firstNode;
		this.length = 1;
	}

	/**
	 * Wraps an incoming packet in a pooled node and passes it to the insertion
	 * strategy selected by rtpSession.pktBufBehavior: a positive value selects
	 * the buffered (reordering) path, 0 the filtered path and -1 the
	 * unfiltered path. For any other value nothing is inserted and 0 is
	 * returned.
	 * 
	 * A packet whose SSRC differs from the buffer's SSRC is only reported on
	 * stdout; it is still inserted.
	 * 
	 * @param aPkt
	 *            the packet to be added to the buffer.
	 * @return -5 if aPkt is null, otherwise the return value of the selected
	 *         strategy (negative if the packet was rejected)
	 */
	protected synchronized int addPkt(RtpPkt aPkt) {
		if (aPkt == null) {
			System.out.println("! PktBuffer.addPkt(aPkt) aPkt was null");
			return -5;
		}

		long pktTimeStamp = aPkt.getTimeStamp();
		if (RTPSession.rtpDebugLevel > 7) {
			System.out.println("-> PktBuffer.addPkt() , length:" + length
					+ " , timeStamp of Pkt: " + pktTimeStamp);
		}

		PktBufNode node = PktBufNodePool.getInstance().borrowBufNode();
		node.initPktBufNode(aPkt);
		if (aPkt.getSsrc() != SSRC) {
			System.out.println("PktBuffer.addPkt() SSRCs don't match!");
		}

		// Dispatch on the session-wide buffering mode.
		int result = 0;
		if (rtpSession.pktBufBehavior > 0) {
			result = bufferedAddPkt(node);
		} else if (rtpSession.pktBufBehavior == 0) {
			result = filteredAddPkt(node);
		} else if (rtpSession.pktBufBehavior == -1) {
			result = unfilteredAddPkt(node);
		}

		if (RTPSession.rtpDebugLevel > 7) {
			if (RTPSession.rtpDebugLevel > 10) {
				debugPrint();
			}
			System.out.println("<- PktBuffer.addPkt() , length:" + length
					+ " returning " + result);
		}
		return result;
	}

	/**
	 * Adds packets in the same order that they arrive, doesn't do any filtering
	 * or processing.
	 * 
	 * The node is linked in at the "newest" end of the queue, so that pops,
	 * which always take the "oldest" node, return packets first-in first-out.
	 * The entry counter is incremented because popFrameQueueCleanup() relies
	 * on it to detect the last entry.
	 * 
	 * @param newNode
	 *            the node to add to the packet buffer
	 * @return always 0
	 */
	private int unfilteredAddPkt(PktBufNode newNode) {
		if (RTPSession.rtpDebugLevel > 8) {
			System.out.println("<->    PktBuffer.unfilteredAddPkt()");
		}
		// No magic, just add to the end
		if (newest != null) {
			// next = towards older packets, prev = towards newer packets
			newNode.nextFrameQueueNode = newest;
			newest.prevFrameQueueNode = newNode;
			newest = newNode;
		} else {
			// Queue is empty: the node is both ends.
			oldest = newNode;
			newest = newNode;
		}
		length++;
		return 0;
	}

	/**
	 * Appends a packet only if it is newer than the current newest entry;
	 * anything else (duplicates, late arrivals) is dropped.
	 * 
	 * A packet counts as newer when its timestamp is greater than that of the
	 * newest entry, or when its sequence number is greater by fewer than 10.
	 * 
	 * @param newNode
	 *            the node to add to the packet buffer
	 * @return 0 if the node was added, -1 if it was dropped
	 */
	private int filteredAddPkt(PktBufNode newNode) {
		if (RTPSession.rtpDebugLevel > 8) {
			System.out.println("<->    PktBuffer.filteredAddPkt()");
		}

		// Empty buffer: this packet is the one and only.
		if (length == 0) {
			newest = newNode;
			oldest = newNode;
			length = 1;
			return 0;
		}

		boolean laterTimestamp = newNode.timeStamp > newest.timeStamp;
		boolean slightlyAhead = (newNode.seqNum > newest.seqNum)
				&& ((newNode.seqNum - newest.seqNum) < 10);

		if (!(laterTimestamp || slightlyAhead)) {
			// Not in order, reject it.
			if (RTPSession.rtpDebugLevel > 2) {
				System.out.println("PktBuffer.filteredAddPkt Dropped a packet due to lag! "
						+ newNode.timeStamp + " " + newNode.seqNum + " vs "
						+ oldest.timeStamp + " " + oldest.seqNum);
			}
			return -1;
		}

		// Packet came in order, put it in front of the current newest.
		newNode.nextFrameQueueNode = newest;
		newest.prevFrameQueueNode = newNode;
		newest = newNode;
		length++;
		return 0;
	}

	/**
	 * Does most of the packet organization for the application. Packets are put
	 * in order, duplicate packets or late arrivals are discarded.
	 * 
	 * If multiple packets make up a frame (same RTP timestamp) and
	 * rtpSession.frameReconstruction is set, the packet is merged into the
	 * existing frame entry via addToFrame() instead of getting its own queue
	 * entry; in that case the entry counter is not changed.
	 * 
	 * @param newNode
	 *            the node to add to the packet buffer
	 * @return 0 if everything is okay, -1 if the packet was late or a
	 *         duplicate, or the non-zero result of addToFrame()
	 */
	private int bufferedAddPkt(PktBufNode newNode) {
		if (RTPSession.rtpDebugLevel > 8) {
			System.out.println("<->    PktBuffer.bufferedAddPkt()");
		}
		if (length == 0) {
			// The buffer was empty, this packet is the one and only.
			newest = newNode;
			oldest = newNode;
		} else if (newNode.timeStamp > newest.timeStamp
				|| newNode.seqNum > newest.seqNum) {
			// Packet came in order
			newNode.nextFrameQueueNode = newest;
			newest.prevFrameQueueNode = newNode;
			newest = newNode;
		} else {
			// There are packets, we need to order this one right.
			if (!pktOnTime(newNode.timeStamp, newNode.seqNum)
					&& rtpSession.pktBufBehavior > -1) {
				// We got this too late, can't put it in order anymore.
				if (RTPSession.rtpDebugLevel > 2) {
					System.out
							.println("PktBuffer.addPkt Dropped a packet due to lag! "
									+ newNode.timeStamp
									+ " "
									+ newNode.seqNum
									+ " vs "
									+ oldest.timeStamp
									+ " "
									+ oldest.seqNum);
				}
				return -1;
			}

			// Need to do some real work, find out where it belongs (linear
			// search from the newest entry towards the oldest). newerNode
			// trails one step behind so we still have a neighbour if the
			// search walks past the oldest entry.
			PktBufNode newerNode = null;
			PktBufNode tmpNode = newest;
			while (tmpNode != null && tmpNode.timeStamp > newNode.timeStamp) {
				newerNode = tmpNode;
				tmpNode = tmpNode.nextFrameQueueNode;
			}

			if (tmpNode == null) {
				// Older than everything queued: it becomes the new oldest
				// entry. newerNode is non-null here because the queue is not
				// empty and the loop advanced at least once.
				newNode.nextFrameQueueNode = null;
				newNode.prevFrameQueueNode = newerNode;
				newerNode.nextFrameQueueNode = newNode;
				oldest = newNode;
			} else if (tmpNode.timeStamp == newNode.timeStamp
					&& rtpSession.frameReconstruction
					&& newNode.seqNum != tmpNode.seqNum) {
				// Packet has same timestamp, presumably belongs to frame.
				// Need to order within frame.
				if (RTPSession.rtpDebugLevel > 8) {
					System.out.println("Found pkt with existing timeStamp: "
							+ newNode.timeStamp);
				}
				// Merging into a frame does not add a queue entry, so the
				// entry counter must stay as it is.
				return addToFrame(tmpNode, newNode);
			} else {
				// Check that it's not a duplicate
				if (tmpNode.timeStamp == newNode.timeStamp
						&& newNode.seqNum == tmpNode.seqNum) {
					if (RTPSession.rtpDebugLevel > 2) {
						System.out
								.println("PktBuffer.addPkt Dropped a duplicate packet! "
										+ newNode.timeStamp
										+ " "
										+ newNode.seqNum);
					}
					return -1;
				}

				// Insert into buffer, on the newer side of tmpNode
				newNode.nextFrameQueueNode = tmpNode;
				newNode.prevFrameQueueNode = tmpNode.prevFrameQueueNode;

				if (newNode.prevFrameQueueNode != null) {
					// Update the node behind
					newNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
				} else {
					// Nothing newer exists, so tmpNode was the head of the
					// queue and the new node takes over that role.
					newest = newNode;
				}
				tmpNode.prevFrameQueueNode = newNode;
			}
		}
		// A new queue entry was created, update the length of this buffer
		length++;
		return 0;
	}

	/**
	 * Merges a packet into an existing frame, i.e. into the chain of packets
	 * (linked through nextFrameNode, ascending sequence number) that share one
	 * timestamp.
	 * 
	 * If the new packet has the lowest sequence number it becomes the node that
	 * represents the frame in the queue, and the queue links as well as the
	 * newest/oldest pointers are moved over to it. The packet count is only
	 * increased once the packet is known not to be a duplicate.
	 * 
	 * @param frameNode
	 *            the node currently representing the frame in the packet buffer
	 * @param newNode
	 *            the new node to be added to the frame
	 * @return 0 if no error, -2 if this is a duplicate packet
	 */
	private int addToFrame(PktBufNode frameNode, PktBufNode newNode) {
		// Node has same timeStamp, assume pkt belongs to frame

		if (frameNode.seqNum < newNode.seqNum) {
			// this is not the first packet in the frame

			// Find the right spot
			PktBufNode cursor = frameNode;
			while (cursor.nextFrameNode != null
					&& cursor.nextFrameNode.seqNum < newNode.seqNum) {
				cursor = cursor.nextFrameNode;
			}

			// Check whether packet is duplicate
			if (cursor.nextFrameNode != null
					&& cursor.nextFrameNode.seqNum == newNode.seqNum) {
				if (RTPSession.rtpDebugLevel > 2) {
					System.out
							.println("PktBuffer.addPkt Dropped a duplicate packet!");
				}
				return -2;
			}

			newNode.nextFrameNode = cursor.nextFrameNode;
			cursor.nextFrameNode = newNode;

			// The count lives on the frame's representative node.
			frameNode.pktCount++;

		} else {
			// newNode has the lowest sequence number
			newNode.nextFrameNode = frameNode;
			newNode.pktCount = frameNode.pktCount + 1;

			// Update the queue
			if (frameNode.nextFrameQueueNode != null) {
				frameNode.nextFrameQueueNode.prevFrameQueueNode = newNode;
				newNode.nextFrameQueueNode = frameNode.nextFrameQueueNode;
				frameNode.nextFrameQueueNode = null;
			}
			if (frameNode.prevFrameQueueNode != null) {
				frameNode.prevFrameQueueNode.nextFrameQueueNode = newNode;
				newNode.prevFrameQueueNode = frameNode.prevFrameQueueNode;
				frameNode.prevFrameQueueNode = null;
			}

			// frameNode is no longer part of the queue; move whichever end
			// pointers referred to it.
			if (newest == frameNode) {
				newest = newNode;
			}
			if (oldest == frameNode) {
				oldest = newNode;
			}
		}

		return 0;
	}

	/**
	 * Removes and returns the oldest frame, using the buffered strategy when
	 * rtpSession.pktBufBehavior is positive and the unbuffered one otherwise.
	 * 
	 * @return the frame, or null if the selected strategy has nothing to hand
	 *         out right now.
	 */
	protected synchronized DataFrame popOldestFrame() {
		if (RTPSession.rtpDebugLevel > 7) {
			System.out.println("-> PktBuffer.popOldestFrame()");
		}
		if (RTPSession.rtpDebugLevel > 10) {
			debugPrint();
		}

		boolean buffered = rtpSession.pktBufBehavior > 0;
		return buffered ? bufferedPopFrame() : unbufferedPopFrame();
	}

	/**
	 * Will return the oldest frame without checking whether it is in the right
	 * order, or whether we should wait for late arrivals.
	 * 
	 * @return the first frame on the queue, null if the queue is empty
	 */
	private DataFrame unbufferedPopFrame() {
		if (oldest == null) {
			return null;
		}

		PktBufNode retNode = oldest;
		popFrameQueueCleanup(retNode, retNode.seqNum);

		// The cleanup above has already moved (or cleared) "oldest", so the
		// payload type has to be read from the node that was popped.
		DataFrame df = DataFramePool.getInstance().borrowFrame();
		df.initDataFrame(retNode, this.p, rtpSession.appIntf
				.frameSize(retNode.pkt.getPayloadType()));
		return df;
	}

	/**
	 * Only returns a frame if the buffer is full, i.e. length exceeds
	 * rtpSession.pktBufBehavior, or if the oldest packet directly follows the
	 * previous one returned to the application. A frame is also returned when
	 * nothing has been popped yet (lastSeqNumber is negative) or when the
	 * oldest packet has sequence number 0.
	 * 
	 * @return first frame in order, null if the buffer is empty or we keep
	 *         waiting for a missing packet
	 */
	private DataFrame bufferedPopFrame() {
		PktBufNode retNode = oldest;

		// Three scenarios:
		// 1) There are no packets available
		// 2) The first packet is available and in order
		// 3) The first packet is not the next one in the sequence
		//    a) We have exceeded the wait buffer
		//    b) We wait
		boolean ready = retNode != null
				&& (retNode.seqNum == this.lastSeqNumber + 1
						|| retNode.seqNum == 0
						|| this.length > this.rtpSession.pktBufBehavior
						|| this.lastSeqNumber < 0);

		if (!ready) {
			// If we get here we have little to show for.
			if (RTPSession.rtpDebugLevel > 2) {
				if (retNode != null) {
					System.out
							.println("<- PktBuffer.popOldestFrame() returns null "
									+ retNode.seqNum + " " + this.lastSeqNumber);
				} else {
					// Empty buffer: there is no sequence number to report.
					System.out
							.println("<- PktBuffer.popOldestFrame() returns null (buffer empty) "
									+ this.lastSeqNumber);
				}
				this.debugPrint();
			}
			return null;
		}

		if (RTPSession.rtpDebugLevel > 7) {
			System.out.println("<- PktBuffer.popOldestFrame() returns frame");
		}

		// Build the frame first: the cleanup needs the highest sequence
		// number contained in it.
		DataFrame df = DataFramePool.getInstance().borrowFrame();
		df.initDataFrame(retNode, this.p, rtpSession.appIntf
				.frameSize(retNode.pkt.getPayloadType()));

		// Pop it off the queue.
		popFrameQueueCleanup(retNode, df.lastSeqNum);

		return df;
	}

	/**
	 * Detaches the oldest entry from the queue and records what was handed
	 * out: the entry counter is decremented, lastSeqNumber is set to
	 * highestSeq and lastTimestamp to the timestamp of retNode.
	 * 
	 * Whether the queue becomes empty is decided from the entry counter
	 * (length equal to 1), not from the links.
	 * 
	 * @param retNode
	 *            the node that is about to be popped
	 * @param highestSeq
	 *            the highest sequence number returned to the application
	 */
	private void popFrameQueueCleanup(PktBufNode retNode, int highestSeq) {
		if (length != 1) {
			// More frames remain: the next newer one becomes the oldest and
			// loses its link to the popped node.
			this.oldest = this.oldest.prevFrameQueueNode;
			this.oldest.nextFrameQueueNode = null;
		} else {
			// That was the only frame, the queue is empty now.
			this.newest = null;
			this.oldest = null;
		}

		length -= 1;

		// Remember what the application has seen last.
		this.lastSeqNumber = highestSeq;
		this.lastTimestamp = retNode.timeStamp;
	}

	/**
	 * Reports the current value of the buffer's entry counter.
	 * 
	 * @return number of frames (complete or not) in packetbuffer.
	 */
	protected int getLength() {
		return this.length;
	}

	/**
	 * Decides whether a packet may still be inserted, judged against the
	 * sequence number and timestamp recorded by the last pop.
	 * 
	 * Nothing popped yet (lastSeqNumber is -1): always on time. Sequence number
	 * at or above the last popped one: on time unless the last popped number is
	 * below 3 and the timestamp is older than the last popped timestamp.
	 * Sequence number below the last popped one: on time only if it is at most
	 * 3 and the timestamp is not older. (The threshold of 3 presumably allows
	 * for sequence number wrap-around; the code does not say.)
	 * 
	 * @param timeStamp
	 *            the RTP timestamp of the packet under consideration
	 * @param seqNum
	 *            the sequence number of the packet under consideration
	 * @return true if the packet is considered on time
	 */
	protected boolean pktOnTime(long timeStamp, int seqNum) {
		// First packet
		if (this.lastSeqNumber == -1) {
			return true;
		}

		boolean notOlder = timeStamp >= this.lastTimestamp;
		if (seqNum >= this.lastSeqNumber) {
			return this.lastSeqNumber >= 3 || notOlder;
		}
		return seqNum <= 3 && notOlder;
	}

	/**
	 * Dumps the buffer to stdout: one summary line, then one line per queue
	 * entry, starting at the oldest entry and following the links towards the
	 * newest.
	 */
	protected void debugPrint() {
		System.out.println("PktBuffer.debugPrint() : length " + length
				+ " SSRC " + SSRC + " lastSeqNum:" + lastSeqNumber);

		int index = 0;
		for (PktBufNode node = oldest; node != null; node = node.prevFrameQueueNode) {
			System.out.println("   " + index + " seqNum:" + node.seqNum
					+ " timeStamp: " + node.timeStamp + " pktCount:"
					+ node.pktCount);
			index++;
		}
	}
}