diff -r 537ddd8aa407 -r 2036ebfaccda src/jlibrtp/RTCPSenderThread.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jlibrtp/RTCPSenderThread.java Fri Nov 20 19:29:42 2009 +0100 @@ -0,0 +1,496 @@ +/** + * Java RTP Library (jlibrtp) + * Copyright (C) 2006 Arne Kepp + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ +package jlibrtp; + +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.util.Enumeration; +import java.util.Iterator; + +/** + * This thread sends scheduled RTCP packets + * + * It also performs maintenance of various queues and the participant database. + * + * @author Arne Kepp + * + */ +public class RTCPSenderThread extends Thread { + /** Parent RTP Session */ + private RTPSession rtpSession = null; + /** Parent RTCP Session */ + private RTCPSession rtcpSession = null; + + /** Whether we have sent byes for the last conflict */ + private boolean byesSent = false; + + /** + * Constructor for new thread + * + * @param rtcpSession + * parent RTCP session + * @param rtpSession + * parent RTP session + */ + protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) { + this.rtpSession = rtpSession; + this.rtcpSession = rtcpSession; + if (RTPSession.rtpDebugLevel > 1) { + System.out.println("<-> RTCPSenderThread created"); + } + } + + /** + * Send BYE messages to all the relevant participants + * + */ + protected void sendByes() { + // Create the packet + CompRtcpPkt compPkt = new CompRtcpPkt(); + + // Need a SR for validation + RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, + this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, + null); + compPkt.addPacket(srPkt); + + byte[] reasonBytes; + + // Add the actualy BYE Pkt + long[] ssrcArray = { this.rtpSession.ssrc }; + if (rtpSession.conflict) { + reasonBytes = "SSRC collision".getBytes(); + } else { + reasonBytes = "jlibrtp says bye bye!".getBytes(); + } + RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes); + + compPkt.addPacket(byePkt); + + // Send it off + if (rtpSession.mcSession) { + mcSendCompRtcpPkt(compPkt); + } else { + Iterator iter = rtpSession.partDb + .getUnicastReceivers(); + + while (iter.hasNext()) { + Participant part = (Participant) iter.next(); + if (part.rtcpAddress != null) + sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + // System.out.println("SENT BYE PACKETS!!!!!"); + } + } + + /** + * Multicast version of sending a Compound RTCP packet + * + * @param pkt + * the packet to best + * @return 0 is successful, -1 otherwise + */ + protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) { + byte[] pktBytes = pkt.encode(); + DatagramPacket packet; + + // Create datagram + try { + packet = new DatagramPacket(pktBytes, pktBytes.length, + rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort()); + } catch (Exception e) { + System.out + .println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed."); + e.printStackTrace(); + return -1; + } + + // Send packet + if (RTPSession.rtcpDebugLevel > 5) { + System.out + .println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast"); + } + try { + rtcpSession.rtcpMCSock.send(packet); + // Debug + if (this.rtpSession.debugAppIntf != null) { + this.rtpSession.debugAppIntf.packetSent(3, + (InetSocketAddress) packet.getSocketAddress(), + new String("Sent multicast RTCP packet of size " + + packet.getLength() + + " to " + + packet.getSocketAddress().toString() + + " via " + + this.rtcpSession.rtcpMCSock + .getLocalSocketAddress().toString())); + } + } catch (Exception e) { + System.out + .println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed."); + e.printStackTrace(); + return -1; + } + return packet.getLength(); + } + + /** + * Unicast version of sending a Compound RTCP packet + * + * @param pkt + * the packet to best + * @param receiver + * the socket address of the recipient + * @return 0 is successful, -1 otherwise + */ + protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) { + byte[] pktBytes = pkt.encode(); + DatagramPacket packet; + + // Create datagram + try { + // System.out.println("receiver: " + receiver); + packet = new DatagramPacket(pktBytes, pktBytes.length, receiver); + } catch (Exception e) { + System.out + .println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed."); + e.printStackTrace(); + return -1; + } + + // Send packet + if (RTPSession.rtcpDebugLevel > 5) { + Iterator iter = pkt.rtcpPkts.iterator(); + String str = " "; + while (iter.hasNext()) { + RtcpPkt aPkt = iter.next(); + str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", "); + } + System.out + .println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + + receiver + str); + } + try { + rtcpSession.rtcpSock.send(packet); + // Debug + if (this.rtpSession.debugAppIntf != null) { + this.rtpSession.debugAppIntf.packetSent(2, + (InetSocketAddress) packet.getSocketAddress(), + new String("Sent unicast RTCP packet of size " + + packet.getLength() + + " to " + + packet.getSocketAddress().toString() + + " via " + + this.rtcpSession.rtcpSock + .getLocalSocketAddress().toString())); + } + } catch (Exception e) { + System.out + .println("RTCPSenderThread.SendCompRtcpPkt() unicast failed."); + e.printStackTrace(); + return -1; + } + return packet.getLength(); + } + + /** + * Check whether we can send an immediate feedback packet to this person + * + * @param ssrc + * SSRC of participant + */ + protected void reconsiderTiming(long ssrc) { + Participant part = this.rtpSession.partDb.getParticipant(ssrc); + + if (part != null && this.rtcpSession.fbSendImmediately()) { + CompRtcpPkt compPkt = preparePacket(part, false); + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if (rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + // part.debugPrint(); + datagramLength = this + .sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + /*********** Administrative tasks ***********/ + // Update average packet size + if (datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + } else if (part != null && this.rtcpSession.fbAllowEarly + && this.rtcpSession.fbSendEarly()) { + + // Make sure we dont do it too often + this.rtcpSession.fbAllowEarly = false; + + CompRtcpPkt compPkt = preparePacket(part, true); + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if (rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + // part.debugPrint(); + datagramLength = this + .sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + /*********** Administrative tasks ***********/ + // Update average packet size + if (datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + rtcpSession.calculateDelay(); + } + + // Out of luck, fb message will have to go with next regular packet + // Sleep for the remaining time. + this.rtcpSession.nextDelay -= System.currentTimeMillis() + - this.rtcpSession.prevTime; + if (this.rtcpSession.nextDelay < 0) + this.rtcpSession.nextDelay = 0; + + } + + /** + * Prepare a packet. The output depends on the participant and how the + * packet is scheduled. + * + * @param part + * the participant to report to + * @param regular + * whether this is a regularly, or early scheduled RTCP packet + * @return compound RTCP packet + */ + protected CompRtcpPkt preparePacket(Participant part, boolean regular) { + /*********** Figure out what we are going to send ***********/ + // Check whether this person has sent RTP packets since the last RR. + boolean incRR = false; + if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) { + incRR = true; + part.secondLastRtcpRRPkt = part.lastRtcpRRPkt; + part.lastRtcpRRPkt = System.currentTimeMillis(); + } + + // Are we sending packets? -> add SR + boolean incSR = false; + if (rtpSession.sentPktCount > 0 && regular) { + incSR = true; + } + + /*********** Actually create the packet ***********/ + // Create compound packet + CompRtcpPkt compPkt = new CompRtcpPkt(); + + // If we're sending packets we'll use a SR for header + if (incSR) { + RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, + this.rtpSession.sentPktCount, + this.rtpSession.sentOctetCount, null); + compPkt.addPacket(srPkt); + + if (part.ssrc > 0) { + RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); + if (ar != null) { + for (int i = 0; i < ar.length; i++) { + compPkt.addPacket(ar[i]); + } + } + } + + } + + // If we got anything from this participant since we sent the 2nd to + // last RtcpPkt + if (incRR || !incSR) { + Participant[] partArray = { part }; + + if (part.receivedPkts < 1) + partArray = null; + + RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc); + compPkt.addPacket(rrPkt); + + if (!incSR && part.ssrc > 0) { + RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); + if (ar != null) { + for (int i = 0; i < ar.length; i++) { + compPkt.addPacket(ar[i]); + } + } + } + } + + // APP packets + if (regular && part.ssrc > 0) { + RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc); + if (ar != null) { + for (int i = 0; i < ar.length; i++) { + compPkt.addPacket(ar[i]); + } + } else { + // Nope + } + } + + // For now we'll stick the SDES on every time, and only for us + // if(regular) { + RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null); + compPkt.addPacket(sdesPkt); + // } + + return compPkt; + } + + /** + * Start the RTCP sender thread. + * + * RFC 4585 is more complicated, but in general it will 1) Wait a + * precalculated amount of time 2) Determine the next RTCP recipient 3) + * Construct a compound packet with all the relevant information 4) Send the + * packet 5) Calculate next delay before going to sleep + */ + public void run() { + if (RTPSession.rtcpDebugLevel > 1) { + System.out.println("<-> RTCPSenderThread running"); + } + + // Give the application a chance to register some participants + try { + Thread.sleep(10); + } catch (Exception e) { + System.out.println("RTCPSenderThread didn't get any initial rest."); + } + + // Set up an iterator for the member list + Enumeration enu = null; + Iterator iter = null; + + // TODO Change to rtcpReceivers + if (rtpSession.mcSession) { + enu = rtpSession.partDb.getParticipants(); + } else { + iter = rtpSession.partDb.getUnicastReceivers(); + } + while (!rtpSession.endSession) { + if (RTPSession.rtcpDebugLevel > 5) { + System.out.println("<-> RTCPSenderThread sleeping for " + + rtcpSession.nextDelay + " ms"); + } + + try { + Thread.sleep(rtcpSession.nextDelay); + } catch (Exception e) { + System.out.println("RTCPSenderThread Exception message:" + + e.getMessage()); + // Is the party over? + if (this.rtpSession.endSession) { + continue; + } + + if (rtcpSession.fbWaiting != -1) { + reconsiderTiming(rtcpSession.fbWaiting); + continue; + } + } + + /** Came here the regular way */ + this.rtcpSession.fbAllowEarly = true; + + if (RTPSession.rtcpDebugLevel > 5) { + System.out.println("<-> RTCPSenderThread waking up"); + } + + // Regenerate nextDelay, before anything happens. + rtcpSession.calculateDelay(); + + // We'll wait here until a conflict (if any) has been resolved, + // so that the bye packets for our current SSRC can be sent. + if (rtpSession.conflict) { + if (!this.byesSent) { + sendByes(); + this.byesSent = true; + } + continue; + } + this.byesSent = false; + + // Grab the next person + Participant part = null; + + // Multicast + if (this.rtpSession.mcSession) { + if (!enu.hasMoreElements()) + enu = rtpSession.partDb.getParticipants(); + + if (enu.hasMoreElements()) { + part = enu.nextElement(); + } else { + continue; + } + + // Unicast + } else { + if (!iter.hasNext()) { + iter = rtpSession.partDb.getUnicastReceivers(); + } + + if (iter.hasNext()) { + while (iter.hasNext() + && (part == null || part.rtcpAddress == null)) { + part = iter.next(); + } + } + + if (part == null || part.rtcpAddress == null) + continue; + } + + CompRtcpPkt compPkt = preparePacket(part, true); + + /*********** Send the packet ***********/ + // Keep track of sent packet length for average; + int datagramLength; + if (rtpSession.mcSession) { + datagramLength = this.mcSendCompRtcpPkt(compPkt); + } else { + // part.debugPrint(); + datagramLength = this + .sendCompRtcpPkt(compPkt, part.rtcpAddress); + } + + /*********** Administrative tasks ***********/ + // Update average packet size + if (datagramLength > 0) { + rtcpSession.updateAvgPacket(datagramLength); + } + } + + // Be polite, say Bye to everone + sendByes(); + try { + Thread.sleep(200); + } catch (Exception e) { + } + + if (RTPSession.rtcpDebugLevel > 0) { + System.out.println("<-> RTCPSenderThread terminating"); + } + } +}