/**
* Java RTP Library (jlibrtp)
* Copyright (C) 2006 Arne Kepp
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
package jlibrtp;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.*;
/**
* This thread sends scheduled RTCP packets
*
* It also performs maintenance of various queues and the participant
* database.
*
* @author Arne Kepp
*
*/
public class RTCPSenderThread extends Thread {
/** Parent RTP Session */
private RTPSession rtpSession = null;
/** Parent RTCP Session */
private RTCPSession rtcpSession = null;
/** Whether we have sent byes for the last conflict */
private boolean byesSent = false;
/**
* Constructor for new thread
* @param rtcpSession parent RTCP session
* @param rtpSession parent RTP session
*/
protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) {
this.rtpSession = rtpSession;
this.rtcpSession = rtcpSession;
if(RTPSession.rtpDebugLevel > 1) {
System.out.println("<-> RTCPSenderThread created");
}
}
/**
* Send BYE messages to all the relevant participants
*
*/
protected void sendByes() {
// Create the packet
CompRtcpPkt compPkt = new CompRtcpPkt();
//Need a SR for validation
RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
compPkt.addPacket(srPkt);
byte[] reasonBytes;
//Add the actualy BYE Pkt
long[] ssrcArray = {this.rtpSession.ssrc};
if(rtpSession.conflict) {
reasonBytes = "SSRC collision".getBytes();
} else {
reasonBytes = "jlibrtp says bye bye!".getBytes();
}
RtcpPktBYE byePkt = new RtcpPktBYE( ssrcArray, reasonBytes);
compPkt.addPacket(byePkt);
// Send it off
if(rtpSession.mcSession) {
mcSendCompRtcpPkt(compPkt);
} else {
Iterator<Participant> iter = rtpSession.partDb.getUnicastReceivers();
while(iter.hasNext()) {
Participant part = (Participant) iter.next();
if(part.rtcpAddress != null)
sendCompRtcpPkt(compPkt, part.rtcpAddress);
}
//System.out.println("SENT BYE PACKETS!!!!!");
}
}
/**
* Multicast version of sending a Compound RTCP packet
*
* @param pkt the packet to best
* @return 0 is successful, -1 otherwise
*/
protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) {
byte[] pktBytes = pkt.encode();
DatagramPacket packet;
// Create datagram
try {
packet = new DatagramPacket(pktBytes,pktBytes.length,rtpSession.mcGroup,rtcpSession.rtcpMCSock.getPort());
} catch (Exception e) {
System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed.");
e.printStackTrace();
return -1;
}
// Send packet
if(RTPSession.rtcpDebugLevel > 5) {
System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast");
}
try {
rtcpSession.rtcpMCSock.send(packet);
//Debug
if(this.rtpSession.debugAppIntf != null) {
this.rtpSession.debugAppIntf.packetSent(3, (InetSocketAddress) packet.getSocketAddress(),
new String("Sent multicast RTCP packet of size " + packet.getLength() +
" to " + packet.getSocketAddress().toString() + " via "
+ this.rtcpSession.rtcpMCSock.getLocalSocketAddress().toString()));
}
} catch (Exception e) {
System.out.println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed.");
e.printStackTrace();
return -1;
}
return packet.getLength();
}
/**
* Unicast version of sending a Compound RTCP packet
*
* @param pkt the packet to best
* @param receiver the socket address of the recipient
* @return 0 is successful, -1 otherwise
*/
protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) {
byte[] pktBytes = pkt.encode();
DatagramPacket packet;
//Create datagram
try {
//System.out.println("receiver: " + receiver);
packet = new DatagramPacket(pktBytes,pktBytes.length,receiver);
} catch (Exception e) {
System.out.println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed.");
e.printStackTrace();
return -1;
}
//Send packet
if(RTPSession.rtcpDebugLevel > 5) {
Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator();
String str = " ";
while(iter.hasNext()) {
RtcpPkt aPkt = iter.next();
str += (aPkt.getClass().toString() + ":"+aPkt.itemCount+ ", ");
}
System.out.println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " + receiver + str);
}
try {
rtcpSession.rtcpSock.send(packet);
//Debug
if(this.rtpSession.debugAppIntf != null) {
this.rtpSession.debugAppIntf.packetSent(2, (InetSocketAddress) packet.getSocketAddress(),
new String("Sent unicast RTCP packet of size " + packet.getLength() +
" to " + packet.getSocketAddress().toString() + " via "
+ this.rtcpSession.rtcpSock.getLocalSocketAddress().toString()));
}
} catch (Exception e) {
System.out.println("RTCPSenderThread.SendCompRtcpPkt() unicast failed.");
e.printStackTrace();
return -1;
}
return packet.getLength();
}
/**
* Check whether we can send an immediate feedback packet to this person
* @param ssrc SSRC of participant
*/
protected void reconsiderTiming(long ssrc) {
Participant part = this.rtpSession.partDb.getParticipant(ssrc);
if( part != null && this.rtcpSession.fbSendImmediately()) {
CompRtcpPkt compPkt = preparePacket(part, false);
/*********** Send the packet ***********/
// Keep track of sent packet length for average;
int datagramLength;
if(rtpSession.mcSession) {
datagramLength = this.mcSendCompRtcpPkt(compPkt);
} else {
//part.debugPrint();
datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
}
/*********** Administrative tasks ***********/
//Update average packet size
if(datagramLength > 0) {
rtcpSession.updateAvgPacket(datagramLength);
}
} else if(part != null
&& this.rtcpSession.fbAllowEarly
&& this.rtcpSession.fbSendEarly()) {
// Make sure we dont do it too often
this.rtcpSession.fbAllowEarly = false;
CompRtcpPkt compPkt = preparePacket(part, true);
/*********** Send the packet ***********/
// Keep track of sent packet length for average;
int datagramLength;
if(rtpSession.mcSession) {
datagramLength = this.mcSendCompRtcpPkt(compPkt);
} else {
//part.debugPrint();
datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
}
/*********** Administrative tasks ***********/
//Update average packet size
if(datagramLength > 0) {
rtcpSession.updateAvgPacket(datagramLength);
}
rtcpSession.calculateDelay();
}
//Out of luck, fb message will have to go with next regular packet
//Sleep for the remaining time.
this.rtcpSession.nextDelay -= System.currentTimeMillis() - this.rtcpSession.prevTime;
if(this.rtcpSession.nextDelay < 0)
this.rtcpSession.nextDelay = 0;
}
/**
* Prepare a packet. The output depends on the participant and how the
* packet is scheduled.
*
* @param part the participant to report to
* @param regular whether this is a regularly, or early scheduled RTCP packet
* @return compound RTCP packet
*/
protected CompRtcpPkt preparePacket(Participant part, boolean regular) {
/*********** Figure out what we are going to send ***********/
// Check whether this person has sent RTP packets since the last RR.
boolean incRR = false;
if(part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) {
incRR = true;
part.secondLastRtcpRRPkt = part.lastRtcpRRPkt;
part.lastRtcpRRPkt = System.currentTimeMillis();
}
// Are we sending packets? -> add SR
boolean incSR = false;
if(rtpSession.sentPktCount > 0 && regular) {
incSR = true;
}
/*********** Actually create the packet ***********/
// Create compound packet
CompRtcpPkt compPkt = new CompRtcpPkt();
//If we're sending packets we'll use a SR for header
if(incSR) {
RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc,
this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, null);
compPkt.addPacket(srPkt);
if(part.ssrc > 0) {
RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
if(ar != null) {
for(int i=0; i<ar.length; i++) {
compPkt.addPacket(ar[i]);
}
}
}
}
//If we got anything from this participant since we sent the 2nd to last RtcpPkt
if(incRR || !incSR) {
Participant[] partArray = {part};
if(part.receivedPkts < 1)
partArray = null;
RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc);
compPkt.addPacket(rrPkt);
if( !incSR && part.ssrc > 0) {
RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc);
if(ar != null) {
for(int i=0; i<ar.length; i++) {
compPkt.addPacket(ar[i]);
}
}
}
}
// APP packets
if(regular && part.ssrc > 0) {
RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc);
if(ar != null) {
for(int i=0; i<ar.length; i++) {
compPkt.addPacket(ar[i]);
}
} else {
//Nope
}
}
// For now we'll stick the SDES on every time, and only for us
//if(regular) {
RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null);
compPkt.addPacket(sdesPkt);
//}
return compPkt;
}
/**
* Start the RTCP sender thread.
*
* RFC 4585 is more complicated, but in general it will
* 1) Wait a precalculated amount of time
* 2) Determine the next RTCP recipient
* 3) Construct a compound packet with all the relevant information
* 4) Send the packet
* 5) Calculate next delay before going to sleep
*/
public void run() {
if(RTPSession.rtcpDebugLevel > 1) {
System.out.println("<-> RTCPSenderThread running");
}
// Give the application a chance to register some participants
try { Thread.sleep(10); }
catch (Exception e) { System.out.println("RTCPSenderThread didn't get any initial rest."); }
// Set up an iterator for the member list
Enumeration<Participant> enu = null;
Iterator<Participant> iter = null;
// TODO Change to rtcpReceivers
if(rtpSession.mcSession) {
enu = rtpSession.partDb.getParticipants();
} else {
iter = rtpSession.partDb.getUnicastReceivers();
}
while(! rtpSession.endSession) {
if(RTPSession.rtcpDebugLevel > 5) {
System.out.println("<-> RTCPSenderThread sleeping for " +rtcpSession.nextDelay+" ms");
}
try { Thread.sleep(rtcpSession.nextDelay); }
catch (Exception e) {
System.out.println("RTCPSenderThread Exception message:" + e.getMessage());
// Is the party over?
if(this.rtpSession.endSession) {
continue;
}
if(rtcpSession.fbWaiting != -1) {
reconsiderTiming(rtcpSession.fbWaiting);
continue;
}
}
/** Came here the regular way */
this.rtcpSession.fbAllowEarly = true;
if(RTPSession.rtcpDebugLevel > 5) {
System.out.println("<-> RTCPSenderThread waking up");
}
// Regenerate nextDelay, before anything happens.
rtcpSession.calculateDelay();
// We'll wait here until a conflict (if any) has been resolved,
// so that the bye packets for our current SSRC can be sent.
if(rtpSession.conflict) {
if(! this.byesSent) {
sendByes();
this.byesSent = true;
}
continue;
}
this.byesSent = false;
//Grab the next person
Participant part = null;
//Multicast
if(this.rtpSession.mcSession) {
if(! enu.hasMoreElements())
enu = rtpSession.partDb.getParticipants();
if( enu.hasMoreElements() ) {
part = enu.nextElement();
} else {
continue;
}
//Unicast
} else {
if(! iter.hasNext()) {
iter = rtpSession.partDb.getUnicastReceivers();
}
if(iter.hasNext() ) {
while( iter.hasNext() && (part == null || part.rtcpAddress == null)) {
part = iter.next();
}
}
if(part == null || part.rtcpAddress == null)
continue;
}
CompRtcpPkt compPkt = preparePacket(part, true);
/*********** Send the packet ***********/
// Keep track of sent packet length for average;
int datagramLength;
if(rtpSession.mcSession) {
datagramLength = this.mcSendCompRtcpPkt(compPkt);
} else {
//part.debugPrint();
datagramLength = this.sendCompRtcpPkt(compPkt, part.rtcpAddress);
}
/*********** Administrative tasks ***********/
//Update average packet size
if(datagramLength > 0) {
rtcpSession.updateAvgPacket(datagramLength);
}
}
// Be polite, say Bye to everone
sendByes();
try { Thread.sleep(200);} catch(Exception e) {}
if(RTPSession.rtcpDebugLevel > 0) {
System.out.println("<-> RTCPSenderThread terminating");
}
}
}