src/jlibrtp/RTCPReceiverThread.java
author nikita@nikita-laptop
Sat, 23 Jan 2010 21:48:58 +0100
changeset 833 f5a5d9237d69
parent 823 2036ebfaccda
permissions -rw-r--r--
remove some unused files

/**
 * Java RTP Library (jlibrtp)
 * Copyright (C) 2006 Arne Kepp
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package jlibrtp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.Enumeration;
import java.util.Iterator;

import org.sipdroid.net.tools.DatagramPool;
import org.sipdroid.net.tools.GenericPool;

/**
 * This thread hangs on the RTCP socket and waits for new packets.
 * 
 * Every received datagram is parsed as a compound RTCP packet; the
 * participant database is updated and the application callbacks registered on
 * the parent RTP session (if any) are invoked for each contained packet.
 * 
 * @author Arne Kepp
 * 
 */
public class RTCPReceiverThread extends Thread {
	/** Parent RTP Session */
	private final RTPSession rtpSession;
	/** Parent RTCP Session */
	private final RTCPSession rtcpSession;
	/** Pool from which compound RTCP packet objects are borrowed for parsing */
	private final GenericPool<CompRtcpPkt> rtcpPacketPool;

	/**
	 * Constructor for new thread
	 * 
	 * @param rtcpSession
	 *            parent RTCP session
	 * @param rtpSession
	 *            parent RTP session
	 */
	RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
		this.rtpSession = rtpSession;
		this.rtcpSession = rtcpSession;

		rtcpPacketPool = new GenericPool<CompRtcpPkt>(10);

