src/jlibrtp/RTCPReceiverThread.java
changeset 834 e8d6255306f8
parent 833 f5a5d9237d69
child 835 4e40f3481f23
equal deleted inserted replaced
833:f5a5d9237d69 834:e8d6255306f8
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.io.IOException;
       
    22 import java.net.DatagramPacket;
       
    23 import java.net.InetSocketAddress;
       
    24 import java.util.Enumeration;
       
    25 import java.util.Iterator;
       
    26 
       
    27 import org.sipdroid.net.tools.DatagramPool;
       
    28 import org.sipdroid.net.tools.GenericPool;
       
    29 
       
    30 /**
       
    31  * This thread hangs on the RTCP socket and waits for new packets
       
    32  * 
       
    33  * @author Arne Kepp
       
    34  * 
       
    35  */
       
    36 public class RTCPReceiverThread extends Thread {
       
    37 	/** Parent RTP Session */
       
    38 	private RTPSession rtpSession = null;
       
    39 	/** Parent RTCP Session */
       
    40 	private RTCPSession rtcpSession = null;
       
    41 
       
    42 	private GenericPool<CompRtcpPkt> rtcpPacketPool;
       
    43 
       
    44 	/**
       
    45 	 * Constructor for new thread
       
    46 	 * 
       
    47 	 * @param rtcpSession
       
    48 	 *            parent RTCP session
       
    49 	 * @param rtpSession
       
    50 	 *            parent RTP session
       
    51 	 */
       
    52 	RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
       
    53 		this.rtpSession = rtpSession;
       
    54 		this.rtcpSession = rtcpSession;
       
    55 
       
    56 		rtcpPacketPool = new GenericPool<CompRtcpPkt>(10);
       
    57 
       
    58 		if (RTPSession.rtpDebugLevel > 1) {
       
    59 			System.out.println("<-> RTCPReceiverThread created");
       
    60 		}
       
    61 
       
    62 	}
       
    63 
       
    64 	/**
       
    65 	 * Find out whether a participant with this SSRC is known.
       
    66 	 * 
       
    67 	 * If the user is unknown, and the system is operating in unicast mode, try
       
    68 	 * to match the ip-address of the sender to the ip address of a previously
       
    69 	 * unmatched target
       
    70 	 * 
       
    71 	 * @param ssrc
       
    72 	 *            the SSRC of the participant
       
    73 	 * @param packet
       
    74 	 *            the packet that notified us
       
    75 	 * @return the relevant participant, possibly newly created
       
    76 	 */
       
    77 	private Participant findParticipant(long ssrc, DatagramPacket packet) {
       
    78 		Participant p = rtpSession.partDb.getParticipant(ssrc);
       
    79 		if (p == null) {
       
    80 			Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
       
    81 			while (enu.hasMoreElements()) {
       
    82 				Participant tmp = (Participant) enu.nextElement();
       
    83 				if (tmp.ssrc < 0
       
    84 						&& (tmp.rtcpAddress.getAddress().equals(
       
    85 								packet.getAddress()) || tmp.rtpAddress
       
    86 								.getAddress().equals(packet.getAddress()))) {
       
    87 
       
    88 					// Best guess
       
    89 					System.out
       
    90 					.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
       
    91 							+ ssrc
       
    92 							+ " @"
       
    93 							+ packet.getAddress().toString()
       
    94 							+ ", WAS able to match it.");
       
    95 
       
    96 					tmp.ssrc = ssrc;
       
    97 					return tmp;
       
    98 				}
       
    99 			}
       
   100 			// Create an unknown sender
       
   101 			System.out
       
   102 			.println("RTCPReceiverThread: Got an unexpected packet from SSRC:"
       
   103 					+ ssrc
       
   104 					+ " @"
       
   105 					+ packet.getAddress().toString()
       
   106 					+ ", was NOT able to match it.");
       
   107 			p = new Participant((InetSocketAddress) null,
       
   108 					(InetSocketAddress) packet.getSocketAddress(), ssrc);
       
   109 			rtpSession.partDb.addParticipant(2, p);
       
   110 		}
       
   111 		return p;
       
   112 	}
       
   113 
       
   114 	/**
       
   115 	 * Parse a received UDP packet
       
   116 	 * 
       
   117 	 * Perform the header checks and extract the RTCP packets in it
       
   118 	 * 
       
   119 	 * @param packet
       
   120 	 *            the packet to be parsed
       
   121 	 * @return -1 if there was a problem, 0 if successfully parsed
       
   122 	 */
       
   123 	private int parsePacket(DatagramPacket packet) {
       
   124 
       
   125 		if (packet.getLength() % 4 != 0) {
       
   126 			if (RTPSession.rtcpDebugLevel > 2) {
       
   127 				System.out
       
   128 				.println("RTCPReceiverThread.parsePacket got packet that had length "
       
   129 						+ packet.getLength());
       
   130 			}
       
   131 			return -1;
       
   132 		} else {
       
   133 			byte[] rawPkt = packet.getData();
       
   134 
       
   135 			// Parse the received compound RTCP (?) packet
       
   136 			CompRtcpPkt compPkt = rtcpPacketPool.borrowItem();
       
   137 			compPkt.init(rawPkt, packet.getLength(),
       
   138 					(InetSocketAddress) packet.getSocketAddress(), rtpSession);
       
   139 
       
   140 			if (this.rtpSession.debugAppIntf != null) {
       
   141 				String intfStr;
       
   142 
       
   143 				if (rtpSession.mcSession) {
       
   144 					intfStr = this.rtcpSession.rtcpMCSock
       
   145 					.getLocalSocketAddress().toString();
       
   146 				} else {
       
   147 					intfStr = this.rtpSession.rtpSock.getLocalSocketAddress()
       
   148 					.toString();
       
   149 				}
       
   150 
       
   151 				if (compPkt.problem == 0) {
       
   152 					String str = new String(
       
   153 							"Received compound RTCP packet of size "
       
   154 							+ packet.getLength() + " from "
       
   155 							+ packet.getSocketAddress().toString()
       
   156 							+ " via " + intfStr + " containing "
       
   157 							+ compPkt.rtcpPkts.size() + " packets");
       
   158 
       
   159 					this.rtpSession.debugAppIntf.packetReceived(1,
       
   160 							(InetSocketAddress) packet.getSocketAddress(), str);
       
   161 				} else {
       
   162 					String str = new String(
       
   163 							"Received invalid RTCP packet of size "
       
   164 							+ packet.getLength() + " from "
       
   165 							+ packet.getSocketAddress().toString()
       
   166 							+ " via " + intfStr + ": "
       
   167 							+ this.debugErrorString(compPkt.problem));
       
   168 
       
   169 					this.rtpSession.debugAppIntf.packetReceived(-2,
       
   170 							(InetSocketAddress) packet.getSocketAddress(), str);
       
   171 				}
       
   172 			}
       
   173 
       
   174 			if (RTPSession.rtcpDebugLevel > 5) {
       
   175 				Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
       
   176 				String str = " ";
       
   177 				while (iter.hasNext()) {
       
   178 					RtcpPkt aPkt = iter.next();
       
   179 					str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
       
   180 				}
       
   181 				System.out.println("<-> RTCPReceiverThread.parsePacket() from "
       
   182 						+ packet.getSocketAddress().toString() + str);
       
   183 			}
       
   184 
       
   185 			// Loop over the information
       
   186 			Iterator iter = compPkt.rtcpPkts.iterator();
       
   187 
       
   188 			long curTime = System.currentTimeMillis();
       
   189 
       
   190 			while (iter.hasNext()) {
       
   191 				RtcpPkt aPkt = (RtcpPkt) iter.next();
       
   192 
       
   193 				// Our own packets should already have been filtered out.
       
   194 				if (aPkt.ssrc == rtpSession.ssrc) {
       
   195 					System.out
       
   196 					.println("RTCPReceiverThread() received RTCP packet"
       
   197 							+ " with conflicting SSRC from "
       
   198 							+ packet.getSocketAddress().toString());
       
   199 					rtpSession.resolveSsrcConflict();
       
   200 					return -1;
       
   201 				}
       
   202 
       
   203 				/** Receiver Reports **/
       
   204 				if (aPkt.getClass() == RtcpPktRR.class) {
       
   205 					RtcpPktRR rrPkt = (RtcpPktRR) aPkt;
       
   206 
       
   207 					Participant p = findParticipant(rrPkt.ssrc, packet);
       
   208 					p.lastRtcpPkt = curTime;
       
   209 
       
   210 					if (rtpSession.rtcpAppIntf != null) {
       
   211 						rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc,
       
   212 								rrPkt.reporteeSsrc, rrPkt.lossFraction,
       
   213 								rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
       
   214 								rrPkt.interArvJitter, rrPkt.timeStampLSR,
       
   215 								rrPkt.delaySR);
       
   216 					}
       
   217 
       
   218 					/** Sender Reports **/
       
   219 				} else if (aPkt.getClass() == RtcpPktSR.class) {
       
   220 					RtcpPktSR srPkt = (RtcpPktSR) aPkt;
       
   221 
       
   222 					Participant p = findParticipant(srPkt.ssrc, packet);
       
   223 					p.lastRtcpPkt = curTime;
       
   224 
       
   225 					if (p != null) {
       
   226 
       
   227 						if (p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
       
   228 							// Calculate gradient NTP vs RTP
       
   229 							long newTime = StaticProcs.undoNtpMess(
       
   230 									srPkt.ntpTs1, srPkt.ntpTs2);
       
   231 							p.ntpGradient = ((double) (newTime - p.ntpOffset))
       
   232 							/ ((double) srPkt.rtpTs - p.lastSRRtpTs);
       
   233 							if (RTPSession.rtcpDebugLevel > 4) {
       
   234 								System.out
       
   235 								.println("RTCPReceiverThread calculated NTP vs RTP gradient: "
       
   236 										+ Double
       
   237 										.toString(p.ntpGradient));
       
   238 							}
       
   239 						} else {
       
   240 							// Calculate sum of ntpTs1 and ntpTs2 in
       
   241 							// milliseconds
       
   242 							p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1,
       
   243 									srPkt.ntpTs2);
       
   244 							p.lastNtpTs1 = srPkt.ntpTs1;
       
   245 							p.lastNtpTs2 = srPkt.ntpTs2;
       
   246 							p.lastSRRtpTs = srPkt.rtpTs;
       
   247 						}
       
   248 
       
   249 						// For the next RR
       
   250 						p.timeReceivedLSR = curTime;
       
   251 						p.setTimeStampLSR(srPkt.ntpTs1, srPkt.ntpTs2);
       
   252 
       
   253 					}
       
   254 
       
   255 					if (rtpSession.rtcpAppIntf != null) {
       
   256 						if (srPkt.rReports != null) {
       
   257 							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
       
   258 									srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
       
   259 									srPkt.sendersPktCount,
       
   260 									srPkt.sendersPktCount,
       
   261 									srPkt.rReports.reporteeSsrc,
       
   262 									srPkt.rReports.lossFraction,
       
   263 									srPkt.rReports.lostPktCount,
       
   264 									srPkt.rReports.extHighSeqRecv,
       
   265 									srPkt.rReports.interArvJitter,
       
   266 									srPkt.rReports.timeStampLSR,
       
   267 									srPkt.rReports.delaySR);
       
   268 						} else {
       
   269 							rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc,
       
   270 									srPkt.ntpTs1, srPkt.ntpTs2, srPkt.rtpTs,
       
   271 									srPkt.sendersPktCount,
       
   272 									srPkt.sendersPktCount, null, null, null,
       
   273 									null, null, null, null);
       
   274 						}
       
   275 					}
       
   276 
       
   277 					/** Source Descriptions **/
       
   278 				} else if (aPkt.getClass() == RtcpPktSDES.class) {
       
   279 					RtcpPktSDES sdesPkt = (RtcpPktSDES) aPkt;
       
   280 
       
   281 					// The the participant database is updated
       
   282 					// when the SDES packet is reconstructed by CompRtcpPkt
       
   283 					if (rtpSession.rtcpAppIntf != null) {
       
   284 						rtpSession.rtcpAppIntf
       
   285 						.SDESPktReceived(sdesPkt.participants);
       
   286 					}
       
   287 
       
   288 					/** Bye Packets **/
       
   289 				} else if (aPkt.getClass() == RtcpPktBYE.class) {
       
   290 					RtcpPktBYE byePkt = (RtcpPktBYE) aPkt;
       
   291 
       
   292 					long time = System.currentTimeMillis();
       
   293 					Participant[] partArray = new Participant[byePkt.ssrcArray.length];
       
   294 
       
   295 					for (int i = 0; i < byePkt.ssrcArray.length; i++) {
       
   296 						partArray[i] = rtpSession.partDb
       
   297 						.getParticipant(byePkt.ssrcArray[i]);
       
   298 						if (partArray[i] != null)
       
   299 							partArray[i].timestampBYE = time;
       
   300 					}
       
   301 
       
   302 					if (rtpSession.rtcpAppIntf != null) {
       
   303 						rtpSession.rtcpAppIntf.BYEPktReceived(partArray,
       
   304 								new String(byePkt.reason));
       
   305 					}
       
   306 
       
   307 					/** Application specific Packets **/
       
   308 				} else if (aPkt.getClass() == RtcpPktAPP.class) {
       
   309 					RtcpPktAPP appPkt = (RtcpPktAPP) aPkt;
       
   310 
       
   311 					Participant part = findParticipant(appPkt.ssrc, packet);
       
   312 
       
   313 					if (rtpSession.rtcpAppIntf != null) {
       
   314 						rtpSession.rtcpAppIntf.APPPktReceived(part,
       
   315 								appPkt.itemCount, appPkt.pktName,
       
   316 								appPkt.pktData);
       
   317 					}
       
   318 				}
       
   319 
       
   320 			}
       
   321 		}
       
   322 		return 0;
       
   323 	}
       
   324 
       
   325 	/**
       
   326 	 * Returns a legible message when an error occurs
       
   327 	 * 
       
   328 	 * @param errorCode
       
   329 	 *            the internal error code, commonly negative of packet type
       
   330 	 * @return a string that is hopefully somewhat informative
       
   331 	 */
       
   332 	private String debugErrorString(int errorCode) {
       
   333 		String aStr = "";
       
   334 		switch (errorCode) {
       
   335 		case -1:
       
   336 			aStr = "The first packet was not of type SR or RR.";
       
   337 			break;
       
   338 		case -2:
       
   339 			aStr = "The padding bit was set for the first packet.";
       
   340 			break;
       
   341 		case -200:
       
   342 			aStr = " Error parsing Sender Report packet.";
       
   343 			break;
       
   344 		case -201:
       
   345 			aStr = " Error parsing Receiver Report packet.";
       
   346 			break;
       
   347 		case -202:
       
   348 			aStr = " Error parsing SDES packet";
       
   349 			break;
       
   350 		case -203:
       
   351 			aStr = " Error parsing BYE packet.";
       
   352 			break;
       
   353 		case -204:
       
   354 			aStr = " Error parsing Application specific packet.";
       
   355 			break;
       
   356 		case -205:
       
   357 			aStr = " Error parsing RTP Feedback packet.";
       
   358 			break;
       
   359 		case -206:
       
   360 			aStr = " Error parsing Payload-Specific Feedback packet.";
       
   361 			break;
       
   362 		default:
       
   363 			aStr = "Unknown error code " + errorCode + ".";
       
   364 		}
       
   365 
       
   366 		return aStr;
       
   367 	}
       
   368 
       
   369 	/**
       
   370 	 * Start the RTCP receiver thread.
       
   371 	 * 
       
   372 	 * It will 1) run when it receives a packet 2) parse the packet 3) call any
       
   373 	 * relevant callback functions, update database 4) block until the next one
       
   374 	 * arrives.
       
   375 	 */
       
   376 	public void run() {
       
   377 		if (RTPSession.rtcpDebugLevel > 1) {
       
   378 			if (rtpSession.mcSession) {
       
   379 				System.out
       
   380 				.println("-> RTCPReceiverThread.run() starting on MC "
       
   381 						+ rtcpSession.rtcpMCSock.getLocalPort());
       
   382 			} else {
       
   383 				System.out.println("-> RTCPReceiverThread.run() starting on "
       
   384 						+ rtcpSession.rtcpSock.getLocalPort());
       
   385 			}
       
   386 		}
       
   387 
       
   388 		while (!rtpSession.endSession) {
       
   389 
       
   390 			if (RTPSession.rtcpDebugLevel > 4) {
       
   391 				if (rtpSession.mcSession) {
       
   392 					System.out
       
   393 					.println("-> RTCPReceiverThread.run() waiting for packet on MC "
       
   394 							+ rtcpSession.rtcpMCSock.getLocalPort());
       
   395 				} else {
       
   396 					System.out
       
   397 					.println("-> RTCPReceiverThread.run() waiting for packet on "
       
   398 							+ rtcpSession.rtcpSock.getLocalPort());
       
   399 				}
       
   400 			}
       
   401 
       
   402 			// Prepare a packet
       
   403 			DatagramPacket packet = DatagramPool.getInstance().borrowPacket();
       
   404 			
       
   405 			// Wait for it to arrive
       
   406 			if (!rtpSession.mcSession) {
       
   407 				// Unicast
       
   408 				try {
       
   409 					rtcpSession.rtcpSock.receive(packet);
       
   410 				} catch (IOException e) {
       
   411 					if (!rtpSession.endSession) {
       
   412 						e.printStackTrace();
       
   413 					} else {
       
   414 						continue;
       
   415 					}
       
   416 				}
       
   417 			} else {
       
   418 				// Multicast
       
   419 				try {
       
   420 					rtcpSession.rtcpMCSock.receive(packet);
       
   421 				} catch (IOException e) {
       
   422 					if (!rtpSession.endSession) {
       
   423 						e.printStackTrace();
       
   424 					} else {
       
   425 						continue;
       
   426 					}
       
   427 				}
       
   428 			}
       
   429 
       
   430 			// Check whether this is one of our own
       
   431 			if ((rtpSession.mcSession && !packet.getSocketAddress().equals(
       
   432 					rtcpSession.rtcpMCSock))
       
   433 					|| !packet.getSocketAddress().equals(rtcpSession.rtcpSock)) {
       
   434 				// System.out.println("Packet received from: " +
       
   435 				// packet.getSocketAddress().toString());
       
   436 				parsePacket(packet);
       
   437 				// rtpSession.partDb.debugPrint();
       
   438 			}
       
   439 		}
       
   440 
       
   441 		if (RTPSession.rtcpDebugLevel > 1) {
       
   442 			System.out.println("<-> RTCPReceiverThread terminating");
       
   443 		}
       
   444 	}
       
   445 
       
   446 }