/**
 * Java RTP Library (jlibrtp)
 * Copyright (C) 2006 Arne Kepp
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
|
19 package jlibrtp; |
|
20 |
|
21 import java.net.DatagramPacket; |
|
22 import java.net.InetSocketAddress; |
|
23 import java.util.Enumeration; |
|
24 import java.util.Iterator; |
|
25 |
|
26 /** |
|
27 * This thread sends scheduled RTCP packets |
|
28 * |
|
29 * It also performs maintenance of various queues and the participant database. |
|
30 * |
|
31 * @author Arne Kepp |
|
32 * |
|
33 */ |
|
34 public class RTCPSenderThread extends Thread { |
|
35 /** Parent RTP Session */ |
|
36 private RTPSession rtpSession = null; |
|
37 /** Parent RTCP Session */ |
|
38 private RTCPSession rtcpSession = null; |
|
39 |
|
40 /** Whether we have sent byes for the last conflict */ |
|
41 private boolean byesSent = false; |
|
42 |
|
43 /** |
|
44 * Constructor for new thread |
|
45 * |
|
46 * @param rtcpSession |
|
47 * parent RTCP session |
|
48 * @param rtpSession |
|
49 * parent RTP session |
|
50 */ |
|
51 protected RTCPSenderThread(RTCPSession rtcpSession, RTPSession rtpSession) { |
|
52 this.rtpSession = rtpSession; |
|
53 this.rtcpSession = rtcpSession; |
|
54 if (RTPSession.rtpDebugLevel > 1) { |
|
55 System.out.println("<-> RTCPSenderThread created"); |
|
56 } |
|
57 } |
|
58 |
|
59 /** |
|
60 * Send BYE messages to all the relevant participants |
|
61 * |
|
62 */ |
|
63 protected void sendByes() { |
|
64 // Create the packet |
|
65 CompRtcpPkt compPkt = new CompRtcpPkt(); |
|
66 |
|
67 // Need a SR for validation |
|
68 RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, |
|
69 this.rtpSession.sentPktCount, this.rtpSession.sentOctetCount, |
|
70 null); |
|
71 compPkt.addPacket(srPkt); |
|
72 |
|
73 byte[] reasonBytes; |
|
74 |
|
75 // Add the actualy BYE Pkt |
|
76 long[] ssrcArray = { this.rtpSession.ssrc }; |
|
77 if (rtpSession.conflict) { |
|
78 reasonBytes = "SSRC collision".getBytes(); |
|
79 } else { |
|
80 reasonBytes = "jlibrtp says bye bye!".getBytes(); |
|
81 } |
|
82 RtcpPktBYE byePkt = new RtcpPktBYE(ssrcArray, reasonBytes); |
|
83 |
|
84 compPkt.addPacket(byePkt); |
|
85 |
|
86 // Send it off |
|
87 if (rtpSession.mcSession) { |
|
88 mcSendCompRtcpPkt(compPkt); |
|
89 } else { |
|
90 Iterator<Participant> iter = rtpSession.partDb |
|
91 .getUnicastReceivers(); |
|
92 |
|
93 while (iter.hasNext()) { |
|
94 Participant part = (Participant) iter.next(); |
|
95 if (part.rtcpAddress != null) |
|
96 sendCompRtcpPkt(compPkt, part.rtcpAddress); |
|
97 } |
|
98 // System.out.println("SENT BYE PACKETS!!!!!"); |
|
99 } |
|
100 } |
|
101 |
|
102 /** |
|
103 * Multicast version of sending a Compound RTCP packet |
|
104 * |
|
105 * @param pkt |
|
106 * the packet to best |
|
107 * @return 0 is successful, -1 otherwise |
|
108 */ |
|
109 protected int mcSendCompRtcpPkt(CompRtcpPkt pkt) { |
|
110 byte[] pktBytes = pkt.encode(); |
|
111 DatagramPacket packet; |
|
112 |
|
113 // Create datagram |
|
114 try { |
|
115 packet = new DatagramPacket(pktBytes, pktBytes.length, |
|
116 rtpSession.mcGroup, rtcpSession.rtcpMCSock.getPort()); |
|
117 } catch (Exception e) { |
|
118 System.out |
|
119 .println("RCTPSenderThread.MCSendCompRtcpPkt() packet creation failed."); |
|
120 e.printStackTrace(); |
|
121 return -1; |
|
122 } |
|
123 |
|
124 // Send packet |
|
125 if (RTPSession.rtcpDebugLevel > 5) { |
|
126 System.out |
|
127 .println("<-> RTCPSenderThread.SendCompRtcpPkt() multicast"); |
|
128 } |
|
129 try { |
|
130 rtcpSession.rtcpMCSock.send(packet); |
|
131 // Debug |
|
132 if (this.rtpSession.debugAppIntf != null) { |
|
133 this.rtpSession.debugAppIntf.packetSent(3, |
|
134 (InetSocketAddress) packet.getSocketAddress(), |
|
135 new String("Sent multicast RTCP packet of size " |
|
136 + packet.getLength() |
|
137 + " to " |
|
138 + packet.getSocketAddress().toString() |
|
139 + " via " |
|
140 + this.rtcpSession.rtcpMCSock |
|
141 .getLocalSocketAddress().toString())); |
|
142 } |
|
143 } catch (Exception e) { |
|
144 System.out |
|
145 .println("RCTPSenderThread.MCSendCompRtcpPkt() multicast failed."); |
|
146 e.printStackTrace(); |
|
147 return -1; |
|
148 } |
|
149 return packet.getLength(); |
|
150 } |
|
151 |
|
152 /** |
|
153 * Unicast version of sending a Compound RTCP packet |
|
154 * |
|
155 * @param pkt |
|
156 * the packet to best |
|
157 * @param receiver |
|
158 * the socket address of the recipient |
|
159 * @return 0 is successful, -1 otherwise |
|
160 */ |
|
161 protected int sendCompRtcpPkt(CompRtcpPkt pkt, InetSocketAddress receiver) { |
|
162 byte[] pktBytes = pkt.encode(); |
|
163 DatagramPacket packet; |
|
164 |
|
165 // Create datagram |
|
166 try { |
|
167 // System.out.println("receiver: " + receiver); |
|
168 packet = new DatagramPacket(pktBytes, pktBytes.length, receiver); |
|
169 } catch (Exception e) { |
|
170 System.out |
|
171 .println("RCTPSenderThread.SendCompRtcpPkt() packet creation failed."); |
|
172 e.printStackTrace(); |
|
173 return -1; |
|
174 } |
|
175 |
|
176 // Send packet |
|
177 if (RTPSession.rtcpDebugLevel > 5) { |
|
178 Iterator<RtcpPkt> iter = pkt.rtcpPkts.iterator(); |
|
179 String str = " "; |
|
180 while (iter.hasNext()) { |
|
181 RtcpPkt aPkt = iter.next(); |
|
182 str += (aPkt.getClass().toString() + ":" + aPkt.itemCount + ", "); |
|
183 } |
|
184 System.out |
|
185 .println("<-> RTCPSenderThread.SendCompRtcpPkt() unicast to " |
|
186 + receiver + str); |
|
187 } |
|
188 try { |
|
189 rtcpSession.rtcpSock.send(packet); |
|
190 // Debug |
|
191 if (this.rtpSession.debugAppIntf != null) { |
|
192 this.rtpSession.debugAppIntf.packetSent(2, |
|
193 (InetSocketAddress) packet.getSocketAddress(), |
|
194 new String("Sent unicast RTCP packet of size " |
|
195 + packet.getLength() |
|
196 + " to " |
|
197 + packet.getSocketAddress().toString() |
|
198 + " via " |
|
199 + this.rtcpSession.rtcpSock |
|
200 .getLocalSocketAddress().toString())); |
|
201 } |
|
202 } catch (Exception e) { |
|
203 System.out |
|
204 .println("RTCPSenderThread.SendCompRtcpPkt() unicast failed."); |
|
205 e.printStackTrace(); |
|
206 return -1; |
|
207 } |
|
208 return packet.getLength(); |
|
209 } |
|
210 |
|
211 /** |
|
212 * Check whether we can send an immediate feedback packet to this person |
|
213 * |
|
214 * @param ssrc |
|
215 * SSRC of participant |
|
216 */ |
|
217 protected void reconsiderTiming(long ssrc) { |
|
218 Participant part = this.rtpSession.partDb.getParticipant(ssrc); |
|
219 |
|
220 if (part != null && this.rtcpSession.fbSendImmediately()) { |
|
221 CompRtcpPkt compPkt = preparePacket(part, false); |
|
222 /*********** Send the packet ***********/ |
|
223 // Keep track of sent packet length for average; |
|
224 int datagramLength; |
|
225 if (rtpSession.mcSession) { |
|
226 datagramLength = this.mcSendCompRtcpPkt(compPkt); |
|
227 } else { |
|
228 // part.debugPrint(); |
|
229 datagramLength = this |
|
230 .sendCompRtcpPkt(compPkt, part.rtcpAddress); |
|
231 } |
|
232 /*********** Administrative tasks ***********/ |
|
233 // Update average packet size |
|
234 if (datagramLength > 0) { |
|
235 rtcpSession.updateAvgPacket(datagramLength); |
|
236 } |
|
237 } else if (part != null && this.rtcpSession.fbAllowEarly |
|
238 && this.rtcpSession.fbSendEarly()) { |
|
239 |
|
240 // Make sure we dont do it too often |
|
241 this.rtcpSession.fbAllowEarly = false; |
|
242 |
|
243 CompRtcpPkt compPkt = preparePacket(part, true); |
|
244 /*********** Send the packet ***********/ |
|
245 // Keep track of sent packet length for average; |
|
246 int datagramLength; |
|
247 if (rtpSession.mcSession) { |
|
248 datagramLength = this.mcSendCompRtcpPkt(compPkt); |
|
249 } else { |
|
250 // part.debugPrint(); |
|
251 datagramLength = this |
|
252 .sendCompRtcpPkt(compPkt, part.rtcpAddress); |
|
253 } |
|
254 /*********** Administrative tasks ***********/ |
|
255 // Update average packet size |
|
256 if (datagramLength > 0) { |
|
257 rtcpSession.updateAvgPacket(datagramLength); |
|
258 } |
|
259 rtcpSession.calculateDelay(); |
|
260 } |
|
261 |
|
262 // Out of luck, fb message will have to go with next regular packet |
|
263 // Sleep for the remaining time. |
|
264 this.rtcpSession.nextDelay -= System.currentTimeMillis() |
|
265 - this.rtcpSession.prevTime; |
|
266 if (this.rtcpSession.nextDelay < 0) |
|
267 this.rtcpSession.nextDelay = 0; |
|
268 |
|
269 } |
|
270 |
|
271 /** |
|
272 * Prepare a packet. The output depends on the participant and how the |
|
273 * packet is scheduled. |
|
274 * |
|
275 * @param part |
|
276 * the participant to report to |
|
277 * @param regular |
|
278 * whether this is a regularly, or early scheduled RTCP packet |
|
279 * @return compound RTCP packet |
|
280 */ |
|
281 protected CompRtcpPkt preparePacket(Participant part, boolean regular) { |
|
282 /*********** Figure out what we are going to send ***********/ |
|
283 // Check whether this person has sent RTP packets since the last RR. |
|
284 boolean incRR = false; |
|
285 if (part.secondLastRtcpRRPkt > part.lastRtcpRRPkt) { |
|
286 incRR = true; |
|
287 part.secondLastRtcpRRPkt = part.lastRtcpRRPkt; |
|
288 part.lastRtcpRRPkt = System.currentTimeMillis(); |
|
289 } |
|
290 |
|
291 // Are we sending packets? -> add SR |
|
292 boolean incSR = false; |
|
293 if (rtpSession.sentPktCount > 0 && regular) { |
|
294 incSR = true; |
|
295 } |
|
296 |
|
297 /*********** Actually create the packet ***********/ |
|
298 // Create compound packet |
|
299 CompRtcpPkt compPkt = new CompRtcpPkt(); |
|
300 |
|
301 // If we're sending packets we'll use a SR for header |
|
302 if (incSR) { |
|
303 RtcpPktSR srPkt = new RtcpPktSR(this.rtpSession.ssrc, |
|
304 this.rtpSession.sentPktCount, |
|
305 this.rtpSession.sentOctetCount, null); |
|
306 compPkt.addPacket(srPkt); |
|
307 |
|
308 if (part.ssrc > 0) { |
|
309 RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); |
|
310 if (ar != null) { |
|
311 for (int i = 0; i < ar.length; i++) { |
|
312 compPkt.addPacket(ar[i]); |
|
313 } |
|
314 } |
|
315 } |
|
316 |
|
317 } |
|
318 |
|
319 // If we got anything from this participant since we sent the 2nd to |
|
320 // last RtcpPkt |
|
321 if (incRR || !incSR) { |
|
322 Participant[] partArray = { part }; |
|
323 |
|
324 if (part.receivedPkts < 1) |
|
325 partArray = null; |
|
326 |
|
327 RtcpPktRR rrPkt = new RtcpPktRR(partArray, rtpSession.ssrc); |
|
328 compPkt.addPacket(rrPkt); |
|
329 |
|
330 if (!incSR && part.ssrc > 0) { |
|
331 RtcpPkt[] ar = this.rtcpSession.getFromFbQueue(part.ssrc); |
|
332 if (ar != null) { |
|
333 for (int i = 0; i < ar.length; i++) { |
|
334 compPkt.addPacket(ar[i]); |
|
335 } |
|
336 } |
|
337 } |
|
338 } |
|
339 |
|
340 // APP packets |
|
341 if (regular && part.ssrc > 0) { |
|
342 RtcpPkt[] ar = this.rtcpSession.getFromAppQueue(part.ssrc); |
|
343 if (ar != null) { |
|
344 for (int i = 0; i < ar.length; i++) { |
|
345 compPkt.addPacket(ar[i]); |
|
346 } |
|
347 } else { |
|
348 // Nope |
|
349 } |
|
350 } |
|
351 |
|
352 // For now we'll stick the SDES on every time, and only for us |
|
353 // if(regular) { |
|
354 RtcpPktSDES sdesPkt = new RtcpPktSDES(true, this.rtpSession, null); |
|
355 compPkt.addPacket(sdesPkt); |
|
356 // } |
|
357 |
|
358 return compPkt; |
|
359 } |
|
360 |
|
361 /** |
|
362 * Start the RTCP sender thread. |
|
363 * |
|
364 * RFC 4585 is more complicated, but in general it will 1) Wait a |
|
365 * precalculated amount of time 2) Determine the next RTCP recipient 3) |
|
366 * Construct a compound packet with all the relevant information 4) Send the |
|
367 * packet 5) Calculate next delay before going to sleep |
|
368 */ |
|
369 public void run() { |
|
370 if (RTPSession.rtcpDebugLevel > 1) { |
|
371 System.out.println("<-> RTCPSenderThread running"); |
|
372 } |
|
373 |
|
374 // Give the application a chance to register some participants |
|
375 try { |
|
376 Thread.sleep(10); |
|
377 } catch (Exception e) { |
|
378 System.out.println("RTCPSenderThread didn't get any initial rest."); |
|
379 } |
|
380 |
|
381 // Set up an iterator for the member list |
|
382 Enumeration<Participant> enu = null; |
|
383 Iterator<Participant> iter = null; |
|
384 |
|
385 // TODO Change to rtcpReceivers |
|
386 if (rtpSession.mcSession) { |
|
387 enu = rtpSession.partDb.getParticipants(); |
|
388 } else { |
|
389 iter = rtpSession.partDb.getUnicastReceivers(); |
|
390 } |
|
391 while (!rtpSession.endSession) { |
|
392 if (RTPSession.rtcpDebugLevel > 5) { |
|
393 System.out.println("<-> RTCPSenderThread sleeping for " |
|
394 + rtcpSession.nextDelay + " ms"); |
|
395 } |
|
396 |
|
397 try { |
|
398 Thread.sleep(rtcpSession.nextDelay); |
|
399 } catch (Exception e) { |
|
400 System.out.println("RTCPSenderThread Exception message:" |
|
401 + e.getMessage()); |
|
402 // Is the party over? |
|
403 if (this.rtpSession.endSession) { |
|
404 continue; |
|
405 } |
|
406 |
|
407 if (rtcpSession.fbWaiting != -1) { |
|
408 reconsiderTiming(rtcpSession.fbWaiting); |
|
409 continue; |
|
410 } |
|
411 } |
|
412 |
|
413 /** Came here the regular way */ |
|
414 this.rtcpSession.fbAllowEarly = true; |
|
415 |
|
416 if (RTPSession.rtcpDebugLevel > 5) { |
|
417 System.out.println("<-> RTCPSenderThread waking up"); |
|
418 } |
|
419 |
|
420 // Regenerate nextDelay, before anything happens. |
|
421 rtcpSession.calculateDelay(); |
|
422 |
|
423 // We'll wait here until a conflict (if any) has been resolved, |
|
424 // so that the bye packets for our current SSRC can be sent. |
|
425 if (rtpSession.conflict) { |
|
426 if (!this.byesSent) { |
|
427 sendByes(); |
|
428 this.byesSent = true; |
|
429 } |
|
430 continue; |
|
431 } |
|
432 this.byesSent = false; |
|
433 |
|
434 // Grab the next person |
|
435 Participant part = null; |
|
436 |
|
437 // Multicast |
|
438 if (this.rtpSession.mcSession) { |
|
439 if (!enu.hasMoreElements()) |
|
440 enu = rtpSession.partDb.getParticipants(); |
|
441 |
|
442 if (enu.hasMoreElements()) { |
|
443 part = enu.nextElement(); |
|
444 } else { |
|
445 continue; |
|
446 } |
|
447 |
|
448 // Unicast |
|
449 } else { |
|
450 if (!iter.hasNext()) { |
|
451 iter = rtpSession.partDb.getUnicastReceivers(); |
|
452 } |
|
453 |
|
454 if (iter.hasNext()) { |
|
455 while (iter.hasNext() |
|
456 && (part == null || part.rtcpAddress == null)) { |
|
457 part = iter.next(); |
|
458 } |
|
459 } |
|
460 |
|
461 if (part == null || part.rtcpAddress == null) |
|
462 continue; |
|
463 } |
|
464 |
|
465 CompRtcpPkt compPkt = preparePacket(part, true); |
|
466 |
|
467 /*********** Send the packet ***********/ |
|
468 // Keep track of sent packet length for average; |
|
469 int datagramLength; |
|
470 if (rtpSession.mcSession) { |
|
471 datagramLength = this.mcSendCompRtcpPkt(compPkt); |
|
472 } else { |
|
473 // part.debugPrint(); |
|
474 datagramLength = this |
|
475 .sendCompRtcpPkt(compPkt, part.rtcpAddress); |
|
476 } |
|
477 |
|
478 /*********** Administrative tasks ***********/ |
|
479 // Update average packet size |
|
480 if (datagramLength > 0) { |
|
481 rtcpSession.updateAvgPacket(datagramLength); |
|
482 } |
|
483 } |
|
484 |
|
485 // Be polite, say Bye to everone |
|
486 sendByes(); |
|
487 try { |
|
488 Thread.sleep(200); |
|
489 } catch (Exception e) { |
|
490 } |
|
491 |
|
492 if (RTPSession.rtcpDebugLevel > 0) { |
|
493 System.out.println("<-> RTCPSenderThread terminating"); |
|
494 } |
|
495 } |
|
496 } |
|