src/jlibrtp/RTCPSenderThread.java
author nikita@nikita-rack
Thu, 09 Apr 2009 20:26:58 +0200
changeset 99 8de21ac527ce
parent 13 e684f11070d5
permissions -rw-r--r--
revert pour refaire un push propre

/**
 * Java RTP Library (jlibrtp)
 * Copyright (C) 2006 Arne Kepp
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package jlibrtp;

import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.*;

/**
 * This thread sends scheduled RTCP packets 
 * 
 * It also performs maintenance of various queues and the participant
 * database.
 * 
 * @author Arne Kepp
 *
 */
public class RTCPSenderThread extends Thread {
	/** Parent RTP Session */
	private RTPSession rtpSession = null;
	/** Parent RTCP Session */
	private RTCPSession rtcpSession = null;
	
	/** Whether we have sent byes for the last conflict */
	private boolean byesSent = false;
	
	/**
	 * Constructor for new thread
	 * @param rtcpSession parent RTCP session
	 * @param rtpSession parent RTP session
	 */
	protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
		this.rtpSession = rtpSession;
		this.rtcpSession = rtcpSession;
		if(RTPSession.rtpDebugLevel > 1) {
			System.out.println("<-> RTCPSenderThread created");
		} 
	}
	
	/**
	 * Send BYE messages to all the relevant participants
	 *
	 */
	protected void sendByes() {
		// Create the packet
		CompRtcpPkt compPkt = new CompRtcpPkt();
		
		//Need a SR for validation
		RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, 
				this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
		compPkt.addPacket(srPkt);
		
		byte[] reasonBytes;
		
		//Add the actualy BYE Pkt
		long[] ssrcArray = {this.rtpSession.ssrc};
		if(rtpSession.conflict) {
			reasonBytes = "SSRC collision".getBytes();
		} else {
			reasonBytes = "jlibrtp says bye bye!".getBytes();
		}
		RtcpPktBYE byePkt = new RtcpPktBYE( ssrcArray, reasonBytes);
		
		compPkt.addPacket(byePkt);
		
		// Send it off
		if(rtpSession.mcSession) {
			mcSendCompRtcpPkt(compPkt);
		} else {
			Iterator<Participant> iter = rtpSession.partDb.getUnicastReceivers();
		
			while(iter.hasNext()) {
				Participant part = (Participant) iter.next();
				if(part.rtcpAddress != null)
					sendCompRtcpPkt(compPkt, part.rtcpAddress);
			}
			//System.out.println("SENT BYE PACKETS!!!!!");
		}
	}
	
	/**
	 * Multicast version of sending a Compound RTCP packet
	 * 
	 * @param pkt the packet to best
	 * @return 0 is successful, -1 otherwise
	 */
	protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
		byte[] pktBytes = pkt.encode();
		DatagramPacket packet;
		
		// Create datagram
		try {
			packet = new DatagramPacket(pktBytes,pktBytes.length,rtpSession.mcGroup,rtcpSession.rtcpMCSock.getPort());
		} catch (Exception e) {
			System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
			e.printStackTrace();
			return -1;
		}
		
		// Send packet
		if(RTPSession.rtcpDebugLevel > 5) {
			System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
		}
		try {
			rtcpSession.rtcpMCSock.send(packet);
			//Debug
			if(this.rtpSession.debugAppIntf != null) {
				this.rtpSession.debugAppIntf.packetSent(3, (InetSocketAddress) packet.getSocketAddress(), 
						new String("Sent multicast RTCP packet of size " + packet.getLength() + 
								" to " + packet.getSocketAddress().toString() + " via " 
								+ this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString()));
			}
		} catch (Exception e) {
			System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
			e.printStackTrace();
			return -1;
		}
		return packet.getLength();
	}
	
	/**
	 * Unicast version of sending a Compound RTCP packet
	 * 
	 * @param pkt the packet to best
	 * @param receiver the socket address of the recipient
	 * @return 0 is successful, -1 otherwise
	 */
	protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
		byte[] pktBytes = pkt.encode();
		DatagramPacket packet;
		
		//Create datagram
		try {
			//System.out.println("receiver: " + receiver);
			packet = new DatagramPacket(pktBytes,pktBytes.length,receiver);
		} catch (Exception e) {
			System.out.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
			e.printStackTrace();
			return -1;
		}
		
		//Send packet
		if(RTPSession.rtcpDebugLevel > 5) {
			Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
			String str = " ";
			while(iter.hasNext()) {
				RtcpPkt aPkt = iter.next();
				str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", ");
			}
			System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + receiver + str);
		}
		try {
			rtcpSession.rtcpSock.send(packet);
			//Debug
			if(this.rtpSession.debugAppIntf != null) {
				this.rtpSession.debugAppIntf.packetSent(2, (InetSocketAddress) packet.getSocketAddress(), 
						new String("Sent unicast RTCP packet of size " + packet.getLength() + 
								" to " + packet.getSocketAddress().toString() + " via " 
								+ this.rtcpSession.rtcpSock.getLocalSocketAddress().toString()));
			}
		} catch (Exception e) {
			System.out.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
			e.printStackTrace();
			return -1;
		}
		return packet.getLength();
	}
	
	/**
	 * Check whether we can send an immediate feedback packet to this person
	 * @param ssrc SSRC of participant
	 */
	protected void reconsiderTiming(long ssrc) {
		Participant part =  this.rtpSession.partDb.getParticipant(ssrc);
		
		if( part != null && this.rtcpSession.fbSendImmediately()) {
			CompRtcpPkt compPkt = preparePacket(part, false);
			/*********** Send the packet ***********/
			// Keep track of sent packet length for average;
			int datagramLength;
			if(rtpSession.mcSession) {
				datagramLength = this.mcSendCompRtcpPkt(compPkt);
			} else {
				//part.debugPrint();
				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
			}
			/*********** Administrative tasks ***********/			
			//Update average packet size
			if(datagramLength > 0) {
				rtcpSession.updateAvgPacket(datagramLength);
			}
		} else if(part != null 
				&& this.rtcpSession.fbAllowEarly 
				&& this.rtcpSession.fbSendEarly()) {
			
			// Make sure we dont do it too often
			this.rtcpSession.fbAllowEarly = false;
			
			CompRtcpPkt compPkt = preparePacket(part, true);
			/*********** Send the packet ***********/
			// Keep track of sent packet length for average;
			int datagramLength;
			if(rtpSession.mcSession) {
				datagramLength = this.mcSendCompRtcpPkt(compPkt);
			} else {
				//part.debugPrint();
				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
			}
			/*********** Administrative tasks ***********/			
			//Update average packet size
			if(datagramLength > 0) {
				rtcpSession.updateAvgPacket(datagramLength);
			}
			rtcpSession.calculateDelay();
		}
		
		//Out of luck, fb message will have to go with next regular packet
		//Sleep for the remaining time.
		this.rtcpSession.nextDelay -= System.currentTimeMillis() - this.rtcpSession.prevTime;
		if(this.rtcpSession.nextDelay < 0)
			this.rtcpSession.nextDelay = 0;
		
	}
	
	/** 
	 * Prepare a packet. The output depends on the participant and how the
	 * packet is scheduled.
	 * 
	 * @param part the participant to report to
	 * @param regular whether this is a regularly, or early scheduled RTCP packet
	 * @return compound RTCP packet
	 */
	protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
		/*********** Figure out what we are going to send ***********/
		// Check whether this person has sent RTP packets since the last RR.
		boolean incRR = false;
		if(part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
			incRR = true;
			part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
			part.lastRtcpRRPkt = System.currentTimeMillis();
		}
		
		// Are we sending packets? -> add SR
		boolean incSR = false;
		if(rtpSession.sentPktCount > 0 && regular) {
			incSR = true;
		}
		
		
		/*********** Actually create the packet ***********/
		// Create compound packet
		CompRtcpPkt compPkt = new CompRtcpPkt();
		
		//If we're sending packets we'll use a SR for header
		if(incSR) {
			RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, 
					this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
			compPkt.addPacket(srPkt);
			
			
			if(part.ssrc > 0) {
				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
				if(ar != null) {
					for(int i=0; i<ar.length; i++) {
						compPkt.addPacket(ar[i]);
					}
				}
			}
			
		}
		
		//If we got anything from this participant since we sent the 2nd to last RtcpPkt
		if(incRR || !incSR) {
			Participant[] partArray = {part};
			
			if(part.receivedPkts < 1)
				partArray = null;
			
			RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
			compPkt.addPacket(rrPkt);
			
			if( !incSR && part.ssrc > 0) {
				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
				if(ar != null) {
					for(int i=0; i<ar.length; i++) {
						compPkt.addPacket(ar[i]);
					}
				}
			}
		}
		
		// APP packets
		if(regular && part.ssrc > 0) {
			RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
			if(ar != null) {
				for(int i=0; i<ar.length; i++) {
					compPkt.addPacket(ar[i]);
				}
			} else {
				//Nope
			}
		}
		
		
		// For now we'll stick the SDES on every time, and only for us
		//if(regular) {
			RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
			compPkt.addPacket(sdesPkt);
		//}
		
		return compPkt;
	}
	
	/**
	 * Start the RTCP sender thread.
	 * 
	 * RFC 4585 is more complicated, but in general it will
	 * 1) Wait a precalculated amount of time
	 * 2) Determine the next RTCP recipient
	 * 3) Construct a compound packet with all the relevant information
	 * 4) Send the packet
	 * 5) Calculate next delay before going to sleep
	 */
	public void run() {
		if(RTPSession.rtcpDebugLevel > 1) {
			System.out.println("<-> RTCPSenderThread running");
		}
		
		// Give the application a chance to register some participants
		try { Thread.sleep(10); } 
		catch (Exception e) { System.out.println("RTCPSenderThread didn't get any initial rest."); }
		
		// Set up an iterator for the member list
		Enumeration<Participant> enu = null;
		Iterator<Participant> iter = null;
		
		// TODO Change to rtcpReceivers
		if(rtpSession.mcSession) {
			enu = rtpSession.partDb.getParticipants();
		} else {
			iter = rtpSession.partDb.getUnicastReceivers();
		}
		while(! rtpSession.endSession) {
			if(RTPSession.rtcpDebugLevel > 5) {
				System.out.println("<-> RTCPSenderThread sleeping for " +rtcpSession.nextDelay+" ms");
			}
			
			try { Thread.sleep(rtcpSession.nextDelay); } 
			catch (Exception e) { 
				System.out.println("RTCPSenderThread Exception message:" + e.getMessage());
				// Is the party over?
				if(this.rtpSession.endSession) {
					continue;
				}
				
				if(rtcpSession.fbWaiting != -1) {
					reconsiderTiming(rtcpSession.fbWaiting);
					continue;
				}
			}
			
			/** Came here the regular way */
			this.rtcpSession.fbAllowEarly = true;
			
				
			if(RTPSession.rtcpDebugLevel > 5) {
				System.out.println("<-> RTCPSenderThread waking up");
			}
			
			// Regenerate nextDelay, before anything happens.
			rtcpSession.calculateDelay();
			
			// We'll wait here until a conflict (if any) has been resolved,
			// so that the bye packets for our current SSRC can be sent.
			if(rtpSession.conflict) {
				if(! this.byesSent) {
					sendByes();
					this.byesSent = true;
				}
				continue;
			}
			this.byesSent = false;
						
			//Grab the next person
			Participant part = null;

			//Multicast
			if(this.rtpSession.mcSession) {
				if(! enu.hasMoreElements())
					enu = rtpSession.partDb.getParticipants();
				
				if( enu.hasMoreElements() ) {
					part = enu.nextElement();
				} else {
					continue;
				}
				
			//Unicast
			} else {
				if(! iter.hasNext()) {
					iter = rtpSession.partDb.getUnicastReceivers();
				}
				
				if(iter.hasNext() ) {
					while( iter.hasNext() && (part == null || part.rtcpAddress == null)) {
						part = iter.next();
					}
				}
				
				if(part == null || part.rtcpAddress == null)
					continue;
			}
			
			CompRtcpPkt compPkt = preparePacket(part, true);
			
			/*********** Send the packet ***********/
			// Keep track of sent packet length for average;
			int datagramLength;
			if(rtpSession.mcSession) {
				datagramLength = this.mcSendCompRtcpPkt(compPkt);
			} else {
				//part.debugPrint();
				datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
			}
			
			/*********** Administrative tasks ***********/			
			//Update average packet size
			if(datagramLength > 0) {
				rtcpSession.updateAvgPacket(datagramLength);
			}
		}

		// Be polite, say Bye to everone
		sendByes();
		try { Thread.sleep(200);} catch(Exception e) {}
		
		if(RTPSession.rtcpDebugLevel > 0) {
			System.out.println("<-> RTCPSenderThread terminating");
		}
	}
}