--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/RTCPReceiverThread.java Sat Mar 14 22:15:41 2009 +0100
@@ -0,0 +1,369 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+package jlibrtp;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * This thread hangs on the RTCP socket and waits for new packets
+ *
+ * @author Arne Kepp
+ *
+ */
+public class RTCPReceiverThread extends Thread {
+ /** Parent RTP Session */
+ private RTPSession rtpSession = null;
+ /** Parent RTCP Session */
+ private RTCPSession rtcpSession = null;
+
+ /**
+ * Constructor for new thread
+ * @param rtcpSession parent RTCP session
+ * @param rtpSession parent RTP session
+ */
+ RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
+ this.rtpSession = rtpSession;
+ this.rtcpSession = rtcpSession;
+
+ if(RTPSession.rtpDebugLevel > 1) {
+ System.out.println("<-> RTCPReceiverThread created");
+ }
+
+ }
+
+ /**
+ * Find out whether a participant with this SSRC is known.
+ *
+ * If the user is unknown, and the system is operating in unicast mode,
+ * try to match the ip-address of the sender to the ip address of a
+ * previously unmatched target
+ *
+ * @param ssrc the SSRC of the participant
+ * @param packet the packet that notified us
+ * @return the relevant participant, possibly newly created
+ */
+ private Participant findParticipant(long ssrc, DatagramPacket packet) {
+ Participant p = rtpSession.partDb.getParticipant(ssrc);
+ if(p == null) {
+ Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
+ while(enu.hasMoreElements()) {
+ Participant tmp = (Participant) enu.nextElement();
+ if(tmp.ssrc < 0 &&
+ (tmp.rtcpAddress.getAddress().equals(packet.getAddress())
+ || tmp.rtpAddress.getAddress().equals(packet.getAddress()))) {
+
+ // Best guess
+ System.out.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
+ + ssrc + " @" + packet.getAddress().toString() + ", WAS able to match it." );
+
+ tmp.ssrc = ssrc;
+ return tmp;
+ }
+ }
+ // Create an unknown sender
+ System.out.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
+ + ssrc + " @" + packet.getAddress().toString() + ", was NOT able to match it." );
+ p = new Participant((InetSocketAddress) null, (InetSocketAddress) packet.getSocketAddress(), ssrc);
+ rtpSession.partDb.addParticipant(2,p);
+ }
+ return p;
+ }
+
+
+ /**
+ * Parse a received UDP packet
+ *
+ * Perform the header checks and extract the RTCP packets in it
+ *
+ * @param packet the packet to be parsed
+ * @return -1 if there was a problem, 0 if successfully parsed
+ */
+ private int parsePacket(DatagramPacket packet) {
+
+ if(packet.getLength() % 4 != 0) {
+ if(RTPSession.rtcpDebugLevel > 2) {
+ System.out.println("RTCPReceiverThread.parsePacket got packet that had length " + packet.getLength());
+ }
+ return -1;
+ } else {
+ byte[] rawPkt = packet.getData();
+
+ // Parse the received compound RTCP (?) packet
+ CompRtcpPkt compPkt = new CompRtcpPkt(rawPkt, packet.getLength(),
+ (InetSocketAddress) packet.getSocketAddress(), rtpSession);
+
+ if(this.rtpSession.debugAppIntf != null) {
+ String intfStr;
+
+ if(rtpSession.mcSession) {
+ intfStr = this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString();
+ } else {
+ intfStr = this.rtpSession.rtpSock.getLocalSocketAddress().toString();
+ }
+
+ if( compPkt.problem == 0) {
+ String str = new String("Received compound RTCP packet of size " + packet.getLength() +
+ " from " + packet.getSocketAddress().toString() + " via " + intfStr
+ + " containing " + compPkt.rtcpPkts.size() + " packets" );
+
+ this.rtpSession.debugAppIntf.packetReceived(1,
+ (InetSocketAddress) packet.getSocketAddress(), str);
+ } else {
+ String str = new String("Received invalid RTCP packet of size " + packet.getLength() +
+ " from " + packet.getSocketAddress().toString() + " via " + intfStr
+ + ": " + this.debugErrorString(compPkt.problem) );
+
+ this.rtpSession.debugAppIntf.packetReceived(-2,
+ (InetSocketAddress) packet.getSocketAddress(), str);
+ }
+ }
+
+ if(RTPSession.rtcpDebugLevel > 5) {
+ Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
+ String str = " ";
+ while(iter.hasNext()) {
+ RtcpPkt aPkt = iter.next();
+ str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", ");
+ }
+ System.out.println("<-> RTCPReceiverThread.parsePacket() from " + packet.getSocketAddress().toString() + str);
+ }
+
+
+ //Loop over the information
+ Iterator iter = compPkt.rtcpPkts.iterator();
+
+ long curTime = System.currentTimeMillis();
+
+ while(iter.hasNext()) {
+ RtcpPkt aPkt = (RtcpPkt) iter.next();
+
+ // Our own packets should already have been filtered out.
+ if(aPkt.ssrc == rtpSession.ssrc) {
+ System.out.println("RTCPReceiverThread() received RTCP packet"
+ + " with conflicting SSRC from " + packet.getSocketAddress().toString());
+ rtpSession.resolveSsrcConflict();
+ return -1;
+ }
+
+ /** Receiver Reports **/
+ if( aPkt.getClass() == RtcpPktRR.class) {
+ RtcpPktRR rrPkt = (RtcpPktRR) aPkt;
+
+ Participant p = findParticipant(rrPkt.ssrc, packet);
+ p.lastRtcpPkt = curTime;
+
+ if(rtpSession.rtcpAppIntf != null) {
+ rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc, rrPkt.reporteeSsrc,
+ rrPkt.lossFraction, rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
+ rrPkt.interArvJitter, rrPkt.timeStampLSR, rrPkt.delaySR);
+ }
+
+ /** Sender Reports **/
+ } else if(aPkt.getClass() == RtcpPktSR.class) {
+ RtcpPktSR srPkt = (RtcpPktSR) aPkt;
+
+ Participant p = findParticipant(srPkt.ssrc, packet);
+ p.lastRtcpPkt = curTime;
+
+ if(p != null) {
+
+ if(p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
+ //Calculate gradient NTP vs RTP
+ long newTime = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
+ p.ntpGradient = ((double) (newTime - p.ntpOffset))/((double) srPkt.rtpTs - p.lastSRRtpTs);
+ if(RTPSession.rtcpDebugLevel > 4) {
+ System.out.println("RTCPReceiverThread calculated NTP vs RTP gradient: " + Double.toString(p.ntpGradient));
+ }
+ } else {
+ // Calculate sum of ntpTs1 and ntpTs2 in milliseconds
+ p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
+ p.lastNtpTs1 = srPkt.ntpTs1;
+ p.lastNtpTs2 = srPkt.ntpTs2;
+ p.lastSRRtpTs = srPkt.rtpTs;
+ }
+
+ // For the next RR
+ p.timeReceivedLSR = curTime;
+ p.setTimeStampLSR(srPkt.ntpTs1,srPkt.ntpTs2);
+
+ }
+
+
+ if(rtpSession.rtcpAppIntf != null) {
+ if(srPkt.rReports != null) {
+ rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2,
+ srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersPktCount,
+ srPkt.rReports.reporteeSsrc, srPkt.rReports.lossFraction, srPkt.rReports.lostPktCount,
+ srPkt.rReports.extHighSeqRecv, srPkt.rReports.interArvJitter, srPkt.rReports.timeStampLSR,
+ srPkt.rReports.delaySR);
+ } else {
+ rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2,
+ srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersPktCount,
+ null, null, null,
+ null, null, null,
+ null);
+ }
+ }
+
+ /** Source Descriptions **/
+ } else if(aPkt.getClass() == RtcpPktSDES.class) {
+ RtcpPktSDES sdesPkt = (RtcpPktSDES) aPkt;
+
+ // The the participant database is updated
+ // when the SDES packet is reconstructed by CompRtcpPkt
+ if(rtpSession.rtcpAppIntf != null) {
+ rtpSession.rtcpAppIntf.SDESPktReceived(sdesPkt.participants);
+ }
+
+ /** Bye Packets **/
+ } else if(aPkt.getClass() == RtcpPktBYE.class) {
+ RtcpPktBYE byePkt = (RtcpPktBYE) aPkt;
+
+ long time = System.currentTimeMillis();
+ Participant[] partArray = new Participant[byePkt.ssrcArray.length];
+
+ for(int i=0; i<byePkt.ssrcArray.length; i++) {
+ partArray[i] = rtpSession.partDb.getParticipant(byePkt.ssrcArray[i]);
+ if(partArray[i] != null)
+ partArray[i].timestampBYE = time;
+ }
+
+ if(rtpSession.rtcpAppIntf != null) {
+ rtpSession.rtcpAppIntf.BYEPktReceived(partArray, new String(byePkt.reason));
+ }
+
+ /** Application specific Packets **/
+ } else if(aPkt.getClass() == RtcpPktAPP.class) {
+ RtcpPktAPP appPkt = (RtcpPktAPP) aPkt;
+
+ Participant part = findParticipant(appPkt.ssrc, packet);
+
+ if(rtpSession.rtcpAppIntf != null) {
+ rtpSession.rtcpAppIntf.APPPktReceived(part, appPkt.itemCount, appPkt.pktName, appPkt.pktData);
+ }
+ }
+
+
+
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Returns a legible message when an error occurs
+ *
+ * @param errorCode the internal error code, commonly negative of packet type
+ * @return a string that is hopefully somewhat informative
+ */
+ private String debugErrorString(int errorCode) {
+ String aStr = "";
+ switch(errorCode) {
+ case -1: aStr = "The first packet was not of type SR or RR."; break;
+ case -2: aStr = "The padding bit was set for the first packet."; break;
+ case -200: aStr = " Error parsing Sender Report packet."; break;
+ case -201: aStr = " Error parsing Receiver Report packet."; break;
+ case -202: aStr = " Error parsing SDES packet"; break;
+ case -203: aStr = " Error parsing BYE packet."; break;
+ case -204: aStr = " Error parsing Application specific packet."; break;
+ case -205: aStr = " Error parsing RTP Feedback packet."; break;
+ case -206: aStr = " Error parsing Payload-Specific Feedback packet."; break;
+ default:
+ aStr = "Unknown error code " + errorCode + ".";
+ }
+
+ return aStr;
+ }
+
+ /**
+ * Start the RTCP receiver thread.
+ *
+ * It will
+ * 1) run when it receives a packet
+ * 2) parse the packet
+ * 3) call any relevant callback functions, update database
+ * 4) block until the next one arrives.
+ */
+ public void run() {
+ if(RTPSession.rtcpDebugLevel > 1) {
+ if(rtpSession.mcSession) {
+ System.out.println("-> RTCPReceiverThread.run() starting on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
+ } else {
+ System.out.println("-> RTCPReceiverThread.run() starting on " + rtcpSession.rtcpSock.getLocalPort() );
+ }
+ }
+
+ while(!rtpSession.endSession) {
+
+ if(RTPSession.rtcpDebugLevel > 4) {
+ if(rtpSession.mcSession) {
+ System.out.println("-> RTCPReceiverThread.run() waiting for packet on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
+ } else {
+ System.out.println("-> RTCPReceiverThread.run() waiting for packet on " + rtcpSession.rtcpSock.getLocalPort() );
+ }
+ }
+
+ // Prepare a packet
+ byte[] rawPkt = new byte[1500];
+ DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);
+
+ // Wait for it to arrive
+ if(! rtpSession.mcSession) {
+ //Unicast
+ try {
+ rtcpSession.rtcpSock.receive(packet);
+ } catch (IOException e) {
+ if(!rtpSession.endSession) {
+ e.printStackTrace();
+ } else {
+ continue;
+ }
+ }
+ } else {
+ //Multicast
+ try {
+ rtcpSession.rtcpMCSock.receive(packet);
+ } catch (IOException e) {
+ if(!rtpSession.endSession) {
+ e.printStackTrace();
+ } else {
+ continue;
+ }
+ }
+ }
+
+ // Check whether this is one of our own
+ if( (rtpSession.mcSession && ! packet.getSocketAddress().equals(rtcpSession.rtcpMCSock) )
+ || ! packet.getSocketAddress().equals(rtcpSession.rtcpSock) ) {
+ //System.out.println("Packet received from: " + packet.getSocketAddress().toString());
+ parsePacket(packet);
+ //rtpSession.partDb.debugPrint();
+ }
+ }
+
+ if(RTPSession.rtcpDebugLevel > 1) {
+ System.out.println("<-> RTCPReceiverThread terminating");
+ }
+ }
+
+}