diff -r f5a5d9237d69 -r e8d6255306f8 src/jlibrtp/RTCPSenderThread.java --- a/src/jlibrtp/RTCPSenderThread.java Sat Jan 23 21:48:58 2010 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,496 +0,0 @@ -/** - * Java RTP Library (jlibrtp) - * Copyright (C) 2006 Arne Kepp - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ -package jlibrtp; - -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.util.Enumeration; -import java.util.Iterator; - -/** - * This thread sends scheduled RTCP packets - * - * It also performs maintenance of various queues and the participant database. - * - * @author Arne Kepp - * - */ -public class RTCPSenderThread extends Thread { - /** Parent RTP Session */ - private RTPSession rtpSession = null; - /** Parent RTCP Session */ - private RTCPSession rtcpSession = null; - - /** Whether we have sent byes for the last conflict */ - private boolean byesSent = false; - - /** - * Constructor for new thread - * - * @param rtcpSession - * parent RTCP session - * @param rtpSession - * parent RTP session - */ - protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) { - this.rtpSession = rtpSession; - this.rtcpSession = rtcpSession; - if (RTPSession.rtpDebugLevel > 1) { - System.out.println("<-> RTCPSenderThread created"); - } - } - - /** - * Send BYE messages to all the relevant participants - * - */ - protected void sendByes() { - // Create the packet - CompRtcpPkt compPkt = new CompRtcpPkt(); - - // Need a SR for validation - RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, - this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, - null); - compPkt.addPacket(srPkt); - - byte[] reasonBytes; - - // Add the actualy BYE Pkt - long[] ssrcArray = { this.rtpSession.ssrc }; - if (rtpSession.conflict) { - reasonBytes = "SSRC collision".getBytes(); - } else { - reasonBytes = "jlibrtp says bye bye!".getBytes(); - } - RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes); - - compPkt.addPacket(byePkt); - - // Send it off - if (rtpSession.mcSession) { - mcSendCompRtcpPkt(compPkt); - } else { - Iterator iter = rtpSession.partDb - .getUnicastReceivers(); - - while (iter.hasNext()) { - Participant part = (Participant) iter.next(); - if (part.rtcpAddress != null) - sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - // System.out.println("SENT BYE PACKETS!!!!!"); - } - } - - /** - * Multicast version of sending a Compound RTCP packet - * - * @param pkt - * the packet to best - * @return 0 is successful, -1 otherwise - */ - protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) { - byte[] pktBytes = pkt.encode(); - DatagramPacket packet; - - // Create datagram - try { - packet = new DatagramPacket(pktBytes, pktBytes.length, - rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort()); - } catch (Exception e) { - System.out - .println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed."); - e.printStackTrace(); - return -1; - } - - // Send packet - if (RTPSession.rtcpDebugLevel > 5) { - System.out - .println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast"); - } - try { - rtcpSession.rtcpMCSock.send(packet); - // Debug - if (this.rtpSession.debugAppIntf != null) { - this.rtpSession.debugAppIntf.packetSent(3, - (InetSocketAddress) packet.getSocketAddress(), - new String("Sent multicast RTCP packet of size " - + packet.getLength() - + " to " - + packet.getSocketAddress().toString() - + " via " - + this.rtcpSession.rtcpMCSock - .getLocalSocketAddress().toString())); - } - } catch (Exception e) { - System.out - .println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed."); - e.printStackTrace(); - return -1; - } - return packet.getLength(); - } - - /** - * Unicast version of sending a Compound RTCP packet - * - * @param pkt - * the packet to best - * @param receiver - * the socket address of the recipient - * @return 0 is successful, -1 otherwise - */ - protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) { - byte[] pktBytes = pkt.encode(); - DatagramPacket packet; - - // Create datagram - try { - // System.out.println("receiver: " + receiver); - packet = new DatagramPacket(pktBytes, pktBytes.length, receiver); - } catch (Exception e) { - System.out - .println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed."); - e.printStackTrace(); - return -1; - } - - // Send packet - if (RTPSession.rtcpDebugLevel > 5) { - Iterator iter = pkt.rtcpPkts.iterator(); - String str = " "; - while (iter.hasNext()) { - RtcpPkt aPkt = iter.next(); - str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", "); - } - System.out - .println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " - + receiver + str); - } - try { - rtcpSession.rtcpSock.send(packet); - // Debug - if (this.rtpSession.debugAppIntf != null) { - this.rtpSession.debugAppIntf.packetSent(2, - (InetSocketAddress) packet.getSocketAddress(), - new String("Sent unicast RTCP packet of size " - + packet.getLength() - + " to " - + packet.getSocketAddress().toString() - + " via " - + this.rtcpSession.rtcpSock - .getLocalSocketAddress().toString())); - } - } catch (Exception e) { - System.out - .println("RTCPSenderThread.SendCompRtcpPkt() unicast failed."); - e.printStackTrace(); - return -1; - } - return packet.getLength(); - } - - /** - * Check whether we can send an immediate feedback packet to this person - * - * @param ssrc - * SSRC of participant - */ - protected void reconsiderTiming(long ssrc) { - Participant part = this.rtpSession.partDb.getParticipant(ssrc); - - if (part != null && this.rtcpSession.fbSendImmediately()) { - CompRtcpPkt compPkt = preparePacket(part, false); - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if (rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - // part.debugPrint(); - datagramLength = this - .sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - /*********** Administrative tasks ***********/ - // Update average packet size - if (datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - } else if (part != null && this.rtcpSession.fbAllowEarly - && this.rtcpSession.fbSendEarly()) { - - // Make sure we dont do it too often - this.rtcpSession.fbAllowEarly = false; - - CompRtcpPkt compPkt = preparePacket(part, true); - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if (rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - // part.debugPrint(); - datagramLength = this - .sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - /*********** Administrative tasks ***********/ - // Update average packet size - if (datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - rtcpSession.calculateDelay(); - } - - // Out of luck, fb message will have to go with next regular packet - // Sleep for the remaining time. - this.rtcpSession.nextDelay -= System.currentTimeMillis() - - this.rtcpSession.prevTime; - if (this.rtcpSession.nextDelay < 0) - this.rtcpSession.nextDelay = 0; - - } - - /** - * Prepare a packet. The output depends on the participant and how the - * packet is scheduled. - * - * @param part - * the participant to report to - * @param regular - * whether this is a regularly, or early scheduled RTCP packet - * @return compound RTCP packet - */ - protected CompRtcpPkt preparePacket(Participant part, boolean regular) { - /*********** Figure out what we are going to send ***********/ - // Check whether this person has sent RTP packets since the last RR. - boolean incRR = false; - if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) { - incRR = true; - part.secondLastRtcpRRPkt = part.lastRtcpRRPkt; - part.lastRtcpRRPkt = System.currentTimeMillis(); - } - - // Are we sending packets? -> add SR - boolean incSR = false; - if (rtpSession.sentPktCount > 0 && regular) { - incSR = true; - } - - /*********** Actually create the packet ***********/ - // Create compound packet - CompRtcpPkt compPkt = new CompRtcpPkt(); - - // If we're sending packets we'll use a SR for header - if (incSR) { - RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, - this.rtpSession.sentPktCount, - this.rtpSession.sentOctetCount, null); - compPkt.addPacket(srPkt); - - if (part.ssrc > 0) { - RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); - if (ar != null) { - for (int i = 0; i < ar.length; i++) { - compPkt.addPacket(ar[i]); - } - } - } - - } - - // If we got anything from this participant since we sent the 2nd to - // last RtcpPkt - if (incRR || !incSR) { - Participant[] partArray = { part }; - - if (part.receivedPkts < 1) - partArray = null; - - RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc); - compPkt.addPacket(rrPkt); - - if (!incSR && part.ssrc > 0) { - RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); - if (ar != null) { - for (int i = 0; i < ar.length; i++) { - compPkt.addPacket(ar[i]); - } - } - } - } - - // APP packets - if (regular && part.ssrc > 0) { - RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc); - if (ar != null) { - for (int i = 0; i < ar.length; i++) { - compPkt.addPacket(ar[i]); - } - } else { - // Nope - } - } - - // For now we'll stick the SDES on every time, and only for us - // if(regular) { - RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null); - compPkt.addPacket(sdesPkt); - // } - - return compPkt; - } - - /** - * Start the RTCP sender thread. - * - * RFC 4585 is more complicated, but in general it will 1) Wait a - * precalculated amount of time 2) Determine the next RTCP recipient 3) - * Construct a compound packet with all the relevant information 4) Send the - * packet 5) Calculate next delay before going to sleep - */ - public void run() { - if (RTPSession.rtcpDebugLevel > 1) { - System.out.println("<-> RTCPSenderThread running"); - } - - // Give the application a chance to register some participants - try { - Thread.sleep(10); - } catch (Exception e) { - System.out.println("RTCPSenderThread didn't get any initial rest."); - } - - // Set up an iterator for the member list - Enumeration enu = null; - Iterator iter = null; - - // TODO Change to rtcpReceivers - if (rtpSession.mcSession) { - enu = rtpSession.partDb.getParticipants(); - } else { - iter = rtpSession.partDb.getUnicastReceivers(); - } - while (!rtpSession.endSession) { - if (RTPSession.rtcpDebugLevel > 5) { - System.out.println("<-> RTCPSenderThread sleeping for " - + rtcpSession.nextDelay + " ms"); - } - - try { - Thread.sleep(rtcpSession.nextDelay); - } catch (Exception e) { - System.out.println("RTCPSenderThread Exception message:" - + e.getMessage()); - // Is the party over? - if (this.rtpSession.endSession) { - continue; - } - - if (rtcpSession.fbWaiting != -1) { - reconsiderTiming(rtcpSession.fbWaiting); - continue; - } - } - - /** Came here the regular way */ - this.rtcpSession.fbAllowEarly = true; - - if (RTPSession.rtcpDebugLevel > 5) { - System.out.println("<-> RTCPSenderThread waking up"); - } - - // Regenerate nextDelay, before anything happens. - rtcpSession.calculateDelay(); - - // We'll wait here until a conflict (if any) has been resolved, - // so that the bye packets for our current SSRC can be sent. - if (rtpSession.conflict) { - if (!this.byesSent) { - sendByes(); - this.byesSent = true; - } - continue; - } - this.byesSent = false; - - // Grab the next person - Participant part = null; - - // Multicast - if (this.rtpSession.mcSession) { - if (!enu.hasMoreElements()) - enu = rtpSession.partDb.getParticipants(); - - if (enu.hasMoreElements()) { - part = enu.nextElement(); - } else { - continue; - } - - // Unicast - } else { - if (!iter.hasNext()) { - iter = rtpSession.partDb.getUnicastReceivers(); - } - - if (iter.hasNext()) { - while (iter.hasNext() - && (part == null || part.rtcpAddress == null)) { - part = iter.next(); - } - } - - if (part == null || part.rtcpAddress == null) - continue; - } - - CompRtcpPkt compPkt = preparePacket(part, true); - - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if (rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - // part.debugPrint(); - datagramLength = this - .sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - - /*********** Administrative tasks ***********/ - // Update average packet size - if (datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - } - - // Be polite, say Bye to everone - sendByes(); - try { - Thread.sleep(200); - } catch (Exception e) { - } - - if (RTPSession.rtcpDebugLevel > 0) { - System.out.println("<-> RTCPSenderThread terminating"); - } - } -}