		if (RTPSession.rtpDebugLevel > 1) {
			System.out.println("<-> RTCPReceiverThread created");
		}

	}

	/**
	 * Find out whether a participant with this SSRC is known.
	 * 
	 * If the user is unknown, try to match the ip-address of the sender to the
	 * ip address of a previously unmatched participant (one whose SSRC is still
	 * negative). If that fails as well, a new participant is created from the
	 * packet's socket address and added to the participant database.
	 * 
	 * @param ssrc
	 *            the SSRC of the participant
	 * @param packet
	 *            the packet that notified us
	 * @return the relevant participant, possibly newly created; never null
	 */
	private Participant findParticipant(long ssrc, DatagramPacket packet) {
		Participant known = rtpSession.partDb.getParticipant(ssrc);
		if (known != null) {
			return known;
		}

		Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
		while (enu.hasMoreElements()) {
			Participant candidate = enu.nextElement();
			if (candidate.ssrc < 0 && sharesAddress(candidate, packet)) {
				// Best guess
				System.out
						.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
								+ ssrc
								+ " @"
								+ packet.getAddress().toString()
								+ ", WAS able to match it.");

				candidate.ssrc = ssrc;
				return candidate;
			}
		}

		// Create an unknown sender
		System.out
				.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
						+ ssrc
						+ " @"
						+ packet.getAddress().toString()
						+ ", was NOT able to match it.");
		Participant created = new Participant((InetSocketAddress) null,
				(InetSocketAddress) packet.getSocketAddress(), ssrc);
		rtpSession.partDb.addParticipant(2, created);
		return created;
	}

	/**
	 * Checks whether the packet was sent from the ip address of either of the
	 * participant's configured addresses.
	 * 
	 * Either address of a participant may be unset (this class itself creates
	 * participants with a null first address), so both are null-checked before
	 * being compared.
	 * 
	 * @param part
	 *            the participant to compare against
	 * @param packet
	 *            the received packet
	 * @return true if the sender's ip address equals the participant's RTCP or
	 *         RTP ip address
	 */
	private static boolean sharesAddress(Participant part, DatagramPacket packet) {
		if (packet.getAddress() == null) {
			return false;
		}
		// The packet's address is the receiver of equals() so that a
		// participant address without a resolved ip simply does not match.
		boolean rtcpMatch = part.rtcpAddress != null
				&& packet.getAddress().equals(part.rtcpAddress.getAddress());
		boolean rtpMatch = part.rtpAddress != null
				&& packet.getAddress().equals(part.rtpAddress.getAddress());
		return rtcpMatch || rtpMatch;
	}

	/**
	 * Parse a received UDP packet
	 * 
	 * Perform the header checks and extract the RTCP packets in it, then
	 * dispatch each contained packet to the matching handler.
	 * 
	 * @param packet
	 *            the packet to be parsed
	 * @return -1 if there was a problem (length not a multiple of four, or an
	 *         SSRC conflict with our own session), 0 otherwise
	 */
	private int parsePacket(DatagramPacket packet) {
		if (packet.getLength() % 4 != 0) {
			if (RTPSession.rtcpDebugLevel > 2) {
				System.out
						.println("RTCPReceiverThread.parsePacket got packet that had length "
								+ packet.getLength());
			}
			return -1;
		}

		// Parse the received compound RTCP (?) packet
		// NOTE(review): the borrowed object is never handed back to
		// rtcpPacketPool in this class - confirm whether the pool expects a
		// matching return call.
		CompRtcpPkt compPkt = rtcpPacketPool.borrowItem();
		compPkt.init(packet.getData(), packet.getLength(),
				(InetSocketAddress) packet.getSocketAddress(), rtpSession);

		if (rtpSession.debugAppIntf != null) {
			notifyDebugInterface(packet, compPkt);
		}

		if (RTPSession.rtcpDebugLevel > 5) {
			StringBuilder summary = new StringBuilder(" ");
			Iterator<RtcpPkt> dbgIter = compPkt.rtcpPkts.iterator();
			while (dbgIter.hasNext()) {
				RtcpPkt dbgPkt = dbgIter.next();
				summary.append(dbgPkt.getClass().toString()).append(":")
						.append(dbgPkt.itemCount).append(", ");
			}
			System.out.println("<-> RTCPReceiverThread.parsePacket() from "
					+ packet.getSocketAddress().toString() + summary);
		}

		// One timestamp for every packet of this compound packet
		long curTime = System.currentTimeMillis();

		// Loop over the information
		Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
		while (iter.hasNext()) {
			RtcpPkt aPkt = iter.next();

			// Our own packets should already have been filtered out.
			if (aPkt.ssrc == rtpSession.ssrc) {
				System.out.println("RTCPReceiverThread() received RTCP packet"
						+ " with conflicting SSRC from "
						+ packet.getSocketAddress().toString());
				rtpSession.resolveSsrcConflict();
				return -1;
			}

			// Exact class comparison (not instanceof), as before.
			Class<?> type = aPkt.getClass();
			if (type == RtcpPktRR.class) {
				handleReceiverReport((RtcpPktRR) aPkt, packet, curTime);
			} else if (type == RtcpPktSR.class) {
				handleSenderReport((RtcpPktSR) aPkt, packet, curTime);
			} else if (type == RtcpPktSDES.class) {
				handleSourceDescription((RtcpPktSDES) aPkt);
			} else if (type == RtcpPktBYE.class) {
				handleBye((RtcpPktBYE) aPkt);
			} else if (type == RtcpPktAPP.class) {
				handleApplicationSpecific((RtcpPktAPP) aPkt, packet);
			}
		}
		return 0;
	}

	/**
	 * Reports the outcome of parsing a compound packet to the debug interface
	 * of the RTP session. Must only be called when that interface is set.
	 * 
	 * @param packet
	 *            the datagram that was received
	 * @param compPkt
	 *            the parsed compound packet
	 */
	private void notifyDebugInterface(DatagramPacket packet, CompRtcpPkt compPkt) {
		String intfStr;
		if (rtpSession.mcSession) {
			intfStr = rtcpSession.rtcpMCSock.getLocalSocketAddress().toString();
		} else {
			intfStr = rtpSession.rtpSock.getLocalSocketAddress().toString();
		}

		InetSocketAddress sender = (InetSocketAddress) packet.getSocketAddress();
		if (compPkt.problem == 0) {
			String str = "Received compound RTCP packet of size "
					+ packet.getLength() + " from "
					+ packet.getSocketAddress().toString()
					+ " via " + intfStr + " containing "
					+ compPkt.rtcpPkts.size() + " packets";
			rtpSession.debugAppIntf.packetReceived(1, sender, str);
		} else {
			String str = "Received invalid RTCP packet of size "
					+ packet.getLength() + " from "
					+ packet.getSocketAddress().toString()
					+ " via " + intfStr + ": "
					+ debugErrorString(compPkt.problem);
			rtpSession.debugAppIntf.packetReceived(-2, sender, str);
		}
	}

	/**
	 * Handles a Receiver Report: refreshes the sender's last-RTCP time and
	 * forwards the report to the RTCP application interface, if one is set.
	 * 
	 * @param rrPkt
	 *            the receiver report
	 * @param packet
	 *            the datagram it arrived in
	 * @param curTime
	 *            reception time in milliseconds
	 */
	private void handleReceiverReport(RtcpPktRR rrPkt, DatagramPacket packet,
			long curTime) {
		Participant p = findParticipant(rrPkt.ssrc, packet);
		p.lastRtcpPkt = curTime;

		if (rtpSession.rtcpAppIntf != null) {
			rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc,
					rrPkt.reporteeSsrc, rrPkt.lossFraction,
					rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
					rrPkt.interArvJitter, rrPkt.timeStampLSR, rrPkt.delaySR);
		}
	}

	/**
	 * Handles a Sender Report: updates the sender's NTP/RTP bookkeeping and
	 * forwards the report to the RTCP application interface, if one is set.
	 * 
	 * @param srPkt
	 *            the sender report
	 * @param packet
	 *            the datagram it arrived in
	 * @param curTime
	 *            reception time in milliseconds
	 */
	private void handleSenderReport(RtcpPktSR srPkt, DatagramPacket packet,
			long curTime) {
		// findParticipant() never returns null, so no null check is needed.
		Participant p = findParticipant(srPkt.ssrc, packet);
		p.lastRtcpPkt = curTime;

		if (p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
			// Calculate gradient NTP vs RTP
			long newTime = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
			p.ntpGradient = ((double) (newTime - p.ntpOffset))
					/ ((double) srPkt.rtpTs - p.lastSRRtpTs);
			if (RTPSession.rtcpDebugLevel > 4) {
				System.out
						.println("RTCPReceiverThread calculated NTP vs RTP gradient: "
								+ Double.toString(p.ntpGradient));
			}
		} else {
			// Calculate sum of ntpTs1 and ntpTs2 in milliseconds
			p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
			p.lastNtpTs1 = srPkt.ntpTs1;
			p.lastNtpTs2 = srPkt.ntpTs2;
			p.lastSRRtpTs = srPkt.rtpTs;
		}

		// For the next RR
		p.timeReceivedLSR = curTime;
		p.setTimeStampLSR(srPkt.ntpTs1, srPkt.ntpTs2);

		if (rtpSession.rtcpAppIntf == null) {
			return;
		}

		// NOTE(review): sendersPktCount is passed for two consecutive
		// arguments below; the second was presumably meant to be the sender's
		// octet count - confirm against RtcpPktSR and SRPktReceived.
		if (srPkt.rReports != null) {
			rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
					srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
					srPkt.sendersPktCount,
					srPkt.sendersPktCount,
					srPkt.rReports.reporteeSsrc,
					srPkt.rReports.lossFraction,
					srPkt.rReports.lostPktCount,
					srPkt.rReports.extHighSeqRecv,
					srPkt.rReports.interArvJitter,
					srPkt.rReports.timeStampLSR,
					srPkt.rReports.delaySR);
		} else {
			rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
					srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
					srPkt.sendersPktCount,
					srPkt.sendersPktCount, null, null, null,
					null, null, null, null);
		}
	}

	/**
	 * Handles a Source Description packet by forwarding its participants to
	 * the RTCP application interface, if one is set.
	 * 
	 * @param sdesPkt
	 *            the source description packet
	 */
	private void handleSourceDescription(RtcpPktSDES sdesPkt) {
		// The participant database is updated
		// when the SDES packet is reconstructed by CompRtcpPkt
		if (rtpSession.rtcpAppIntf != null) {
			rtpSession.rtcpAppIntf.SDESPktReceived(sdesPkt.participants);
		}
	}

	/**
	 * Handles a BYE packet: stamps every known participant listed in it with
	 * the current time and notifies the RTCP application interface, if set.
	 * 
	 * SSRCs that are not in the participant database yield null entries in the
	 * array handed to the callback.
	 * 
	 * @param byePkt
	 *            the BYE packet
	 */
	private void handleBye(RtcpPktBYE byePkt) {
		long time = System.currentTimeMillis();
		Participant[] partArray = new Participant[byePkt.ssrcArray.length];

		for (int i = 0; i < byePkt.ssrcArray.length; i++) {
			partArray[i] = rtpSession.partDb.getParticipant(byePkt.ssrcArray[i]);
			if (partArray[i] != null) {
				partArray[i].timestampBYE = time;
			}
		}

		if (rtpSession.rtcpAppIntf != null) {
			// Guard against a missing reason: new String(null) would throw and
			// take the receiver thread down with it.
			// NOTE(review): assumes reason may legitimately be null for BYE
			// packets without a reason text - confirm against RtcpPktBYE.
			String reason = (byePkt.reason == null) ? "" : new String(
					byePkt.reason);
			rtpSession.rtcpAppIntf.BYEPktReceived(partArray, reason);
		}
	}

	/**
	 * Handles an application specific packet by resolving its sender and
	 * forwarding it to the RTCP application interface, if one is set.
	 * 
	 * @param appPkt
	 *            the APP packet
	 * @param packet
	 *            the datagram it arrived in
	 */
	private void handleApplicationSpecific(RtcpPktAPP appPkt,
			DatagramPacket packet) {
		// Resolved even without a listener, because findParticipant() may
		// register a new participant as a side effect.
		Participant part = findParticipant(appPkt.ssrc, packet);

		if (rtpSession.rtcpAppIntf != null) {
			rtpSession.rtcpAppIntf.APPPktReceived(part, appPkt.itemCount,
					appPkt.pktName, appPkt.pktData);
		}
	}

	/**
	 * Returns a legible message when an error occurs
	 * 
	 * @param errorCode
	 *            the internal error code, commonly negative of packet type
	 * @return a string that is hopefully somewhat informative
	 */
	private String debugErrorString(int errorCode) {
		switch (errorCode) {
		case -1:
			return "The first packet was not of type SR or RR.";
		case -2:
			return "The padding bit was set for the first packet.";
		case -200:
			return " Error parsing Sender Report packet.";
		case -201:
			return " Error parsing Receiver Report packet.";
		case -202:
			return " Error parsing SDES packet";
		case -203:
			return " Error parsing BYE packet.";
		case -204:
			return " Error parsing Application specific packet.";
		case -205:
			return " Error parsing RTP Feedback packet.";
		case -206:
			return " Error parsing Payload-Specific Feedback packet.";
		default:
			return "Unknown error code " + errorCode + ".";
		}
	}

	/**
	 * Tells whether a packet was sent from the local address of the socket
	 * this thread is receiving on.
	 * 
	 * Only the socket that matches the session mode is consulted. A socket
	 * without a local address never matches, so the packet is still parsed.
	 * 
	 * @param packet
	 *            the received packet
	 * @return true if the sender address equals the local socket address
	 */
	private boolean isFromOwnSocket(DatagramPacket packet) {
		if (rtpSession.mcSession) {
			return packet.getSocketAddress().equals(
					rtcpSession.rtcpMCSock.getLocalSocketAddress());
		}
		return packet.getSocketAddress().equals(
				rtcpSession.rtcpSock.getLocalSocketAddress());
	}

	/**
	 * Start the RTCP receiver thread.
	 * 
	 * It will 1) run when it receives a packet 2) parse the packet 3) call any
	 * relevant callback functions, update database 4) block until the next one
	 * arrives.
	 * 
	 * The loop ends once the parent RTP session's endSession flag is set.
	 */
	@Override
	public void run() {
		if (RTPSession.rtcpDebugLevel > 1) {
			if (rtpSession.mcSession) {
				System.out
						.println("-> RTCPReceiverThread.run() starting on MC "
								+ rtcpSession.rtcpMCSock.getLocalPort());
			} else {
				System.out.println("-> RTCPReceiverThread.run() starting on "
						+ rtcpSession.rtcpSock.getLocalPort());
			}
		}

		while (!rtpSession.endSession) {

			if (RTPSession.rtcpDebugLevel > 4) {
				if (rtpSession.mcSession) {
					System.out
							.println("-> RTCPReceiverThread.run() waiting for packet on MC "
									+ rtcpSession.rtcpMCSock.getLocalPort());
				} else {
					System.out
							.println("-> RTCPReceiverThread.run() waiting for packet on "
									+ rtcpSession.rtcpSock.getLocalPort());
				}
			}

			// Prepare a packet
			// NOTE(review): the borrowed packet is never handed back to
			// DatagramPool in this class - confirm whether the pool expects a
			// matching return call.
			DatagramPacket packet = DatagramPool.getInstance().borrowPacket();

			// Wait for it to arrive
			try {
				if (rtpSession.mcSession) {
					// Multicast
					rtcpSession.rtcpMCSock.receive(packet);
				} else {
					// Unicast
					rtcpSession.rtcpSock.receive(packet);
				}
			} catch (IOException e) {
				// An error while the session is ending is expected; otherwise
				// report it.
				if (!rtpSession.endSession) {
					e.printStackTrace();
				}
				// Nothing was received, so the packet holds no valid data and
				// must not be parsed.
				continue;
			}

			// Check whether this is one of our own
			if (!isFromOwnSocket(packet)) {
				parsePacket(packet);
			}
		}

		if (RTPSession.rtcpDebugLevel > 1) {
			System.out.println("<-> RTCPReceiverThread terminating");
		}
	}

}