diff -r 9bdff6cbd120 -r 2bf440c54ca5 src/jlibrtp/RTCPSenderThread.java --- a/src/jlibrtp/RTCPSenderThread.java Thu May 28 14:26:06 2009 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,457 +0,0 @@ -/** - * Java RTP Library (jlibrtp) - * Copyright (C) 2006 Arne Kepp - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ -package jlibrtp; - -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.util.*; - -/** - * This thread sends scheduled RTCP packets - * - * It also performs maintenance of various queues and the participant - * database. - * - * @author Arne Kepp - * - */ -public class RTCPSenderThread extends Thread { - /** Parent RTP Session */ - private RTPSession rtpSession = null; - /** Parent RTCP Session */ - private RTCPSession rtcpSession = null; - - /** Whether we have sent byes for the last conflict */ - private boolean byesSent = false; - - /** - * Constructor for new thread - * @param rtcpSession parent RTCP session - * @param rtpSession parent RTP session - */ - protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) { - this.rtpSession = rtpSession; - this.rtcpSession = rtcpSession; - if(RTPSession.rtpDebugLevel > 1) { - System.out.println("<-> RTCPSenderThread created"); - } - } - - /** - * Send BYE messages to all the relevant participants - * - */ - protected void sendByes() { - // Create the packet - CompRtcpPkt compPkt = new CompRtcpPkt(); - - //Need a SR for validation - RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, - this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null); - compPkt.addPacket(srPkt); - - byte[] reasonBytes; - - //Add the actualy BYE Pkt - long[] ssrcArray = {this.rtpSession.ssrc}; - if(rtpSession.conflict) { - reasonBytes = "SSRC collision".getBytes(); - } else { - reasonBytes = "jlibrtp says bye bye!".getBytes(); - } - RtcpPktBYE byePkt = new RtcpPktBYE( ssrcArray, reasonBytes); - - compPkt.addPacket(byePkt); - - // Send it off - if(rtpSession.mcSession) { - mcSendCompRtcpPkt(compPkt); - } else { - Iterator iter = rtpSession.partDb.getUnicastReceivers(); - - while(iter.hasNext()) { - Participant part = (Participant) iter.next(); - if(part.rtcpAddress != null) - sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - //System.out.println("SENT BYE PACKETS!!!!!"); - } - } - - /** - * Multicast version of sending a Compound RTCP packet - * - * @param pkt the packet to best - * @return 0 is successful, -1 otherwise - */ - protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) { - byte[] pktBytes = pkt.encode(); - DatagramPacket packet; - - // Create datagram - try { - packet = new DatagramPacket(pktBytes,pktBytes.length,rtpSession.mcGroup,rtcpSession.rtcpMCSock.getPort()); - } catch (Exception e) { - System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed."); - e.printStackTrace(); - return -1; - } - - // Send packet - if(RTPSession.rtcpDebugLevel > 5) { - System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast"); - } - try { - rtcpSession.rtcpMCSock.send(packet); - //Debug - if(this.rtpSession.debugAppIntf != null) { - this.rtpSession.debugAppIntf.packetSent(3, (InetSocketAddress) packet.getSocketAddress(), - new String("Sent multicast RTCP packet of size " + packet.getLength() + - " to " + packet.getSocketAddress().toString() + " via " - + this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString())); - } - } catch (Exception e) { - System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed."); - e.printStackTrace(); - return -1; - } - return packet.getLength(); - } - - /** - * Unicast version of sending a Compound RTCP packet - * - * @param pkt the packet to best - * @param receiver the socket address of the recipient - * @return 0 is successful, -1 otherwise - */ - protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) { - byte[] pktBytes = pkt.encode(); - DatagramPacket packet; - - //Create datagram - try { - //System.out.println("receiver: " + receiver); - packet = new DatagramPacket(pktBytes,pktBytes.length,receiver); - } catch (Exception e) { - System.out.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed."); - e.printStackTrace(); - return -1; - } - - //Send packet - if(RTPSession.rtcpDebugLevel > 5) { - Iterator iter = pkt.rtcpPkts.iterator(); - String str = " "; - while(iter.hasNext()) { - RtcpPkt aPkt = iter.next(); - str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", "); - } - System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + receiver + str); - } - try { - rtcpSession.rtcpSock.send(packet); - //Debug - if(this.rtpSession.debugAppIntf != null) { - this.rtpSession.debugAppIntf.packetSent(2, (InetSocketAddress) packet.getSocketAddress(), - new String("Sent unicast RTCP packet of size " + packet.getLength() + - " to " + packet.getSocketAddress().toString() + " via " - + this.rtcpSession.rtcpSock.getLocalSocketAddress().toString())); - } - } catch (Exception e) { - System.out.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed."); - e.printStackTrace(); - return -1; - } - return packet.getLength(); - } - - /** - * Check whether we can send an immediate feedback packet to this person - * @param ssrc SSRC of participant - */ - protected void reconsiderTiming(long ssrc) { - Participant part = this.rtpSession.partDb.getParticipant(ssrc); - - if( part != null && this.rtcpSession.fbSendImmediately()) { - CompRtcpPkt compPkt = preparePacket(part, false); - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if(rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - //part.debugPrint(); - datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - /*********** Administrative tasks ***********/ - //Update average packet size - if(datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - } else if(part != null - && this.rtcpSession.fbAllowEarly - && this.rtcpSession.fbSendEarly()) { - - // Make sure we dont do it too often - this.rtcpSession.fbAllowEarly = false; - - CompRtcpPkt compPkt = preparePacket(part, true); - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if(rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - //part.debugPrint(); - datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - /*********** Administrative tasks ***********/ - //Update average packet size - if(datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - rtcpSession.calculateDelay(); - } - - //Out of luck, fb message will have to go with next regular packet - //Sleep for the remaining time. - this.rtcpSession.nextDelay -= System.currentTimeMillis() - this.rtcpSession.prevTime; - if(this.rtcpSession.nextDelay < 0) - this.rtcpSession.nextDelay = 0; - - } - - /** - * Prepare a packet. The output depends on the participant and how the - * packet is scheduled. - * - * @param part the participant to report to - * @param regular whether this is a regularly, or early scheduled RTCP packet - * @return compound RTCP packet - */ - protected CompRtcpPkt preparePacket(Participant part, boolean regular) { - /*********** Figure out what we are going to send ***********/ - // Check whether this person has sent RTP packets since the last RR. - boolean incRR = false; - if(part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) { - incRR = true; - part.secondLastRtcpRRPkt = part.lastRtcpRRPkt; - part.lastRtcpRRPkt = System.currentTimeMillis(); - } - - // Are we sending packets? -> add SR - boolean incSR = false; - if(rtpSession.sentPktCount > 0 && regular) { - incSR = true; - } - - - /*********** Actually create the packet ***********/ - // Create compound packet - CompRtcpPkt compPkt = new CompRtcpPkt(); - - //If we're sending packets we'll use a SR for header - if(incSR) { - RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, - this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null); - compPkt.addPacket(srPkt); - - - if(part.ssrc > 0) { - RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); - if(ar != null) { - for(int i=0; i 0) { - RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); - if(ar != null) { - for(int i=0; i 0) { - RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc); - if(ar != null) { - for(int i=0; i 1) { - System.out.println("<-> RTCPSenderThread running"); - } - - // Give the application a chance to register some participants - try { Thread.sleep(10); } - catch (Exception e) { System.out.println("RTCPSenderThread didn't get any initial rest."); } - - // Set up an iterator for the member list - Enumeration enu = null; - Iterator iter = null; - - // TODO Change to rtcpReceivers - if(rtpSession.mcSession) { - enu = rtpSession.partDb.getParticipants(); - } else { - iter = rtpSession.partDb.getUnicastReceivers(); - } - while(! rtpSession.endSession) { - if(RTPSession.rtcpDebugLevel > 5) { - System.out.println("<-> RTCPSenderThread sleeping for " +rtcpSession.nextDelay+" ms"); - } - - try { Thread.sleep(rtcpSession.nextDelay); } - catch (Exception e) { - System.out.println("RTCPSenderThread Exception message:" + e.getMessage()); - // Is the party over? - if(this.rtpSession.endSession) { - continue; - } - - if(rtcpSession.fbWaiting != -1) { - reconsiderTiming(rtcpSession.fbWaiting); - continue; - } - } - - /** Came here the regular way */ - this.rtcpSession.fbAllowEarly = true; - - - if(RTPSession.rtcpDebugLevel > 5) { - System.out.println("<-> RTCPSenderThread waking up"); - } - - // Regenerate nextDelay, before anything happens. - rtcpSession.calculateDelay(); - - // We'll wait here until a conflict (if any) has been resolved, - // so that the bye packets for our current SSRC can be sent. - if(rtpSession.conflict) { - if(! this.byesSent) { - sendByes(); - this.byesSent = true; - } - continue; - } - this.byesSent = false; - - //Grab the next person - Participant part = null; - - //Multicast - if(this.rtpSession.mcSession) { - if(! enu.hasMoreElements()) - enu = rtpSession.partDb.getParticipants(); - - if( enu.hasMoreElements() ) { - part = enu.nextElement(); - } else { - continue; - } - - //Unicast - } else { - if(! iter.hasNext()) { - iter = rtpSession.partDb.getUnicastReceivers(); - } - - if(iter.hasNext() ) { - while( iter.hasNext() && (part == null || part.rtcpAddress == null)) { - part = iter.next(); - } - } - - if(part == null || part.rtcpAddress == null) - continue; - } - - CompRtcpPkt compPkt = preparePacket(part, true); - - /*********** Send the packet ***********/ - // Keep track of sent packet length for average; - int datagramLength; - if(rtpSession.mcSession) { - datagramLength = this.mcSendCompRtcpPkt(compPkt); - } else { - //part.debugPrint(); - datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); - } - - /*********** Administrative tasks ***********/ - //Update average packet size - if(datagramLength > 0) { - rtcpSession.updateAvgPacket(datagramLength); - } - } - - // Be polite, say Bye to everone - sendByes(); - try { Thread.sleep(200);} catch(Exception e) {} - - if(RTPSession.rtcpDebugLevel > 0) { - System.out.println("<-> RTCPSenderThread terminating"); - } - } -}