--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jlibrtp/RTCPSenderThread.java Fri Nov 20 19:29:42 2009 +0100
@@ -0,0 +1,496 @@
+/**
+ * Java RTP Library (jlibrtp)
+ * Copyright (C) 2006 Arne Kepp
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+package jlibrtp;
+
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * This thread sends scheduled RTCP packets
+ *
+ * It also performs maintenance of various queues and the participant database.
+ *
+ * @author Arne Kepp
+ *
+ */
+public class RTCPSenderThread extends Thread {
+ /** Parent RTP Session */
+ private RTPSession rtpSession = null;
+ /** Parent RTCP Session */
+ private RTCPSession rtcpSession = null;
+
+ /** Whether we have sent byes for the last conflict */
+ private boolean byesSent = false;
+
+ /**
+ * Constructor for new thread
+ *
+ * @param rtcpSession
+ * parent RTCP session
+ * @param rtpSession
+ * parent RTP session
+ */
+ protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
+ this.rtpSession = rtpSession;
+ this.rtcpSession = rtcpSession;
+ if (RTPSession.rtpDebugLevel > 1) {
+ System.out.println("<-> RTCPSenderThread created");
+ }
+ }
+
+ /**
+ * Send BYE messages to all the relevant participants
+ *
+ */
+ protected void sendByes() {
+ // Create the packet
+ CompRtcpPkt compPkt = new CompRtcpPkt();
+
+ // Need a SR for validation
+ RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
+ this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount,
+ null);
+ compPkt.addPacket(srPkt);
+
+ byte[] reasonBytes;
+
+ // Add the actualy BYE Pkt
+ long[] ssrcArray = { this.rtpSession.ssrc };
+ if (rtpSession.conflict) {
+ reasonBytes = "SSRC collision".getBytes();
+ } else {
+ reasonBytes = "jlibrtp says bye bye!".getBytes();
+ }
+ RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes);
+
+ compPkt.addPacket(byePkt);
+
+ // Send it off
+ if (rtpSession.mcSession) {
+ mcSendCompRtcpPkt(compPkt);
+ } else {
+ Iterator<Participant> iter = rtpSession.partDb
+ .getUnicastReceivers();
+
+ while (iter.hasNext()) {
+ Participant part = (Participant) iter.next();
+ if (part.rtcpAddress != null)
+ sendCompRtcpPkt(compPkt, part.rtcpAddress);
+ }
+ // System.out.println("SENT BYE PACKETS!!!!!");
+ }
+ }
+
+ /**
+ * Multicast version of sending a Compound RTCP packet
+ *
+ * @param pkt
+ * the packet to best
+ * @return 0 is successful, -1 otherwise
+ */
+ protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
+ byte[] pktBytes = pkt.encode();
+ DatagramPacket packet;
+
+ // Create datagram
+ try {
+ packet = new DatagramPacket(pktBytes, pktBytes.length,
+ rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort());
+ } catch (Exception e) {
+ System.out
+ .println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
+ e.printStackTrace();
+ return -1;
+ }
+
+ // Send packet
+ if (RTPSession.rtcpDebugLevel > 5) {
+ System.out
+ .println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
+ }
+ try {
+ rtcpSession.rtcpMCSock.send(packet);
+ // Debug
+ if (this.rtpSession.debugAppIntf != null) {
+ this.rtpSession.debugAppIntf.packetSent(3,
+ (InetSocketAddress) packet.getSocketAddress(),
+ new String("Sent multicast RTCP packet of size "
+ + packet.getLength()
+ + " to "
+ + packet.getSocketAddress().toString()
+ + " via "
+ + this.rtcpSession.rtcpMCSock
+ .getLocalSocketAddress().toString()));
+ }
+ } catch (Exception e) {
+ System.out
+ .println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
+ e.printStackTrace();
+ return -1;
+ }
+ return packet.getLength();
+ }
+
+ /**
+ * Unicast version of sending a Compound RTCP packet
+ *
+ * @param pkt
+ * the packet to best
+ * @param receiver
+ * the socket address of the recipient
+ * @return 0 is successful, -1 otherwise
+ */
+ protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
+ byte[] pktBytes = pkt.encode();
+ DatagramPacket packet;
+
+ // Create datagram
+ try {
+ // System.out.println("receiver: " + receiver);
+ packet = new DatagramPacket(pktBytes, pktBytes.length, receiver);
+ } catch (Exception e) {
+ System.out
+ .println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
+ e.printStackTrace();
+ return -1;
+ }
+
+ // Send packet
+ if (RTPSession.rtcpDebugLevel > 5) {
+ Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
+ String str = " ";
+ while (iter.hasNext()) {
+ RtcpPkt aPkt = iter.next();
+ str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
+ }
+ System.out
+ .println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to "
+ + receiver + str);
+ }
+ try {
+ rtcpSession.rtcpSock.send(packet);
+ // Debug
+ if (this.rtpSession.debugAppIntf != null) {
+ this.rtpSession.debugAppIntf.packetSent(2,
+ (InetSocketAddress) packet.getSocketAddress(),
+ new String("Sent unicast RTCP packet of size "
+ + packet.getLength()
+ + " to "
+ + packet.getSocketAddress().toString()
+ + " via "
+ + this.rtcpSession.rtcpSock
+ .getLocalSocketAddress().toString()));
+ }
+ } catch (Exception e) {
+ System.out
+ .println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
+ e.printStackTrace();
+ return -1;
+ }
+ return packet.getLength();
+ }
+
+ /**
+ * Check whether we can send an immediate feedback packet to this person
+ *
+ * @param ssrc
+ * SSRC of participant
+ */
+ protected void reconsiderTiming(long ssrc) {
+ Participant part = this.rtpSession.partDb.getParticipant(ssrc);
+
+ if (part != null && this.rtcpSession.fbSendImmediately()) {
+ CompRtcpPkt compPkt = preparePacket(part, false);
+ /*********** Send the packet ***********/
+ // Keep track of sent packet length for average;
+ int datagramLength;
+ if (rtpSession.mcSession) {
+ datagramLength = this.mcSendCompRtcpPkt(compPkt);
+ } else {
+ // part.debugPrint();
+ datagramLength = this
+ .sendCompRtcpPkt(compPkt, part.rtcpAddress);
+ }
+ /*********** Administrative tasks ***********/
+ // Update average packet size
+ if (datagramLength > 0) {
+ rtcpSession.updateAvgPacket(datagramLength);
+ }
+ } else if (part != null && this.rtcpSession.fbAllowEarly
+ && this.rtcpSession.fbSendEarly()) {
+
+ // Make sure we dont do it too often
+ this.rtcpSession.fbAllowEarly = false;
+
+ CompRtcpPkt compPkt = preparePacket(part, true);
+ /*********** Send the packet ***********/
+ // Keep track of sent packet length for average;
+ int datagramLength;
+ if (rtpSession.mcSession) {
+ datagramLength = this.mcSendCompRtcpPkt(compPkt);
+ } else {
+ // part.debugPrint();
+ datagramLength = this
+ .sendCompRtcpPkt(compPkt, part.rtcpAddress);
+ }
+ /*********** Administrative tasks ***********/
+ // Update average packet size
+ if (datagramLength > 0) {
+ rtcpSession.updateAvgPacket(datagramLength);
+ }
+ rtcpSession.calculateDelay();
+ }
+
+ // Out of luck, fb message will have to go with next regular packet
+ // Sleep for the remaining time.
+ this.rtcpSession.nextDelay -= System.currentTimeMillis()
+ - this.rtcpSession.prevTime;
+ if (this.rtcpSession.nextDelay < 0)
+ this.rtcpSession.nextDelay = 0;
+
+ }
+
+ /**
+ * Prepare a packet. The output depends on the participant and how the
+ * packet is scheduled.
+ *
+ * @param part
+ * the participant to report to
+ * @param regular
+ * whether this is a regularly, or early scheduled RTCP packet
+ * @return compound RTCP packet
+ */
+ protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
+ /*********** Figure out what we are going to send ***********/
+ // Check whether this person has sent RTP packets since the last RR.
+ boolean incRR = false;
+ if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
+ incRR = true;
+ part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
+ part.lastRtcpRRPkt = System.currentTimeMillis();
+ }
+
+ // Are we sending packets? -> add SR
+ boolean incSR = false;
+ if (rtpSession.sentPktCount > 0 && regular) {
+ incSR = true;
+ }
+
+ /*********** Actually create the packet ***********/
+ // Create compound packet
+ CompRtcpPkt compPkt = new CompRtcpPkt();
+
+ // If we're sending packets we'll use a SR for header
+ if (incSR) {
+ RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
+ this.rtpSession.sentPktCount,
+ this.rtpSession.sentOctetCount, null);
+ compPkt.addPacket(srPkt);
+
+ if (part.ssrc > 0) {
+ RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
+ if (ar != null) {
+ for (int i = 0; i < ar.length; i++) {
+ compPkt.addPacket(ar[i]);
+ }
+ }
+ }
+
+ }
+
+ // If we got anything from this participant since we sent the 2nd to
+ // last RtcpPkt
+ if (incRR || !incSR) {
+ Participant[] partArray = { part };
+
+ if (part.receivedPkts < 1)
+ partArray = null;
+
+ RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
+ compPkt.addPacket(rrPkt);
+
+ if (!incSR && part.ssrc > 0) {
+ RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
+ if (ar != null) {
+ for (int i = 0; i < ar.length; i++) {
+ compPkt.addPacket(ar[i]);
+ }
+ }
+ }
+ }
+
+ // APP packets
+ if (regular && part.ssrc > 0) {
+ RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
+ if (ar != null) {
+ for (int i = 0; i < ar.length; i++) {
+ compPkt.addPacket(ar[i]);
+ }
+ } else {
+ // Nope
+ }
+ }
+
+ // For now we'll stick the SDES on every time, and only for us
+ // if(regular) {
+ RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
+ compPkt.addPacket(sdesPkt);
+ // }
+
+ return compPkt;
+ }
+
+ /**
+ * Start the RTCP sender thread.
+ *
+ * RFC 4585 is more complicated, but in general it will 1) Wait a
+ * precalculated amount of time 2) Determine the next RTCP recipient 3)
+ * Construct a compound packet with all the relevant information 4) Send the
+ * packet 5) Calculate next delay before going to sleep
+ */
+ public void run() {
+ if (RTPSession.rtcpDebugLevel > 1) {
+ System.out.println("<-> RTCPSenderThread running");
+ }
+
+ // Give the application a chance to register some participants
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ System.out.println("RTCPSenderThread didn't get any initial rest.");
+ }
+
+ // Set up an iterator for the member list
+ Enumeration<Participant> enu = null;
+ Iterator<Participant> iter = null;
+
+ // TODO Change to rtcpReceivers
+ if (rtpSession.mcSession) {
+ enu = rtpSession.partDb.getParticipants();
+ } else {
+ iter = rtpSession.partDb.getUnicastReceivers();
+ }
+ while (!rtpSession.endSession) {
+ if (RTPSession.rtcpDebugLevel > 5) {
+ System.out.println("<-> RTCPSenderThread sleeping for "
+ + rtcpSession.nextDelay + " ms");
+ }
+
+ try {
+ Thread.sleep(rtcpSession.nextDelay);
+ } catch (Exception e) {
+ System.out.println("RTCPSenderThread Exception message:"
+ + e.getMessage());
+ // Is the party over?
+ if (this.rtpSession.endSession) {
+ continue;
+ }
+
+ if (rtcpSession.fbWaiting != -1) {
+ reconsiderTiming(rtcpSession.fbWaiting);
+ continue;
+ }
+ }
+
+ /** Came here the regular way */
+ this.rtcpSession.fbAllowEarly = true;
+
+ if (RTPSession.rtcpDebugLevel > 5) {
+ System.out.println("<-> RTCPSenderThread waking up");
+ }
+
+ // Regenerate nextDelay, before anything happens.
+ rtcpSession.calculateDelay();
+
+ // We'll wait here until a conflict (if any) has been resolved,
+ // so that the bye packets for our current SSRC can be sent.
+ if (rtpSession.conflict) {
+ if (!this.byesSent) {
+ sendByes();
+ this.byesSent = true;
+ }
+ continue;
+ }
+ this.byesSent = false;
+
+ // Grab the next person
+ Participant part = null;
+
+ // Multicast
+ if (this.rtpSession.mcSession) {
+ if (!enu.hasMoreElements())
+ enu = rtpSession.partDb.getParticipants();
+
+ if (enu.hasMoreElements()) {
+ part = enu.nextElement();
+ } else {
+ continue;
+ }
+
+ // Unicast
+ } else {
+ if (!iter.hasNext()) {
+ iter = rtpSession.partDb.getUnicastReceivers();
+ }
+
+ if (iter.hasNext()) {
+ while (iter.hasNext()
+ && (part == null || part.rtcpAddress == null)) {
+ part = iter.next();
+ }
+ }
+
+ if (part == null || part.rtcpAddress == null)
+ continue;
+ }
+
+ CompRtcpPkt compPkt = preparePacket(part, true);
+
+ /*********** Send the packet ***********/
+ // Keep track of sent packet length for average;
+ int datagramLength;
+ if (rtpSession.mcSession) {
+ datagramLength = this.mcSendCompRtcpPkt(compPkt);
+ } else {
+ // part.debugPrint();
+ datagramLength = this
+ .sendCompRtcpPkt(compPkt, part.rtcpAddress);
+ }
+
+ /*********** Administrative tasks ***********/
+ // Update average packet size
+ if (datagramLength > 0) {
+ rtcpSession.updateAvgPacket(datagramLength);
+ }
+ }
+
+ // Be polite, say Bye to everone
+ sendByes();
+ try {
+ Thread.sleep(200);
+ } catch (Exception e) {
+ }
+
+ if (RTPSession.rtcpDebugLevel > 0) {
+ System.out.println("<-> RTCPSenderThread terminating");
+ }
+ }
+}