diff -r c9ff263c29ad -r e684f11070d5 src/jlibrtp/RTCPSenderThread.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jlibrtp/RTCPSenderThread.java Sat Mar 14 22:15:41 2009 +0100 @@ -0,0 +1,457 @@ +/** + * Java RTP Library (jlibrtp) + * Copyright (C) 2006 Arne Kepp + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ +package jlibrtp; + +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.util.*; + +/** + * This thread sends scheduled RTCP packets + * + * It also performs maintenance of various queues and the participant + * database. + * + * @author Arne Kepp + * + */ +public class RTCPSenderThread extends Thread { + /** Parent RTP Session */ + private RTPSession rtpSession = null; + /** Parent RTCP Session */ + private RTCPSession rtcpSession = null; + + /** Whether we have sent byes for the last conflict */ + private boolean byesSent = false; + + /** + * Constructor for new thread + * @param rtcpSession parent RTCP session + * @param rtpSession parent RTP session + */ + protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) { + this.rtpSession = rtpSession; + this.rtcpSession = rtcpSession; + if(RTPSession.rtpDebugLevel > 1) { + System.out.println("<-> RTCPSenderThread created"); + } + } + + /** + * Send BYE messages to all the relevant participants + * + */ + protected void sendByes() { + // Create the packet + CompRtcpPkt compPkt = new CompRtcpPkt(); + + //Need a SR for validation + RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, + this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null); + compPkt.addPacket(srPkt); + + byte[] reasonBytes; + + //Add the actualy BYE Pkt + long[] ssrcArray = {this.rtpSession.ssrc}; + if(rtpSession.conflict) { + reasonBytes = "SSRC collision".getBytes(); + } else { + reasonBytes = "jlibrtp says bye bye!".getBytes(); + } + RtcpPktBYE byePkt = new RtcpPktBYE( ssrcArray, reasonBytes); + + compPkt.addPacket(byePkt); + + // Send it off + if(rtpSession.mcSession) { + mcSendCompRtcpPkt(compPkt); + } else { + Iterator iter = rtpSession.partDb.getUnicastReceivers(); + + while(iter.hasNext()) { + Participant part = (Participant) iter.next(); + if(part.rtcpAddress != null) + sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + //System.out.println("SENT BYE PACKETS!!!!!"); + } + } + + /** + * Multicast version of sending a Compound RTCP packet + * + * @param pkt the packet to best + * @return 0 is successful, -1 otherwise + */ + protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) { + byte[] pktBytes = pkt.encode(); + DatagramPacket packet; + + // Create datagram + try { + packet = new DatagramPacket(pktBytes,pktBytes.length,rtpSession.mcGroup,rtcpSession.rtcpMCSock.getPort()); + } catch (Exception e) { + System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed."); + e.printStackTrace(); + return -1; + } + + // Send packet + if(RTPSession.rtcpDebugLevel > 5) { + System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast"); + } + try { + rtcpSession.rtcpMCSock.send(packet); + //Debug + if(this.rtpSession.debugAppIntf != null) { + this.rtpSession.debugAppIntf.packetSent(3, (InetSocketAddress) packet.getSocketAddress(), + new String("Sent multicast RTCP packet of size " + packet.getLength() + + " to " + packet.getSocketAddress().toString() + " via " + + this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString())); + } + } catch (Exception e) { + System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed."); + e.printStackTrace(); + return -1; + } + return packet.getLength(); + } + + /** + * Unicast version of sending a Compound RTCP packet + * + * @param pkt the packet to best + * @param receiver the socket address of the recipient + * @return 0 is successful, -1 otherwise + */ + protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) { + byte[] pktBytes = pkt.encode(); + DatagramPacket packet; + + //Create datagram + try { + //System.out.println("receiver: " + receiver); + packet = new DatagramPacket(pktBytes,pktBytes.length,receiver); + } catch (Exception e) { + System.out.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed."); + e.printStackTrace(); + return -1; + } + + //Send packet + if(RTPSession.rtcpDebugLevel > 5) { + Iterator iter = pkt.rtcpPkts.iterator(); + String str = " "; + while(iter.hasNext()) { + RtcpPkt aPkt = iter.next(); + str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", "); + } + System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + receiver + str); + } + try { + rtcpSession.rtcpSock.send(packet); + //Debug + if(this.rtpSession.debugAppIntf != null) { + this.rtpSession.debugAppIntf.packetSent(2, (InetSocketAddress) packet.getSocketAddress(), + new String("Sent unicast RTCP packet of size " + packet.getLength() + + " to " + packet.getSocketAddress().toString() + " via " + + this.rtcpSession.rtcpSock.getLocalSocketAddress().toString())); + } + } catch (Exception e) { + System.out.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed."); + e.printStackTrace(); + return -1; + } + return packet.getLength(); + } + + /** + * Check whether we can send an immediate feedback packet to this person + * @param ssrc SSRC of participant + */ + protected void reconsiderTiming(long ssrc) { + Participant part = this.rtpSession.partDb.getParticipant(ssrc); + + if( part != null && this.rtcpSession.fbSendImmediately()) { + CompRtcpPkt compPkt = preparePacket(part, false); + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if(rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + //part.debugPrint(); + datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + /*********** Administrative tasks ***********/ + //Update average packet size + if(datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + } else if(part != null + && this.rtcpSession.fbAllowEarly + && this.rtcpSession.fbSendEarly()) { + + // Make sure we dont do it too often + this.rtcpSession.fbAllowEarly = false; + + CompRtcpPkt compPkt = preparePacket(part, true); + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if(rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + //part.debugPrint(); + datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + /*********** Administrative tasks ***********/ + //Update average packet size + if(datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + rtcpSession.calculateDelay(); + } + + //Out of luck, fb message will have to go with next regular packet + //Sleep for the remaining time. + this.rtcpSession.nextDelay -= System.currentTimeMillis() - this.rtcpSession.prevTime; + if(this.rtcpSession.nextDelay < 0) + this.rtcpSession.nextDelay = 0; + + } + + /** + * Prepare a packet. The output depends on the participant and how the + * packet is scheduled. + * + * @param part the participant to report to + * @param regular whether this is a regularly, or early scheduled RTCP packet + * @return compound RTCP packet + */ + protected CompRtcpPkt preparePacket(Participant part, boolean regular) { + /*********** Figure out what we are going to send ***********/ + // Check whether this person has sent RTP packets since the last RR. + boolean incRR = false; + if(part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) { + incRR = true; + part.secondLastRtcpRRPkt = part.lastRtcpRRPkt; + part.lastRtcpRRPkt = System.currentTimeMillis(); + } + + // Are we sending packets? -> add SR + boolean incSR = false; + if(rtpSession.sentPktCount > 0 && regular) { + incSR = true; + } + + + /*********** Actually create the packet ***********/ + // Create compound packet + CompRtcpPkt compPkt = new CompRtcpPkt(); + + //If we're sending packets we'll use a SR for header + if(incSR) { + RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, + this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null); + compPkt.addPacket(srPkt); + + + if(part.ssrc > 0) { + RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); + if(ar != null) { + for(int i=0; i 0) { + RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); + if(ar != null) { + for(int i=0; i 0) { + RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc); + if(ar != null) { + for(int i=0; i 1) { + System.out.println("<-> RTCPSenderThread running"); + } + + // Give the application a chance to register some participants + try { Thread.sleep(10); } + catch (Exception e) { System.out.println("RTCPSenderThread didn't get any initial rest."); } + + // Set up an iterator for the member list + Enumeration enu = null; + Iterator iter = null; + + // TODO Change to rtcpReceivers + if(rtpSession.mcSession) { + enu = rtpSession.partDb.getParticipants(); + } else { + iter = rtpSession.partDb.getUnicastReceivers(); + } + while(! rtpSession.endSession) { + if(RTPSession.rtcpDebugLevel > 5) { + System.out.println("<-> RTCPSenderThread sleeping for " +rtcpSession.nextDelay+" ms"); + } + + try { Thread.sleep(rtcpSession.nextDelay); } + catch (Exception e) { + System.out.println("RTCPSenderThread Exception message:" + e.getMessage()); + // Is the party over? + if(this.rtpSession.endSession) { + continue; + } + + if(rtcpSession.fbWaiting != -1) { + reconsiderTiming(rtcpSession.fbWaiting); + continue; + } + } + + /** Came here the regular way */ + this.rtcpSession.fbAllowEarly = true; + + + if(RTPSession.rtcpDebugLevel > 5) { + System.out.println("<-> RTCPSenderThread waking up"); + } + + // Regenerate nextDelay, before anything happens. + rtcpSession.calculateDelay(); + + // We'll wait here until a conflict (if any) has been resolved, + // so that the bye packets for our current SSRC can be sent. + if(rtpSession.conflict) { + if(! this.byesSent) { + sendByes(); + this.byesSent = true; + } + continue; + } + this.byesSent = false; + + //Grab the next person + Participant part = null; + + //Multicast + if(this.rtpSession.mcSession) { + if(! enu.hasMoreElements()) + enu = rtpSession.partDb.getParticipants(); + + if( enu.hasMoreElements() ) { + part = enu.nextElement(); + } else { + continue; + } + + //Unicast + } else { + if(! iter.hasNext()) { + iter = rtpSession.partDb.getUnicastReceivers(); + } + + if(iter.hasNext() ) { + while( iter.hasNext() && (part == null || part.rtcpAddress == null)) { + part = iter.next(); + } + } + + if(part == null || part.rtcpAddress == null) + continue; + } + + CompRtcpPkt compPkt = preparePacket(part, true); + + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if(rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + //part.debugPrint(); + datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + + /*********** Administrative tasks ***********/ + //Update average packet size + if(datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + } + + // Be polite, say Bye to everone + sendByes(); + try { Thread.sleep(200);} catch(Exception e) {} + + if(RTPSession.rtcpDebugLevel > 0) { + System.out.println("<-> RTCPSenderThread terminating"); + } + } +}