--- a/src/jlibrtp/RTCPSenderThread.java Sat Jan 23 21:48:58 2010 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,496 +0,0 @@
-/**
- * Java RTP Library (jlibrtp)
- * Copyright (C) 2006 Arne Kepp
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
-package jlibrtp;
-
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.util.Enumeration;
-import java.util.Iterator;
-
-/**
- * This thread sends scheduled RTCP packets
- *
- * It also performs maintenance of various queues and the participant database.
- *
- * @author Arne Kepp
- *
- */
-public class RTCPSenderThread extends Thread {
- /** Parent RTP Session */
- private RTPSession rtpSession = null;
- /** Parent RTCP Session */
- private RTCPSession rtcpSession = null;
-
- /** Whether we have sent byes for the last conflict */
- private boolean byesSent = false;
-
- /**
- * Constructor for new thread
- *
- * @param rtcpSession
- * parent RTCP session
- * @param rtpSession
- * parent RTP session
- */
- protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
- this.rtpSession = rtpSession;
- this.rtcpSession = rtcpSession;
- if (RTPSession.rtpDebugLevel > 1) {
- System.out.println("<-> RTCPSenderThread created");
- }
- }
-
- /**
- * Send BYE messages to all the relevant participants
- *
- */
- protected void sendByes() {
- // Create the packet
- CompRtcpPkt compPkt = new CompRtcpPkt();
-
- // Need a SR for validation
- RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
- this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount,
- null);
- compPkt.addPacket(srPkt);
-
- byte[] reasonBytes;
-
- // Add the actualy BYE Pkt
- long[] ssrcArray = { this.rtpSession.ssrc };
- if (rtpSession.conflict) {
- reasonBytes = "SSRC collision".getBytes();
- } else {
- reasonBytes = "jlibrtp says bye bye!".getBytes();
- }
- RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes);
-
- compPkt.addPacket(byePkt);
-
- // Send it off
- if (rtpSession.mcSession) {
- mcSendCompRtcpPkt(compPkt);
- } else {
- Iterator<Participant> iter = rtpSession.partDb
- .getUnicastReceivers();
-
- while (iter.hasNext()) {
- Participant part = (Participant) iter.next();
- if (part.rtcpAddress != null)
- sendCompRtcpPkt(compPkt, part.rtcpAddress);
- }
- // System.out.println("SENT BYE PACKETS!!!!!");
- }
- }
-
- /**
- * Multicast version of sending a Compound RTCP packet
- *
- * @param pkt
- * the packet to best
- * @return 0 is successful, -1 otherwise
- */
- protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
- byte[] pktBytes = pkt.encode();
- DatagramPacket packet;
-
- // Create datagram
- try {
- packet = new DatagramPacket(pktBytes, pktBytes.length,
- rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort());
- } catch (Exception e) {
- System.out
- .println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
- e.printStackTrace();
- return -1;
- }
-
- // Send packet
- if (RTPSession.rtcpDebugLevel > 5) {
- System.out
- .println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
- }
- try {
- rtcpSession.rtcpMCSock.send(packet);
- // Debug
- if (this.rtpSession.debugAppIntf != null) {
- this.rtpSession.debugAppIntf.packetSent(3,
- (InetSocketAddress) packet.getSocketAddress(),
- new String("Sent multicast RTCP packet of size "
- + packet.getLength()
- + " to "
- + packet.getSocketAddress().toString()
- + " via "
- + this.rtcpSession.rtcpMCSock
- .getLocalSocketAddress().toString()));
- }
- } catch (Exception e) {
- System.out
- .println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
- e.printStackTrace();
- return -1;
- }
- return packet.getLength();
- }
-
- /**
- * Unicast version of sending a Compound RTCP packet
- *
- * @param pkt
- * the packet to best
- * @param receiver
- * the socket address of the recipient
- * @return 0 is successful, -1 otherwise
- */
- protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
- byte[] pktBytes = pkt.encode();
- DatagramPacket packet;
-
- // Create datagram
- try {
- // System.out.println("receiver: " + receiver);
- packet = new DatagramPacket(pktBytes, pktBytes.length, receiver);
- } catch (Exception e) {
- System.out
- .println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
- e.printStackTrace();
- return -1;
- }
-
- // Send packet
- if (RTPSession.rtcpDebugLevel > 5) {
- Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
- String str = " ";
- while (iter.hasNext()) {
- RtcpPkt aPkt = iter.next();
- str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
- }
- System.out
- .println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to "
- + receiver + str);
- }
- try {
- rtcpSession.rtcpSock.send(packet);
- // Debug
- if (this.rtpSession.debugAppIntf != null) {
- this.rtpSession.debugAppIntf.packetSent(2,
- (InetSocketAddress) packet.getSocketAddress(),
- new String("Sent unicast RTCP packet of size "
- + packet.getLength()
- + " to "
- + packet.getSocketAddress().toString()
- + " via "
- + this.rtcpSession.rtcpSock
- .getLocalSocketAddress().toString()));
- }
- } catch (Exception e) {
- System.out
- .println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
- e.printStackTrace();
- return -1;
- }
- return packet.getLength();
- }
-
- /**
- * Check whether we can send an immediate feedback packet to this person
- *
- * @param ssrc
- * SSRC of participant
- */
- protected void reconsiderTiming(long ssrc) {
- Participant part = this.rtpSession.partDb.getParticipant(ssrc);
-
- if (part != null && this.rtcpSession.fbSendImmediately()) {
- CompRtcpPkt compPkt = preparePacket(part, false);
- /*********** Send the packet ***********/
- // Keep track of sent packet length for average;
- int datagramLength;
- if (rtpSession.mcSession) {
- datagramLength = this.mcSendCompRtcpPkt(compPkt);
- } else {
- // part.debugPrint();
- datagramLength = this
- .sendCompRtcpPkt(compPkt, part.rtcpAddress);
- }
- /*********** Administrative tasks ***********/
- // Update average packet size
- if (datagramLength > 0) {
- rtcpSession.updateAvgPacket(datagramLength);
- }
- } else if (part != null && this.rtcpSession.fbAllowEarly
- && this.rtcpSession.fbSendEarly()) {
-
- // Make sure we dont do it too often
- this.rtcpSession.fbAllowEarly = false;
-
- CompRtcpPkt compPkt = preparePacket(part, true);
- /*********** Send the packet ***********/
- // Keep track of sent packet length for average;
- int datagramLength;
- if (rtpSession.mcSession) {
- datagramLength = this.mcSendCompRtcpPkt(compPkt);
- } else {
- // part.debugPrint();
- datagramLength = this
- .sendCompRtcpPkt(compPkt, part.rtcpAddress);
- }
- /*********** Administrative tasks ***********/
- // Update average packet size
- if (datagramLength > 0) {
- rtcpSession.updateAvgPacket(datagramLength);
- }
- rtcpSession.calculateDelay();
- }
-
- // Out of luck, fb message will have to go with next regular packet
- // Sleep for the remaining time.
- this.rtcpSession.nextDelay -= System.currentTimeMillis()
- - this.rtcpSession.prevTime;
- if (this.rtcpSession.nextDelay < 0)
- this.rtcpSession.nextDelay = 0;
-
- }
-
- /**
- * Prepare a packet. The output depends on the participant and how the
- * packet is scheduled.
- *
- * @param part
- * the participant to report to
- * @param regular
- * whether this is a regularly, or early scheduled RTCP packet
- * @return compound RTCP packet
- */
- protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
- /*********** Figure out what we are going to send ***********/
- // Check whether this person has sent RTP packets since the last RR.
- boolean incRR = false;
- if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
- incRR = true;
- part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
- part.lastRtcpRRPkt = System.currentTimeMillis();
- }
-
- // Are we sending packets? -> add SR
- boolean incSR = false;
- if (rtpSession.sentPktCount > 0 && regular) {
- incSR = true;
- }
-
- /*********** Actually create the packet ***********/
- // Create compound packet
- CompRtcpPkt compPkt = new CompRtcpPkt();
-
- // If we're sending packets we'll use a SR for header
- if (incSR) {
- RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
- this.rtpSession.sentPktCount,
- this.rtpSession.sentOctetCount, null);
- compPkt.addPacket(srPkt);
-
- if (part.ssrc > 0) {
- RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
- if (ar != null) {
- for (int i = 0; i < ar.length; i++) {
- compPkt.addPacket(ar[i]);
- }
- }
- }
-
- }
-
- // If we got anything from this participant since we sent the 2nd to
- // last RtcpPkt
- if (incRR || !incSR) {
- Participant[] partArray = { part };
-
- if (part.receivedPkts < 1)
- partArray = null;
-
- RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
- compPkt.addPacket(rrPkt);
-
- if (!incSR && part.ssrc > 0) {
- RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
- if (ar != null) {
- for (int i = 0; i < ar.length; i++) {
- compPkt.addPacket(ar[i]);
- }
- }
- }
- }
-
- // APP packets
- if (regular && part.ssrc > 0) {
- RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
- if (ar != null) {
- for (int i = 0; i < ar.length; i++) {
- compPkt.addPacket(ar[i]);
- }
- } else {
- // Nope
- }
- }
-
- // For now we'll stick the SDES on every time, and only for us
- // if(regular) {
- RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
- compPkt.addPacket(sdesPkt);
- // }
-
- return compPkt;
- }
-
- /**
- * Start the RTCP sender thread.
- *
- * RFC 4585 is more complicated, but in general it will 1) Wait a
- * precalculated amount of time 2) Determine the next RTCP recipient 3)
- * Construct a compound packet with all the relevant information 4) Send the
- * packet 5) Calculate next delay before going to sleep
- */
- public void run() {
- if (RTPSession.rtcpDebugLevel > 1) {
- System.out.println("<-> RTCPSenderThread running");
- }
-
- // Give the application a chance to register some participants
- try {
- Thread.sleep(10);
- } catch (Exception e) {
- System.out.println("RTCPSenderThread didn't get any initial rest.");
- }
-
- // Set up an iterator for the member list
- Enumeration<Participant> enu = null;
- Iterator<Participant> iter = null;
-
- // TODO Change to rtcpReceivers
- if (rtpSession.mcSession) {
- enu = rtpSession.partDb.getParticipants();
- } else {
- iter = rtpSession.partDb.getUnicastReceivers();
- }
- while (!rtpSession.endSession) {
- if (RTPSession.rtcpDebugLevel > 5) {
- System.out.println("<-> RTCPSenderThread sleeping for "
- + rtcpSession.nextDelay + " ms");
- }
-
- try {
- Thread.sleep(rtcpSession.nextDelay);
- } catch (Exception e) {
- System.out.println("RTCPSenderThread Exception message:"
- + e.getMessage());
- // Is the party over?
- if (this.rtpSession.endSession) {
- continue;
- }
-
- if (rtcpSession.fbWaiting != -1) {
- reconsiderTiming(rtcpSession.fbWaiting);
- continue;
- }
- }
-
- /** Came here the regular way */
- this.rtcpSession.fbAllowEarly = true;
-
- if (RTPSession.rtcpDebugLevel > 5) {
- System.out.println("<-> RTCPSenderThread waking up");
- }
-
- // Regenerate nextDelay, before anything happens.
- rtcpSession.calculateDelay();
-
- // We'll wait here until a conflict (if any) has been resolved,
- // so that the bye packets for our current SSRC can be sent.
- if (rtpSession.conflict) {
- if (!this.byesSent) {
- sendByes();
- this.byesSent = true;
- }
- continue;
- }
- this.byesSent = false;
-
- // Grab the next person
- Participant part = null;
-
- // Multicast
- if (this.rtpSession.mcSession) {
- if (!enu.hasMoreElements())
- enu = rtpSession.partDb.getParticipants();
-
- if (enu.hasMoreElements()) {
- part = enu.nextElement();
- } else {
- continue;
- }
-
- // Unicast
- } else {
- if (!iter.hasNext()) {
- iter = rtpSession.partDb.getUnicastReceivers();
- }
-
- if (iter.hasNext()) {
- while (iter.hasNext()
- && (part == null || part.rtcpAddress == null)) {
- part = iter.next();
- }
- }
-
- if (part == null || part.rtcpAddress == null)
- continue;
- }
-
- CompRtcpPkt compPkt = preparePacket(part, true);
-
- /*********** Send the packet ***********/
- // Keep track of sent packet length for average;
- int datagramLength;
- if (rtpSession.mcSession) {
- datagramLength = this.mcSendCompRtcpPkt(compPkt);
- } else {
- // part.debugPrint();
- datagramLength = this
- .sendCompRtcpPkt(compPkt, part.rtcpAddress);
- }
-
- /*********** Administrative tasks ***********/
- // Update average packet size
- if (datagramLength > 0) {
- rtcpSession.updateAvgPacket(datagramLength);
- }
- }
-
- // Be polite, say Bye to everone
- sendByes();
- try {
- Thread.sleep(200);
- } catch (Exception e) {
- }
-
- if (RTPSession.rtcpDebugLevel > 0) {
- System.out.println("<-> RTCPSenderThread terminating");
- }
- }
-}