src/jlibrtp/RTCPReceiverThread.java
changeset 823 2036ebfaccda
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/RTCPReceiverThread.java	Fri Nov 20 19:29:42 2009 +0100
@@ -0,0 +1,446 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+package jlibrtp;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+import org.sipdroid.net.tools.DatagramPool;
+import org.sipdroid.net.tools.GenericPool;
+
+/**
+ * This thread hangs on the RTCP socket and waits for new packets
+ * 
+ * @author Arne Kepp
+ * 
+ */
+public class RTCPReceiverThread extends Thread {
+	/** Parent RTP Session */
+	private RTPSession rtpSession = null;
+	/** Parent RTCP Session */
+	private RTCPSession rtcpSession = null;
+
+	private GenericPool<CompRtcpPkt> rtcpPacketPool;
+
+	/**
+	 * Constructor for new thread
+	 * 
+	 * @param rtcpSession
+	 *            parent RTCP session
+	 * @param rtpSession
+	 *            parent RTP session
+	 */
+	RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
+		this.rtpSession = rtpSession;
+		this.rtcpSession = rtcpSession;
+
+		rtcpPacketPool = new GenericPool<CompRtcpPkt>(10);
+
+		if (RTPSession.rtpDebugLevel > 1) {
+			System.out.println("<-> RTCPReceiverThread created");
+		}
+
+	}
+
+	/**
+	 * Find out whether a participant with this SSRC is known.
+	 * 
+	 * If the user is unknown, and the system is operating in unicast mode, try
+	 * to match the ip-address of the sender to the ip address of a previously
+	 * unmatched target
+	 * 
+	 * @param ssrc
+	 *            the SSRC of the participant
+	 * @param packet
+	 *            the packet that notified us
+	 * @return the relevant participant, possibly newly created
+	 */
+	private Participant findParticipant(long ssrc, DatagramPacket packet) {
+		Participant p = rtpSession.partDb.getParticipant(ssrc);
+		if (p == null) {
+			Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
+			while (enu.hasMoreElements()) {
+				Participant tmp = (Participant) enu.nextElement();
+				if (tmp.ssrc < 0
+						&& (tmp.rtcpAddress.getAddress().equals(
+								packet.getAddress()) || tmp.rtpAddress
+								.getAddress().equals(packet.getAddress()))) {
+
+					// Best guess
+					System.out
+					.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
+							+ ssrc
+							+ " @"
+							+ packet.getAddress().toString()
+							+ ", WAS able to match it.");
+
+					tmp.ssrc = ssrc;
+					return tmp;
+				}
+			}
+			// Create an unknown sender
+			System.out
+			.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
+					+ ssrc
+					+ " @"
+					+ packet.getAddress().toString()
+					+ ", was NOT able to match it.");
+			p = new Participant((InetSocketAddress) null,
+					(InetSocketAddress) packet.getSocketAddress(), ssrc);
+			rtpSession.partDb.addParticipant(2, p);
+		}
+		return p;
+	}
+
+	/**
+	 * Parse a received UDP packet
+	 * 
+	 * Perform the header checks and extract the RTCP packets in it
+	 * 
+	 * @param packet
+	 *            the packet to be parsed
+	 * @return -1 if there was a problem, 0 if successfully parsed
+	 */
+	private int parsePacket(DatagramPacket packet) {
+
+		if (packet.getLength() % 4 != 0) {
+			if (RTPSession.rtcpDebugLevel > 2) {
+				System.out
+				.println("RTCPReceiverThread.parsePacket got packet that had length "
+						+ packet.getLength());
+			}
+			return -1;
+		} else {
+			byte[] rawPkt = packet.getData();
+
+			// Parse the received compound RTCP (?) packet
+			CompRtcpPkt compPkt = rtcpPacketPool.borrowItem();
+			compPkt.init(rawPkt, packet.getLength(),
+					(InetSocketAddress) packet.getSocketAddress(), rtpSession);
+
+			if (this.rtpSession.debugAppIntf != null) {
+				String intfStr;
+
+				if (rtpSession.mcSession) {
+					intfStr = this.rtcpSession.rtcpMCSock
+					.getLocalSocketAddress().toString();
+				} else {
+					intfStr = this.rtpSession.rtpSock.getLocalSocketAddress()
+					.toString();
+				}
+
+				if (compPkt.problem == 0) {
+					String str = new String(
+							"Received compound RTCP packet of size "
+							+ packet.getLength() + " from "
+							+ packet.getSocketAddress().toString()
+							+ " via " + intfStr + " containing "
+							+ compPkt.rtcpPkts.size() + " packets");
+
+					this.rtpSession.debugAppIntf.packetReceived(1,
+							(InetSocketAddress) packet.getSocketAddress(), str);
+				} else {
+					String str = new String(
+							"Received invalid RTCP packet of size "
+							+ packet.getLength() + " from "
+							+ packet.getSocketAddress().toString()
+							+ " via " + intfStr + ": "
+							+ this.debugErrorString(compPkt.problem));
+
+					this.rtpSession.debugAppIntf.packetReceived(-2,
+							(InetSocketAddress) packet.getSocketAddress(), str);
+				}
+			}
+
+			if (RTPSession.rtcpDebugLevel > 5) {
+				Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
+				String str = " ";
+				while (iter.hasNext()) {
+					RtcpPkt aPkt = iter.next();
+					str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
+				}
+				System.out.println("<-> RTCPReceiverThread.parsePacket() from "
+						+ packet.getSocketAddress().toString() + str);
+			}
+
+			// Loop over the information
+			Iterator iter = compPkt.rtcpPkts.iterator();
+
+			long curTime = System.currentTimeMillis();
+
+			while (iter.hasNext()) {
+				RtcpPkt aPkt = (RtcpPkt) iter.next();
+
+				// Our own packets should already have been filtered out.
+				if (aPkt.ssrc == rtpSession.ssrc) {
+					System.out
+					.println("RTCPReceiverThread() received RTCP packet"
+							+ " with conflicting SSRC from "
+							+ packet.getSocketAddress().toString());
+					rtpSession.resolveSsrcConflict();
+					return -1;
+				}
+
+				/** Receiver Reports **/
+				if (aPkt.getClass() == RtcpPktRR.class) {
+					RtcpPktRR rrPkt = (RtcpPktRR) aPkt;
+
+					Participant p = findParticipant(rrPkt.ssrc, packet);
+					p.lastRtcpPkt = curTime;
+
+					if (rtpSession.rtcpAppIntf != null) {
+						rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc,
+								rrPkt.reporteeSsrc, rrPkt.lossFraction,
+								rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
+								rrPkt.interArvJitter, rrPkt.timeStampLSR,
+								rrPkt.delaySR);
+					}
+
+					/** Sender Reports **/
+				} else if (aPkt.getClass() == RtcpPktSR.class) {
+					RtcpPktSR srPkt = (RtcpPktSR) aPkt;
+
+					Participant p = findParticipant(srPkt.ssrc, packet);
+					p.lastRtcpPkt = curTime;
+
+					if (p != null) {
+
+						if (p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
+							// Calculate gradient NTP vs RTP
+							long newTime = StaticProcs.undoNtpMess(
+									srPkt.ntpTs1, srPkt.ntpTs2);
+							p.ntpGradient = ((double) (newTime - p.ntpOffset))
+							/ ((double) srPkt.rtpTs - p.lastSRRtpTs);
+							if (RTPSession.rtcpDebugLevel > 4) {
+								System.out
+								.println("RTCPReceiverThread calculated NTP vs RTP gradient: "
+										+ Double
+										.toString(p.ntpGradient));
+							}
+						} else {
+							// Calculate sum of ntpTs1 and ntpTs2 in
+							// milliseconds
+							p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1,
+									srPkt.ntpTs2);
+							p.lastNtpTs1 = srPkt.ntpTs1;
+							p.lastNtpTs2 = srPkt.ntpTs2;
+							p.lastSRRtpTs = srPkt.rtpTs;
+						}
+
+						// For the next RR
+						p.timeReceivedLSR = curTime;
+						p.setTimeStampLSR(srPkt.ntpTs1, srPkt.ntpTs2);
+
+					}
+
+					if (rtpSession.rtcpAppIntf != null) {
+						if (srPkt.rReports != null) {
+							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
+									srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
+									srPkt.sendersPktCount,
+									srPkt.sendersPktCount,
+									srPkt.rReports.reporteeSsrc,
+									srPkt.rReports.lossFraction,
+									srPkt.rReports.lostPktCount,
+									srPkt.rReports.extHighSeqRecv,
+									srPkt.rReports.interArvJitter,
+									srPkt.rReports.timeStampLSR,
+									srPkt.rReports.delaySR);
+						} else {
+							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
+									srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
+									srPkt.sendersPktCount,
+									srPkt.sendersPktCount, null, null, null,
+									null, null, null, null);
+						}
+					}
+
+					/** Source Descriptions **/
+				} else if (aPkt.getClass() == RtcpPktSDES.class) {
+					RtcpPktSDES sdesPkt = (RtcpPktSDES) aPkt;
+
+					// The the participant database is updated
+					// when the SDES packet is reconstructed by CompRtcpPkt
+					if (rtpSession.rtcpAppIntf != null) {
+						rtpSession.rtcpAppIntf
+						.SDESPktReceived(sdesPkt.participants);
+					}
+
+					/** Bye Packets **/
+				} else if (aPkt.getClass() == RtcpPktBYE.class) {
+					RtcpPktBYE byePkt = (RtcpPktBYE) aPkt;
+
+					long time = System.currentTimeMillis();
+					Participant[] partArray = new Participant[byePkt.ssrcArray.length];
+
+					for (int i = 0; i < byePkt.ssrcArray.length; i++) {
+						partArray[i] = rtpSession.partDb
+						.getParticipant(byePkt.ssrcArray[i]);
+						if (partArray[i] != null)
+							partArray[i].timestampBYE = time;
+					}
+
+					if (rtpSession.rtcpAppIntf != null) {
+						rtpSession.rtcpAppIntf.BYEPktReceived(partArray,
+								new String(byePkt.reason));
+					}
+
+					/** Application specific Packets **/
+				} else if (aPkt.getClass() == RtcpPktAPP.class) {
+					RtcpPktAPP appPkt = (RtcpPktAPP) aPkt;
+
+					Participant part = findParticipant(appPkt.ssrc, packet);
+
+					if (rtpSession.rtcpAppIntf != null) {
+						rtpSession.rtcpAppIntf.APPPktReceived(part,
+								appPkt.itemCount, appPkt.pktName,
+								appPkt.pktData);
+					}
+				}
+
+			}
+		}
+		return 0;
+	}
+
+	/**
+	 * Returns a legible message when an error occurs
+	 * 
+	 * @param errorCode
+	 *            the internal error code, commonly negative of packet type
+	 * @return a string that is hopefully somewhat informative
+	 */
+	private String debugErrorString(int errorCode) {
+		String aStr = "";
+		switch (errorCode) {
+		case -1:
+			aStr = "The first packet was not of type SR or RR.";
+			break;
+		case -2:
+			aStr = "The padding bit was set for the first packet.";
+			break;
+		case -200:
+			aStr = " Error parsing Sender Report packet.";
+			break;
+		case -201:
+			aStr = " Error parsing Receiver Report packet.";
+			break;
+		case -202:
+			aStr = " Error parsing SDES packet";
+			break;
+		case -203:
+			aStr = " Error parsing BYE packet.";
+			break;
+		case -204:
+			aStr = " Error parsing Application specific packet.";
+			break;
+		case -205:
+			aStr = " Error parsing RTP Feedback packet.";
+			break;
+		case -206:
+			aStr = " Error parsing Payload-Specific Feedback packet.";
+			break;
+		default:
+			aStr = "Unknown error code " + errorCode + ".";
+		}
+
+		return aStr;
+	}
+
+	/**
+	 * Start the RTCP receiver thread.
+	 * 
+	 * It will 1) run when it receives a packet 2) parse the packet 3) call any
+	 * relevant callback functions, update database 4) block until the next one
+	 * arrives.
+	 */
+	public void run() {
+		if (RTPSession.rtcpDebugLevel > 1) {
+			if (rtpSession.mcSession) {
+				System.out
+				.println("-> RTCPReceiverThread.run() starting on MC "
+						+ rtcpSession.rtcpMCSock.getLocalPort());
+			} else {
+				System.out.println("-> RTCPReceiverThread.run() starting on "
+						+ rtcpSession.rtcpSock.getLocalPort());
+			}
+		}
+
+		while (!rtpSession.endSession) {
+
+			if (RTPSession.rtcpDebugLevel > 4) {
+				if (rtpSession.mcSession) {
+					System.out
+					.println("-> RTCPReceiverThread.run() waiting for packet on MC "
+							+ rtcpSession.rtcpMCSock.getLocalPort());
+				} else {
+					System.out
+					.println("-> RTCPReceiverThread.run() waiting for packet on "
+							+ rtcpSession.rtcpSock.getLocalPort());
+				}
+			}
+
+			// Prepare a packet
+			DatagramPacket packet = DatagramPool.getInstance().borrowPacket();
+			
+			// Wait for it to arrive
+			if (!rtpSession.mcSession) {
+				// Unicast
+				try {
+					rtcpSession.rtcpSock.receive(packet);
+				} catch (IOException e) {
+					if (!rtpSession.endSession) {
+						e.printStackTrace();
+					} else {
+						continue;
+					}
+				}
+			} else {
+				// Multicast
+				try {
+					rtcpSession.rtcpMCSock.receive(packet);
+				} catch (IOException e) {
+					if (!rtpSession.endSession) {
+						e.printStackTrace();
+					} else {
+						continue;
+					}
+				}
+			}
+
+			// Check whether this is one of our own
+			if ((rtpSession.mcSession && !packet.getSocketAddress().equals(
+					rtcpSession.rtcpMCSock))
+					|| !packet.getSocketAddress().equals(rtcpSession.rtcpSock)) {
+				// System.out.println("Packet received from: " +
+				// packet.getSocketAddress().toString());
+				parsePacket(packet);
+				// rtpSession.partDb.debugPrint();
+			}
+		}
+
+		if (RTPSession.rtcpDebugLevel > 1) {
+			System.out.println("<-> RTCPReceiverThread terminating");
+		}
+	}
+
+}