src/jlibrtp/RTCPSenderThread.java
changeset 214 2bf440c54ca5
parent 213 9bdff6cbd120
child 215 5db64229be69
equal deleted inserted replaced
213:9bdff6cbd120 214:2bf440c54ca5
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.net.DatagramPacket;
       
    22 import java.net.InetSocketAddress;
       
    23 import java.util.*;
       
    24 
       
    25 /**
       
    26  * This thread sends scheduled RTCP packets 
       
    27  * 
       
    28  * It also performs maintenance of various queues and the participant
       
    29  * database.
       
    30  * 
       
    31  * @author Arne Kepp
       
    32  *
       
    33  */
       
    34 public class RTCPSenderThread extends Thread {
       
    35 	/** Parent RTP Session */
       
    36 	private RTPSession rtpSession = null;
       
    37 	/** Parent RTCP Session */
       
    38 	private RTCPSession rtcpSession = null;
       
    39 	
       
    40 	/** Whether we have sent byes for the last conflict */
       
    41 	private boolean byesSent = false;
       
    42 	
       
    43 	/**
       
    44 	 * Constructor for new thread
       
    45 	 * @param rtcpSession parent RTCP session
       
    46 	 * @param rtpSession parent RTP session
       
    47 	 */
       
    48 	protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
       
    49 		this.rtpSession = rtpSession;
       
    50 		this.rtcpSession = rtcpSession;
       
    51 		if(RTPSession.rtpDebugLevel > 1) {
       
    52 			System.out.println("<-> RTCPSenderThread created");
       
    53 		} 
       
    54 	}
       
    55 	
       
    56 	/**
       
    57 	 * Send BYE messages to all the relevant participants
       
    58 	 *
       
    59 	 */
       
    60 	protected void sendByes() {
       
    61 		// Create the packet
       
    62 		CompRtcpPkt compPkt = new CompRtcpPkt();
       
    63 		
       
    64 		//Need a SR for validation
       
    65 		RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, 
       
    66 				this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
       
    67 		compPkt.addPacket(srPkt);
       
    68 		
       
    69 		byte[] reasonBytes;
       
    70 		
       
    71 		//Add the actualy BYE Pkt
       
    72 		long[] ssrcArray = {this.rtpSession.ssrc};
       
    73 		if(rtpSession.conflict) {
       
    74 			reasonBytes = "SSRC collision".getBytes();
       
    75 		} else {
       
    76 			reasonBytes = "jlibrtp says bye bye!".getBytes();
       
    77 		}
       
    78 		RtcpPktBYE byePkt = new RtcpPktBYE( ssrcArray, reasonBytes);
       
    79 		
       
    80 		compPkt.addPacket(byePkt);
       
    81 		
       
    82 		// Send it off
       
    83 		if(rtpSession.mcSession) {
       
    84 			mcSendCompRtcpPkt(compPkt);
       
    85 		} else {
       
    86 			Iterator<Participant> iter = rtpSession.partDb.getUnicastReceivers();
       
    87 		
       
    88 			while(iter.hasNext()) {
       
    89 				Participant part = (Participant) iter.next();
       
    90 				if(part.rtcpAddress != null)
       
    91 					sendCompRtcpPkt(compPkt, part.rtcpAddress);
       
    92 			}
       
    93 			//System.out.println("SENT BYE PACKETS!!!!!");
       
    94 		}
       
    95 	}
       
    96 	
       
    97 	/**
       
    98 	 * Multicast version of sending a Compound RTCP packet
       
    99 	 * 
       
   100 	 * @param pkt the packet to best
       
   101 	 * @return 0 is successful, -1 otherwise
       
   102 	 */
       
   103 	protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
       
   104 		byte[] pktBytes = pkt.encode();
       
   105 		DatagramPacket packet;
       
   106 		
       
   107 		// Create datagram
       
   108 		try {
       
   109 			packet = new DatagramPacket(pktBytes,pktBytes.length,rtpSession.mcGroup,rtcpSession.rtcpMCSock.getPort());
       
   110 		} catch (Exception e) {
       
   111 			System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
       
   112 			e.printStackTrace();
       
   113 			return -1;
       
   114 		}
       
   115 		
       
   116 		// Send packet
       
   117 		if(RTPSession.rtcpDebugLevel > 5) {
       
   118 			System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
       
   119 		}
       
   120 		try {
       
   121 			rtcpSession.rtcpMCSock.send(packet);
       
   122 			//Debug
       
   123 			if(this.rtpSession.debugAppIntf != null) {
       
   124 				this.rtpSession.debugAppIntf.packetSent(3, (InetSocketAddress) packet.getSocketAddress(), 
       
   125 						new String("Sent multicast RTCP packet of size " + packet.getLength() + 
       
   126 								" to " + packet.getSocketAddress().toString() + " via " 
       
   127 								+ this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString()));
       
   128 			}
       
   129 		} catch (Exception e) {
       
   130 			System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
       
   131 			e.printStackTrace();
       
   132 			return -1;
       
   133 		}
       
   134 		return packet.getLength();
       
   135 	}
       
   136 	
       
   137 	/**
       
   138 	 * Unicast version of sending a Compound RTCP packet
       
   139 	 * 
       
   140 	 * @param pkt the packet to best
       
   141 	 * @param receiver the socket address of the recipient
       
   142 	 * @return 0 is successful, -1 otherwise
       
   143 	 */
       
   144 	protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
       
   145 		byte[] pktBytes = pkt.encode();
       
   146 		DatagramPacket packet;
       
   147 		
       
   148 		//Create datagram
       
   149 		try {
       
   150 			//System.out.println("receiver: " + receiver);
       
   151 			packet = new DatagramPacket(pktBytes,pktBytes.length,receiver);
       
   152 		} catch (Exception e) {
       
   153 			System.out.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
       
   154 			e.printStackTrace();
       
   155 			return -1;
       
   156 		}
       
   157 		
       
   158 		//Send packet
       
   159 		if(RTPSession.rtcpDebugLevel > 5) {
       
   160 			Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
       
   161 			String str = " ";
       
   162 			while(iter.hasNext()) {
       
   163 				RtcpPkt aPkt = iter.next();
       
   164 				str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", ");
       
   165 			}
       
   166 			System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + receiver + str);
       
   167 		}
       
   168 		try {
       
   169 			rtcpSession.rtcpSock.send(packet);
       
   170 			//Debug
       
   171 			if(this.rtpSession.debugAppIntf != null) {
       
   172 				this.rtpSession.debugAppIntf.packetSent(2, (InetSocketAddress) packet.getSocketAddress(), 
       
   173 						new String("Sent unicast RTCP packet of size " + packet.getLength() + 
       
   174 								" to " + packet.getSocketAddress().toString() + " via " 
       
   175 								+ this.rtcpSession.rtcpSock.getLocalSocketAddress().toString()));
       
   176 			}
       
   177 		} catch (Exception e) {
       
   178 			System.out.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
       
   179 			e.printStackTrace();
       
   180 			return -1;
       
   181 		}
       
   182 		return packet.getLength();
       
   183 	}
       
   184 	
       
   185 	/**
       
   186 	 * Check whether we can send an immediate feedback packet to this person
       
   187 	 * @param ssrc SSRC of participant
       
   188 	 */
       
   189 	protected void reconsiderTiming(long ssrc) {
       
   190 		Participant part =  this.rtpSession.partDb.getParticipant(ssrc);
       
   191 		
       
   192 		if( part != null && this.rtcpSession.fbSendImmediately()) {
       
   193 			CompRtcpPkt compPkt = preparePacket(part, false);
       
   194 			/*********** Send the packet ***********/
       
   195 			// Keep track of sent packet length for average;
       
   196 			int datagramLength;
       
   197 			if(rtpSession.mcSession) {
       
   198 				datagramLength = this.mcSendCompRtcpPkt(compPkt);
       
   199 			} else {
       
   200 				//part.debugPrint();
       
   201 				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
       
   202 			}
       
   203 			/*********** Administrative tasks ***********/			
       
   204 			//Update average packet size
       
   205 			if(datagramLength > 0) {
       
   206 				rtcpSession.updateAvgPacket(datagramLength);
       
   207 			}
       
   208 		} else if(part != null 
       
   209 				&& this.rtcpSession.fbAllowEarly 
       
   210 				&& this.rtcpSession.fbSendEarly()) {
       
   211 			
       
   212 			// Make sure we dont do it too often
       
   213 			this.rtcpSession.fbAllowEarly = false;
       
   214 			
       
   215 			CompRtcpPkt compPkt = preparePacket(part, true);
       
   216 			/*********** Send the packet ***********/
       
   217 			// Keep track of sent packet length for average;
       
   218 			int datagramLength;
       
   219 			if(rtpSession.mcSession) {
       
   220 				datagramLength = this.mcSendCompRtcpPkt(compPkt);
       
   221 			} else {
       
   222 				//part.debugPrint();
       
   223 				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
       
   224 			}
       
   225 			/*********** Administrative tasks ***********/			
       
   226 			//Update average packet size
       
   227 			if(datagramLength > 0) {
       
   228 				rtcpSession.updateAvgPacket(datagramLength);
       
   229 			}
       
   230 			rtcpSession.calculateDelay();
       
   231 		}
       
   232 		
       
   233 		//Out of luck, fb message will have to go with next regular packet
       
   234 		//Sleep for the remaining time.
       
   235 		this.rtcpSession.nextDelay -= System.currentTimeMillis() - this.rtcpSession.prevTime;
       
   236 		if(this.rtcpSession.nextDelay < 0)
       
   237 			this.rtcpSession.nextDelay = 0;
       
   238 		
       
   239 	}
       
   240 	
       
   241 	/** 
       
   242 	 * Prepare a packet. The output depends on the participant and how the
       
   243 	 * packet is scheduled.
       
   244 	 * 
       
   245 	 * @param part the participant to report to
       
   246 	 * @param regular whether this is a regularly, or early scheduled RTCP packet
       
   247 	 * @return compound RTCP packet
       
   248 	 */
       
   249 	protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
       
   250 		/*********** Figure out what we are going to send ***********/
       
   251 		// Check whether this person has sent RTP packets since the last RR.
       
   252 		boolean incRR = false;
       
   253 		if(part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
       
   254 			incRR = true;
       
   255 			part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
       
   256 			part.lastRtcpRRPkt = System.currentTimeMillis();
       
   257 		}
       
   258 		
       
   259 		// Are we sending packets? -> add SR
       
   260 		boolean incSR = false;
       
   261 		if(rtpSession.sentPktCount > 0 && regular) {
       
   262 			incSR = true;
       
   263 		}
       
   264 		
       
   265 		
       
   266 		/*********** Actually create the packet ***********/
       
   267 		// Create compound packet
       
   268 		CompRtcpPkt compPkt = new CompRtcpPkt();
       
   269 		
       
   270 		//If we're sending packets we'll use a SR for header
       
   271 		if(incSR) {
       
   272 			RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, 
       
   273 					this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
       
   274 			compPkt.addPacket(srPkt);
       
   275 			
       
   276 			
       
   277 			if(part.ssrc > 0) {
       
   278 				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
       
   279 				if(ar != null) {
       
   280 					for(int i=0; i<ar.length; i++) {
       
   281 						compPkt.addPacket(ar[i]);
       
   282 					}
       
   283 				}
       
   284 			}
       
   285 			
       
   286 		}
       
   287 		
       
   288 		//If we got anything from this participant since we sent the 2nd to last RtcpPkt
       
   289 		if(incRR || !incSR) {
       
   290 			Participant[] partArray = {part};
       
   291 			
       
   292 			if(part.receivedPkts < 1)
       
   293 				partArray = null;
       
   294 			
       
   295 			RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
       
   296 			compPkt.addPacket(rrPkt);
       
   297 			
       
   298 			if( !incSR && part.ssrc > 0) {
       
   299 				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
       
   300 				if(ar != null) {
       
   301 					for(int i=0; i<ar.length; i++) {
       
   302 						compPkt.addPacket(ar[i]);
       
   303 					}
       
   304 				}
       
   305 			}
       
   306 		}
       
   307 		
       
   308 		// APP packets
       
   309 		if(regular && part.ssrc > 0) {
       
   310 			RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
       
   311 			if(ar != null) {
       
   312 				for(int i=0; i<ar.length; i++) {
       
   313 					compPkt.addPacket(ar[i]);
       
   314 				}
       
   315 			} else {
       
   316 				//Nope
       
   317 			}
       
   318 		}
       
   319 		
       
   320 		
       
   321 		// For now we'll stick the SDES on every time, and only for us
       
   322 		//if(regular) {
       
   323 			RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
       
   324 			compPkt.addPacket(sdesPkt);
       
   325 		//}
       
   326 		
       
   327 		return compPkt;
       
   328 	}
       
   329 	
       
   330 	/**
       
   331 	 * Start the RTCP sender thread.
       
   332 	 * 
       
   333 	 * RFC 4585 is more complicated, but in general it will
       
   334 	 * 1) Wait a precalculated amount of time
       
   335 	 * 2) Determine the next RTCP recipient
       
   336 	 * 3) Construct a compound packet with all the relevant information
       
   337 	 * 4) Send the packet
       
   338 	 * 5) Calculate next delay before going to sleep
       
   339 	 */
       
   340 	public void run() {
       
   341 		if(RTPSession.rtcpDebugLevel > 1) {
       
   342 			System.out.println("<-> RTCPSenderThread running");
       
   343 		}
       
   344 		
       
   345 		// Give the application a chance to register some participants
       
   346 		try { Thread.sleep(10); } 
       
   347 		catch (Exception e) { System.out.println("RTCPSenderThread didn't get any initial rest."); }
       
   348 		
       
   349 		// Set up an iterator for the member list
       
   350 		Enumeration<Participant> enu = null;
       
   351 		Iterator<Participant> iter = null;
       
   352 		
       
   353 		// TODO Change to rtcpReceivers
       
   354 		if(rtpSession.mcSession) {
       
   355 			enu = rtpSession.partDb.getParticipants();
       
   356 		} else {
       
   357 			iter = rtpSession.partDb.getUnicastReceivers();
       
   358 		}
       
   359 		while(! rtpSession.endSession) {
       
   360 			if(RTPSession.rtcpDebugLevel > 5) {
       
   361 				System.out.println("<-> RTCPSenderThread sleeping for " +rtcpSession.nextDelay+" ms");
       
   362 			}
       
   363 			
       
   364 			try { Thread.sleep(rtcpSession.nextDelay); } 
       
   365 			catch (Exception e) { 
       
   366 				System.out.println("RTCPSenderThread Exception message:" + e.getMessage());
       
   367 				// Is the party over?
       
   368 				if(this.rtpSession.endSession) {
       
   369 					continue;
       
   370 				}
       
   371 				
       
   372 				if(rtcpSession.fbWaiting != -1) {
       
   373 					reconsiderTiming(rtcpSession.fbWaiting);
       
   374 					continue;
       
   375 				}
       
   376 			}
       
   377 			
       
   378 			/** Came here the regular way */
       
   379 			this.rtcpSession.fbAllowEarly = true;
       
   380 			
       
   381 				
       
   382 			if(RTPSession.rtcpDebugLevel > 5) {
       
   383 				System.out.println("<-> RTCPSenderThread waking up");
       
   384 			}
       
   385 			
       
   386 			// Regenerate nextDelay, before anything happens.
       
   387 			rtcpSession.calculateDelay();
       
   388 			
       
   389 			// We'll wait here until a conflict (if any) has been resolved,
       
   390 			// so that the bye packets for our current SSRC can be sent.
       
   391 			if(rtpSession.conflict) {
       
   392 				if(! this.byesSent) {
       
   393 					sendByes();
       
   394 					this.byesSent = true;
       
   395 				}
       
   396 				continue;
       
   397 			}
       
   398 			this.byesSent = false;
       
   399 						
       
   400 			//Grab the next person
       
   401 			Participant part = null;
       
   402 
       
   403 			//Multicast
       
   404 			if(this.rtpSession.mcSession) {
       
   405 				if(! enu.hasMoreElements())
       
   406 					enu = rtpSession.partDb.getParticipants();
       
   407 				
       
   408 				if( enu.hasMoreElements() ) {
       
   409 					part = enu.nextElement();
       
   410 				} else {
       
   411 					continue;
       
   412 				}
       
   413 				
       
   414 			//Unicast
       
   415 			} else {
       
   416 				if(! iter.hasNext()) {
       
   417 					iter = rtpSession.partDb.getUnicastReceivers();
       
   418 				}
       
   419 				
       
   420 				if(iter.hasNext() ) {
       
   421 					while( iter.hasNext() && (part == null || part.rtcpAddress == null)) {
       
   422 						part = iter.next();
       
   423 					}
       
   424 				}
       
   425 				
       
   426 				if(part == null || part.rtcpAddress == null)
       
   427 					continue;
       
   428 			}
       
   429 			
       
   430 			CompRtcpPkt compPkt = preparePacket(part, true);
       
   431 			
       
   432 			/*********** Send the packet ***********/
       
   433 			// Keep track of sent packet length for average;
       
   434 			int datagramLength;
       
   435 			if(rtpSession.mcSession) {
       
   436 				datagramLength = this.mcSendCompRtcpPkt(compPkt);
       
   437 			} else {
       
   438 				//part.debugPrint();
       
   439 				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
       
   440 			}
       
   441 			
       
   442 			/*********** Administrative tasks ***********/			
       
   443 			//Update average packet size
       
   444 			if(datagramLength > 0) {
       
   445 				rtcpSession.updateAvgPacket(datagramLength);
       
   446 			}
       
   447 		}
       
   448 
       
   449 		// Be polite, say Bye to everone
       
   450 		sendByes();
       
   451 		try { Thread.sleep(200);} catch(Exception e) {}
       
   452 		
       
   453 		if(RTPSession.rtcpDebugLevel > 0) {
       
   454 			System.out.println("<-> RTCPSenderThread terminating");
       
   455 		}
       
   456 	}
       
   457 }