src/jlibrtp/RTCPSenderThread.java
changeset 834 e8d6255306f8
parent 833 f5a5d9237d69
child 835 4e40f3481f23
--- a/src/jlibrtp/RTCPSenderThread.java	Sat Jan 23 21:48:58 2010 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,496 +0,0 @@
-/**
- * Java RTP Library (jlibrtp)
- * Copyright (C) 2006 Arne Kepp
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
- */
-package jlibrtp;
-
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.util.Enumeration;
-import java.util.Iterator;
-
-/**
- * This thread sends scheduled RTCP packets
- * 
- * It also performs maintenance of various queues and the participant database.
- * 
- * @author Arne Kepp
- * 
- */
-public class RTCPSenderThread extends Thread {
-	/** Parent RTP Session */
-	private RTPSession rtpSession = null;
-	/** Parent RTCP Session */
-	private RTCPSession rtcpSession = null;
-
-	/** Whether we have sent byes for the last conflict */
-	private boolean byesSent = false;
-
-	/**
-	 * Constructor for new thread
-	 * 
-	 * @param rtcpSession
-	 *            parent RTCP session
-	 * @param rtpSession
-	 *            parent RTP session
-	 */
-	protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
-		this.rtpSession = rtpSession;
-		this.rtcpSession = rtcpSession;
-		if (RTPSession.rtpDebugLevel > 1) {
-			System.out.println("<-> RTCPSenderThread created");
-		}
-	}
-
-	/**
-	 * Send BYE messages to all the relevant participants
-	 * 
-	 */
-	protected void sendByes() {
-		// Create the packet
-		CompRtcpPkt compPkt = new CompRtcpPkt();
-
-		// Need a SR for validation
-		RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
-				this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount,
-				null);
-		compPkt.addPacket(srPkt);
-
-		byte[] reasonBytes;
-
-		// Add the actualy BYE Pkt
-		long[] ssrcArray = { this.rtpSession.ssrc };
-		if (rtpSession.conflict) {
-			reasonBytes = "SSRC collision".getBytes();
-		} else {
-			reasonBytes = "jlibrtp says bye bye!".getBytes();
-		}
-		RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes);
-
-		compPkt.addPacket(byePkt);
-
-		// Send it off
-		if (rtpSession.mcSession) {
-			mcSendCompRtcpPkt(compPkt);
-		} else {
-			Iterator<Participant> iter = rtpSession.partDb
-					.getUnicastReceivers();
-
-			while (iter.hasNext()) {
-				Participant part = (Participant) iter.next();
-				if (part.rtcpAddress != null)
-					sendCompRtcpPkt(compPkt, part.rtcpAddress);
-			}
-			// System.out.println("SENT BYE PACKETS!!!!!");
-		}
-	}
-
-	/**
-	 * Multicast version of sending a Compound RTCP packet
-	 * 
-	 * @param pkt
-	 *            the packet to best
-	 * @return 0 is successful, -1 otherwise
-	 */
-	protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
-		byte[] pktBytes = pkt.encode();
-		DatagramPacket packet;
-
-		// Create datagram
-		try {
-			packet = new DatagramPacket(pktBytes, pktBytes.length,
-					rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort());
-		} catch (Exception e) {
-			System.out
-					.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
-			e.printStackTrace();
-			return -1;
-		}
-
-		// Send packet
-		if (RTPSession.rtcpDebugLevel > 5) {
-			System.out
-					.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
-		}
-		try {
-			rtcpSession.rtcpMCSock.send(packet);
-			// Debug
-			if (this.rtpSession.debugAppIntf != null) {
-				this.rtpSession.debugAppIntf.packetSent(3,
-						(InetSocketAddress) packet.getSocketAddress(),
-						new String("Sent multicast RTCP packet of size "
-								+ packet.getLength()
-								+ " to "
-								+ packet.getSocketAddress().toString()
-								+ " via "
-								+ this.rtcpSession.rtcpMCSock
-										.getLocalSocketAddress().toString()));
-			}
-		} catch (Exception e) {
-			System.out
-					.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
-			e.printStackTrace();
-			return -1;
-		}
-		return packet.getLength();
-	}
-
-	/**
-	 * Unicast version of sending a Compound RTCP packet
-	 * 
-	 * @param pkt
-	 *            the packet to best
-	 * @param receiver
-	 *            the socket address of the recipient
-	 * @return 0 is successful, -1 otherwise
-	 */
-	protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
-		byte[] pktBytes = pkt.encode();
-		DatagramPacket packet;
-
-		// Create datagram
-		try {
-			// System.out.println("receiver: " + receiver);
-			packet = new DatagramPacket(pktBytes, pktBytes.length, receiver);
-		} catch (Exception e) {
-			System.out
-					.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
-			e.printStackTrace();
-			return -1;
-		}
-
-		// Send packet
-		if (RTPSession.rtcpDebugLevel > 5) {
-			Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
-			String str = " ";
-			while (iter.hasNext()) {
-				RtcpPkt aPkt = iter.next();
-				str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
-			}
-			System.out
-					.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to "
-							+ receiver + str);
-		}
-		try {
-			rtcpSession.rtcpSock.send(packet);
-			// Debug
-			if (this.rtpSession.debugAppIntf != null) {
-				this.rtpSession.debugAppIntf.packetSent(2,
-						(InetSocketAddress) packet.getSocketAddress(),
-						new String("Sent unicast RTCP packet of size "
-								+ packet.getLength()
-								+ " to "
-								+ packet.getSocketAddress().toString()
-								+ " via "
-								+ this.rtcpSession.rtcpSock
-										.getLocalSocketAddress().toString()));
-			}
-		} catch (Exception e) {
-			System.out
-					.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
-			e.printStackTrace();
-			return -1;
-		}
-		return packet.getLength();
-	}
-
-	/**
-	 * Check whether we can send an immediate feedback packet to this person
-	 * 
-	 * @param ssrc
-	 *            SSRC of participant
-	 */
-	protected void reconsiderTiming(long ssrc) {
-		Participant part = this.rtpSession.partDb.getParticipant(ssrc);
-
-		if (part != null && this.rtcpSession.fbSendImmediately()) {
-			CompRtcpPkt compPkt = preparePacket(part, false);
-			/*********** Send the packet ***********/
-			// Keep track of sent packet length for average;
-			int datagramLength;
-			if (rtpSession.mcSession) {
-				datagramLength = this.mcSendCompRtcpPkt(compPkt);
-			} else {
-				// part.debugPrint();
-				datagramLength = this
-						.sendCompRtcpPkt(compPkt, part.rtcpAddress);
-			}
-			/*********** Administrative tasks ***********/
-			// Update average packet size
-			if (datagramLength > 0) {
-				rtcpSession.updateAvgPacket(datagramLength);
-			}
-		} else if (part != null && this.rtcpSession.fbAllowEarly
-				&& this.rtcpSession.fbSendEarly()) {
-
-			// Make sure we dont do it too often
-			this.rtcpSession.fbAllowEarly = false;
-
-			CompRtcpPkt compPkt = preparePacket(part, true);
-			/*********** Send the packet ***********/
-			// Keep track of sent packet length for average;
-			int datagramLength;
-			if (rtpSession.mcSession) {
-				datagramLength = this.mcSendCompRtcpPkt(compPkt);
-			} else {
-				// part.debugPrint();
-				datagramLength = this
-						.sendCompRtcpPkt(compPkt, part.rtcpAddress);
-			}
-			/*********** Administrative tasks ***********/
-			// Update average packet size
-			if (datagramLength > 0) {
-				rtcpSession.updateAvgPacket(datagramLength);
-			}
-			rtcpSession.calculateDelay();
-		}
-
-		// Out of luck, fb message will have to go with next regular packet
-		// Sleep for the remaining time.
-		this.rtcpSession.nextDelay -= System.currentTimeMillis()
-				- this.rtcpSession.prevTime;
-		if (this.rtcpSession.nextDelay < 0)
-			this.rtcpSession.nextDelay = 0;
-
-	}
-
-	/**
-	 * Prepare a packet. The output depends on the participant and how the
-	 * packet is scheduled.
-	 * 
-	 * @param part
-	 *            the participant to report to
-	 * @param regular
-	 *            whether this is a regularly, or early scheduled RTCP packet
-	 * @return compound RTCP packet
-	 */
-	protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
-		/*********** Figure out what we are going to send ***********/
-		// Check whether this person has sent RTP packets since the last RR.
-		boolean incRR = false;
-		if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
-			incRR = true;
-			part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
-			part.lastRtcpRRPkt = System.currentTimeMillis();
-		}
-
-		// Are we sending packets? -> add SR
-		boolean incSR = false;
-		if (rtpSession.sentPktCount > 0 && regular) {
-			incSR = true;
-		}
-
-		/*********** Actually create the packet ***********/
-		// Create compound packet
-		CompRtcpPkt compPkt = new CompRtcpPkt();
-
-		// If we're sending packets we'll use a SR for header
-		if (incSR) {
-			RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
-					this.rtpSession.sentPktCount,
-					this.rtpSession.sentOctetCount, null);
-			compPkt.addPacket(srPkt);
-
-			if (part.ssrc > 0) {
-				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
-				if (ar != null) {
-					for (int i = 0; i < ar.length; i++) {
-						compPkt.addPacket(ar[i]);
-					}
-				}
-			}
-
-		}
-
-		// If we got anything from this participant since we sent the 2nd to
-		// last RtcpPkt
-		if (incRR || !incSR) {
-			Participant[] partArray = { part };
-
-			if (part.receivedPkts < 1)
-				partArray = null;
-
-			RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
-			compPkt.addPacket(rrPkt);
-
-			if (!incSR && part.ssrc > 0) {
-				RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
-				if (ar != null) {
-					for (int i = 0; i < ar.length; i++) {
-						compPkt.addPacket(ar[i]);
-					}
-				}
-			}
-		}
-
-		// APP packets
-		if (regular && part.ssrc > 0) {
-			RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
-			if (ar != null) {
-				for (int i = 0; i < ar.length; i++) {
-					compPkt.addPacket(ar[i]);
-				}
-			} else {
-				// Nope
-			}
-		}
-
-		// For now we'll stick the SDES on every time, and only for us
-		// if(regular) {
-		RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
-		compPkt.addPacket(sdesPkt);
-		// }
-
-		return compPkt;
-	}
-
-	/**
-	 * Start the RTCP sender thread.
-	 * 
-	 * RFC 4585 is more complicated, but in general it will 1) Wait a
-	 * precalculated amount of time 2) Determine the next RTCP recipient 3)
-	 * Construct a compound packet with all the relevant information 4) Send the
-	 * packet 5) Calculate next delay before going to sleep
-	 */
-	public void run() {
-		if (RTPSession.rtcpDebugLevel > 1) {
-			System.out.println("<-> RTCPSenderThread running");
-		}
-
-		// Give the application a chance to register some participants
-		try {
-			Thread.sleep(10);
-		} catch (Exception e) {
-			System.out.println("RTCPSenderThread didn't get any initial rest.");
-		}
-
-		// Set up an iterator for the member list
-		Enumeration<Participant> enu = null;
-		Iterator<Participant> iter = null;
-
-		// TODO Change to rtcpReceivers
-		if (rtpSession.mcSession) {
-			enu = rtpSession.partDb.getParticipants();
-		} else {
-			iter = rtpSession.partDb.getUnicastReceivers();
-		}
-		while (!rtpSession.endSession) {
-			if (RTPSession.rtcpDebugLevel > 5) {
-				System.out.println("<-> RTCPSenderThread sleeping for "
-						+ rtcpSession.nextDelay + " ms");
-			}
-
-			try {
-				Thread.sleep(rtcpSession.nextDelay);
-			} catch (Exception e) {
-				System.out.println("RTCPSenderThread Exception message:"
-						+ e.getMessage());
-				// Is the party over?
-				if (this.rtpSession.endSession) {
-					continue;
-				}
-
-				if (rtcpSession.fbWaiting != -1) {
-					reconsiderTiming(rtcpSession.fbWaiting);
-					continue;
-				}
-			}
-
-			/** Came here the regular way */
-			this.rtcpSession.fbAllowEarly = true;
-
-			if (RTPSession.rtcpDebugLevel > 5) {
-				System.out.println("<-> RTCPSenderThread waking up");
-			}
-
-			// Regenerate nextDelay, before anything happens.
-			rtcpSession.calculateDelay();
-
-			// We'll wait here until a conflict (if any) has been resolved,
-			// so that the bye packets for our current SSRC can be sent.
-			if (rtpSession.conflict) {
-				if (!this.byesSent) {
-					sendByes();
-					this.byesSent = true;
-				}
-				continue;
-			}
-			this.byesSent = false;
-
-			// Grab the next person
-			Participant part = null;
-
-			// Multicast
-			if (this.rtpSession.mcSession) {
-				if (!enu.hasMoreElements())
-					enu = rtpSession.partDb.getParticipants();
-
-				if (enu.hasMoreElements()) {
-					part = enu.nextElement();
-				} else {
-					continue;
-				}
-
-				// Unicast
-			} else {
-				if (!iter.hasNext()) {
-					iter = rtpSession.partDb.getUnicastReceivers();
-				}
-
-				if (iter.hasNext()) {
-					while (iter.hasNext()
-							&& (part == null || part.rtcpAddress == null)) {
-						part = iter.next();
-					}
-				}
-
-				if (part == null || part.rtcpAddress == null)
-					continue;
-			}
-
-			CompRtcpPkt compPkt = preparePacket(part, true);
-
-			/*********** Send the packet ***********/
-			// Keep track of sent packet length for average;
-			int datagramLength;
-			if (rtpSession.mcSession) {
-				datagramLength = this.mcSendCompRtcpPkt(compPkt);
-			} else {
-				// part.debugPrint();
-				datagramLength = this
-						.sendCompRtcpPkt(compPkt, part.rtcpAddress);
-			}
-
-			/*********** Administrative tasks ***********/
-			// Update average packet size
-			if (datagramLength > 0) {
-				rtcpSession.updateAvgPacket(datagramLength);
-			}
-		}
-
-		// Be polite, say Bye to everone
-		sendByes();
-		try {
-			Thread.sleep(200);
-		} catch (Exception e) {
-		}
-
-		if (RTPSession.rtcpDebugLevel > 0) {
-			System.out.println("<-> RTCPSenderThread terminating");
-		}
-	}
-}