src/jlibrtp/RTCPReceiverThread.java
changeset 214 2bf440c54ca5
parent 213 9bdff6cbd120
child 215 5db64229be69
equal deleted inserted replaced
213:9bdff6cbd120 214:2bf440c54ca5
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.io.IOException;
       
    22 import java.net.DatagramPacket;
       
    23 import java.net.InetSocketAddress;
       
    24 import java.util.Enumeration;
       
    25 import java.util.Iterator;
       
    26 
       
    27 /**
       
    28  * This thread hangs on the RTCP socket and waits for new packets
       
    29  * 
       
    30  * @author Arne Kepp
       
    31  *
       
    32  */
       
    33 public class RTCPReceiverThread extends Thread {
       
    34 	/** Parent RTP Session */
       
    35 	private RTPSession rtpSession = null;
       
    36 	/** Parent RTCP Session */
       
    37 	private RTCPSession rtcpSession = null;
       
    38 	
       
    39 	/**
       
    40 	 * Constructor for new thread
       
    41 	 * @param rtcpSession parent RTCP session
       
    42 	 * @param rtpSession parent RTP session
       
    43 	 */
       
    44 	RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
       
    45 		this.rtpSession = rtpSession;
       
    46 		this.rtcpSession = rtcpSession;
       
    47 		
       
    48 		if(RTPSession.rtpDebugLevel > 1) {
       
    49 			System.out.println("<-> RTCPReceiverThread created");
       
    50 		} 
       
    51 
       
    52 	}
       
    53 	
       
    54 	/**
       
    55 	 * Find out whether a participant with this SSRC is known.
       
    56 	 * 
       
    57 	 * If the user is unknown, and the system is operating in unicast mode,
       
    58 	 * try to match the ip-address of the sender to the ip address of a
       
    59 	 * previously unmatched target
       
    60 	 * 
       
    61 	 * @param ssrc the SSRC of the participant
       
    62 	 * @param packet the packet that notified us
       
    63 	 * @return the relevant participant, possibly newly created
       
    64 	 */
       
    65 	private Participant findParticipant(long ssrc, DatagramPacket packet) {
       
    66 		Participant p = rtpSession.partDb.getParticipant(ssrc);
       
    67 		if(p == null) {
       
    68 			Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
       
    69 			while(enu.hasMoreElements()) {
       
    70 				Participant tmp = (Participant) enu.nextElement();
       
    71 				if(tmp.ssrc < 0 && 
       
    72 						(tmp.rtcpAddress.getAddress().equals(packet.getAddress())
       
    73 						|| tmp.rtpAddress.getAddress().equals(packet.getAddress()))) {
       
    74 					
       
    75 					// Best guess
       
    76 					System.out.println("RTCPReceiverThread: Got an unexpected packet from SSRC:" 
       
    77 							+ ssrc  + " @" + packet.getAddress().toString() + ", WAS able to match it." );
       
    78 					
       
    79 					tmp.ssrc = ssrc;
       
    80 					return tmp;
       
    81 				}
       
    82 			}
       
    83 			// Create an unknown sender
       
    84 			System.out.println("RTCPReceiverThread: Got an unexpected packet from SSRC:" 
       
    85 					+ ssrc  + " @" + packet.getAddress().toString() + ", was NOT able to match it." );
       
    86 			p = new Participant((InetSocketAddress) null, (InetSocketAddress) packet.getSocketAddress(), ssrc);
       
    87 			rtpSession.partDb.addParticipant(2,p);
       
    88 		}
       
    89 		return p;
       
    90 	}
       
    91 	
       
    92 	
       
    93 	/**
       
    94 	 * Parse a received UDP packet
       
    95 	 * 
       
    96 	 * Perform the header checks and extract the RTCP packets in it
       
    97 	 * 
       
    98 	 * @param packet the packet to be parsed
       
    99 	 * @return -1 if there was a problem, 0 if successfully parsed
       
   100 	 */
       
   101 	private int parsePacket(DatagramPacket packet) {
       
   102 
       
   103 		if(packet.getLength() % 4 != 0) {
       
   104 			if(RTPSession.rtcpDebugLevel > 2) {
       
   105 				System.out.println("RTCPReceiverThread.parsePacket got packet that had length " + packet.getLength());
       
   106 			}
       
   107 			return -1;
       
   108 		} else {
       
   109 			byte[] rawPkt = packet.getData();
       
   110 
       
   111 			// Parse the received compound RTCP (?) packet
       
   112 			CompRtcpPkt compPkt = new CompRtcpPkt(rawPkt, packet.getLength(), 
       
   113 					(InetSocketAddress) packet.getSocketAddress(), rtpSession);
       
   114 
       
   115 			if(this.rtpSession.debugAppIntf != null) {
       
   116 				String intfStr; 
       
   117 
       
   118 				if(rtpSession.mcSession) {
       
   119 					intfStr = this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString();
       
   120 				} else {
       
   121 					intfStr = this.rtpSession.rtpSock.getLocalSocketAddress().toString();
       
   122 				}
       
   123 
       
   124 				if( compPkt.problem == 0) {
       
   125 					String str = new String("Received compound RTCP packet of size " + packet.getLength() + 
       
   126 							" from " + packet.getSocketAddress().toString() + " via " + intfStr
       
   127 							+ " containing " + compPkt.rtcpPkts.size() + " packets" );
       
   128 
       
   129 					this.rtpSession.debugAppIntf.packetReceived(1, 
       
   130 							(InetSocketAddress) packet.getSocketAddress(), str);
       
   131 				} else {
       
   132 					String str = new String("Received invalid RTCP packet of size " + packet.getLength() + 
       
   133 							" from " + packet.getSocketAddress().toString() + " via " +  intfStr
       
   134 							+ ": " + this.debugErrorString(compPkt.problem) );
       
   135 
       
   136 					this.rtpSession.debugAppIntf.packetReceived(-2, 
       
   137 							(InetSocketAddress) packet.getSocketAddress(), str);
       
   138 				}
       
   139 			}
       
   140 			
       
   141 			if(RTPSession.rtcpDebugLevel > 5) {
       
   142 				Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
       
   143 				String str = " ";
       
   144 				while(iter.hasNext()) {
       
   145 					RtcpPkt aPkt = iter.next();
       
   146 					str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", ");
       
   147 				}
       
   148 				System.out.println("<-> RTCPReceiverThread.parsePacket() from " + packet.getSocketAddress().toString() + str);
       
   149 			}
       
   150 			
       
   151 
       
   152 			//Loop over the information
       
   153 			Iterator iter = compPkt.rtcpPkts.iterator();
       
   154 
       
   155 			long curTime = System.currentTimeMillis();
       
   156 
       
   157 			while(iter.hasNext()) {
       
   158 				RtcpPkt aPkt = (RtcpPkt) iter.next();
       
   159 
       
   160 				// Our own packets should already have been filtered out.
       
   161 				if(aPkt.ssrc == rtpSession.ssrc) {
       
   162 					System.out.println("RTCPReceiverThread() received RTCP packet" 
       
   163 							+ " with conflicting SSRC from " + packet.getSocketAddress().toString());
       
   164 					rtpSession.resolveSsrcConflict();
       
   165 					return -1;
       
   166 				}
       
   167 
       
   168 				/**        Receiver Reports        **/
       
   169 				if(	aPkt.getClass() == RtcpPktRR.class) {
       
   170 					RtcpPktRR rrPkt = (RtcpPktRR) aPkt;
       
   171 
       
   172 					Participant p = findParticipant(rrPkt.ssrc, packet);
       
   173 					p.lastRtcpPkt = curTime;
       
   174 
       
   175 					if(rtpSession.rtcpAppIntf != null) {
       
   176 						rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc, rrPkt.reporteeSsrc, 
       
   177 								rrPkt.lossFraction, rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
       
   178 								rrPkt.interArvJitter, rrPkt.timeStampLSR, rrPkt.delaySR);
       
   179 					}
       
   180 
       
   181 					/**        Sender Reports        **/
       
   182 				} else if(aPkt.getClass() == RtcpPktSR.class) {
       
   183 					RtcpPktSR srPkt = (RtcpPktSR) aPkt;
       
   184 
       
   185 					Participant p = findParticipant(srPkt.ssrc, packet);
       
   186 					p.lastRtcpPkt = curTime;
       
   187 
       
   188 					if(p != null) {
       
   189 
       
   190 						if(p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
       
   191 							//Calculate gradient NTP vs RTP
       
   192 							long newTime = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
       
   193 							p.ntpGradient = ((double) (newTime - p.ntpOffset))/((double) srPkt.rtpTs - p.lastSRRtpTs);
       
   194 							if(RTPSession.rtcpDebugLevel > 4) {
       
   195 								System.out.println("RTCPReceiverThread calculated NTP vs RTP gradient: " + Double.toString(p.ntpGradient));
       
   196 							}
       
   197 						} else {
       
   198 							// Calculate sum of ntpTs1 and ntpTs2 in milliseconds
       
   199 							p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
       
   200 							p.lastNtpTs1 = srPkt.ntpTs1;
       
   201 							p.lastNtpTs2 = srPkt.ntpTs2;
       
   202 							p.lastSRRtpTs = srPkt.rtpTs;
       
   203 						}
       
   204 
       
   205 						// For the next RR
       
   206 						p.timeReceivedLSR = curTime;
       
   207 						p.setTimeStampLSR(srPkt.ntpTs1,srPkt.ntpTs2);
       
   208 
       
   209 					}
       
   210 
       
   211 
       
   212 					if(rtpSession.rtcpAppIntf != null) {
       
   213 						if(srPkt.rReports != null) {
       
   214 							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2, 
       
   215 									srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersPktCount,
       
   216 									srPkt.rReports.reporteeSsrc, srPkt.rReports.lossFraction, srPkt.rReports.lostPktCount,
       
   217 									srPkt.rReports.extHighSeqRecv, srPkt.rReports.interArvJitter, srPkt.rReports.timeStampLSR,
       
   218 									srPkt.rReports.delaySR);
       
   219 						} else {
       
   220 							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2, 
       
   221 									srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersPktCount,
       
   222 									null, null, null,
       
   223 									null, null, null,
       
   224 									null);
       
   225 						}
       
   226 					}
       
   227 
       
   228 					/**        Source Descriptions       **/
       
   229 				} else if(aPkt.getClass() == RtcpPktSDES.class) {
       
   230 					RtcpPktSDES sdesPkt = (RtcpPktSDES) aPkt;				
       
   231 
       
   232 					// The the participant database is updated
       
   233 					// when the SDES packet is reconstructed by CompRtcpPkt	
       
   234 					if(rtpSession.rtcpAppIntf != null) {
       
   235 						rtpSession.rtcpAppIntf.SDESPktReceived(sdesPkt.participants);
       
   236 					}
       
   237 
       
   238 					/**        Bye Packets       **/
       
   239 				} else if(aPkt.getClass() == RtcpPktBYE.class) {
       
   240 					RtcpPktBYE byePkt = (RtcpPktBYE) aPkt;
       
   241 
       
   242 					long time = System.currentTimeMillis();
       
   243 					Participant[] partArray = new Participant[byePkt.ssrcArray.length];
       
   244 
       
   245 					for(int i=0; i<byePkt.ssrcArray.length; i++) {
       
   246 						partArray[i] = rtpSession.partDb.getParticipant(byePkt.ssrcArray[i]);
       
   247 						if(partArray[i] != null)
       
   248 							partArray[i].timestampBYE = time;
       
   249 					}
       
   250 
       
   251 					if(rtpSession.rtcpAppIntf != null) {
       
   252 						rtpSession.rtcpAppIntf.BYEPktReceived(partArray, new String(byePkt.reason));
       
   253 					}
       
   254 					
       
   255 					/**        Application specific Packets       **/
       
   256 				} else if(aPkt.getClass() == RtcpPktAPP.class) {
       
   257 					RtcpPktAPP appPkt = (RtcpPktAPP) aPkt;
       
   258 
       
   259 					Participant part = findParticipant(appPkt.ssrc, packet);
       
   260 					
       
   261 					if(rtpSession.rtcpAppIntf != null) {
       
   262 						rtpSession.rtcpAppIntf.APPPktReceived(part, appPkt.itemCount, appPkt.pktName, appPkt.pktData);
       
   263 					}
       
   264 			}
       
   265 
       
   266 
       
   267 
       
   268 			}
       
   269 		}
       
   270 		return 0;
       
   271 	}
       
   272 	
       
   273 	/**
       
   274 	 * Returns a legible message when an error occurs
       
   275 	 * 
       
   276 	 * @param errorCode the internal error code, commonly negative of packet type
       
   277 	 * @return a string that is hopefully somewhat informative
       
   278 	 */
       
   279 	private String debugErrorString(int errorCode) {
       
   280 		String aStr = "";
       
   281 		switch(errorCode) {
       
   282 			case -1: aStr = "The first packet was not of type SR or RR."; break;
       
   283 			case -2: aStr = "The padding bit was set for the first packet."; break;
       
   284 			case -200: aStr = " Error parsing Sender Report packet."; break;
       
   285 			case -201: aStr = " Error parsing Receiver Report packet."; break;
       
   286 			case -202: aStr = " Error parsing SDES packet"; break;
       
   287 			case -203: aStr = " Error parsing BYE packet."; break;
       
   288 			case -204: aStr = " Error parsing Application specific packet."; break;
       
   289 			case -205: aStr = " Error parsing RTP Feedback packet."; break;
       
   290 			case -206: aStr = " Error parsing Payload-Specific Feedback packet."; break;
       
   291 		default:
       
   292 			aStr = "Unknown error code " + errorCode + ".";
       
   293 		}
       
   294 		
       
   295 		return aStr;
       
   296 	}
       
   297 
       
   298 	/**
       
   299 	 * Start the RTCP receiver thread.
       
   300 	 * 
       
   301 	 * It will
       
   302 	 * 1) run when it receives a packet 
       
   303 	 * 2) parse the packet
       
   304 	 * 3) call any relevant callback functions, update database
       
   305 	 * 4) block until the next one arrives.
       
   306 	 */
       
   307 	public void run() {
       
   308 		if(RTPSession.rtcpDebugLevel > 1) {
       
   309 			if(rtpSession.mcSession) {
       
   310 				System.out.println("-> RTCPReceiverThread.run() starting on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
       
   311 			} else {
       
   312 				System.out.println("-> RTCPReceiverThread.run() starting on " + rtcpSession.rtcpSock.getLocalPort() );
       
   313 			}
       
   314 		}
       
   315 
       
   316 		while(!rtpSession.endSession) {
       
   317 			
       
   318 			if(RTPSession.rtcpDebugLevel > 4) {
       
   319 				if(rtpSession.mcSession) {
       
   320 					System.out.println("-> RTCPReceiverThread.run() waiting for packet on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
       
   321 				} else {
       
   322 					System.out.println("-> RTCPReceiverThread.run() waiting for packet on " + rtcpSession.rtcpSock.getLocalPort() );
       
   323 				}
       
   324 			}
       
   325 
       
   326 			// Prepare a packet
       
   327 			byte[] rawPkt = new byte[1500];
       
   328 			DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);
       
   329 
       
   330 			// Wait for it to arrive
       
   331 			if(! rtpSession.mcSession) {
       
   332 				//Unicast
       
   333 				try {
       
   334 					rtcpSession.rtcpSock.receive(packet);
       
   335 				} catch (IOException e) {
       
   336 					if(!rtpSession.endSession) {
       
   337 						e.printStackTrace();
       
   338 					} else {
       
   339 						continue;
       
   340 					}
       
   341 				}
       
   342 			} else {
       
   343 				//Multicast
       
   344 				try {
       
   345 					rtcpSession.rtcpMCSock.receive(packet);
       
   346 				} catch (IOException e) {
       
   347 					if(!rtpSession.endSession) {
       
   348 						e.printStackTrace();
       
   349 					} else {
       
   350 						continue;
       
   351 					}
       
   352 				}
       
   353 			}
       
   354 			
       
   355 			// Check whether this is one of our own
       
   356 			if( (rtpSession.mcSession && ! packet.getSocketAddress().equals(rtcpSession.rtcpMCSock) )
       
   357 					|| ! packet.getSocketAddress().equals(rtcpSession.rtcpSock) ) {
       
   358 				//System.out.println("Packet received from: " + packet.getSocketAddress().toString());
       
   359 				parsePacket(packet);
       
   360 				//rtpSession.partDb.debugPrint();
       
   361 			}			
       
   362 		}
       
   363 		
       
   364 		if(RTPSession.rtcpDebugLevel > 1) {
       
   365 			System.out.println("<-> RTCPReceiverThread terminating");
       
   366 		}
       
   367 	}
       
   368 
       
   369 }