1 /** |
|
2 * Java RTP Library (jlibrtp) |
|
3 * Copyright (C) 2006 Arne Kepp |
|
4 * |
|
5 * This library is free software; you can redistribute it and/or |
|
6 * modify it under the terms of the GNU Lesser General Public |
|
7 * License as published by the Free Software Foundation; either |
|
8 * version 2.1 of the License, or (at your option) any later version. |
|
9 * |
|
10 * This library is distributed in the hope that it will be useful, |
|
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
13 * Lesser General Public License for more details. |
|
14 * |
|
15 * You should have received a copy of the GNU Lesser General Public |
|
16 * License along with this library; if not, write to the Free Software |
|
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
|
18 */ |
|
19 package jlibrtp; |
|
20 |
|
21 import java.io.IOException; |
|
22 import java.net.DatagramPacket; |
|
23 import java.net.InetSocketAddress; |
|
24 import java.net.SocketException; |
|
25 |
|
26 import org.sipdroid.net.tools.DatagramPool; |
|
27 import org.sipdroid.net.tools.RtpPktPool; |
|
28 |
|
29 /** |
|
30 * The RTP receiver thread waits on the designated UDP socket for new packets. |
|
31 * |
|
32 * Once one arrives, it is parsed and tested. We also check the ip-address of |
|
33 * the sender. If accepted, the packet is added onto the packet buffer of the |
|
34 * participant. |
|
35 * |
|
36 * A separate thread moves the packet from the packet buffer to the application. |
|
37 * |
|
38 * @author Arne Kepp |
|
39 */ |
|
40 public class RTPReceiverThread extends Thread { |
|
41 /** Parent RTP Session */ |
|
42 RTPSession rtpSession = null; |
|
43 DatagramPool datagramPool = null; |
|
44 |
|
45 RTPReceiverThread(RTPSession session) { |
|
46 rtpSession = session; |
|
47 datagramPool = DatagramPool.getInstance(); |
|
48 if (RTPSession.rtpDebugLevel > 1) { |
|
49 System.out.println("<-> RTPReceiverThread created"); |
|
50 } |
|
51 } |
|
52 public void init() { |
|
53 if (RTPSession.rtpDebugLevel > 1) { |
|
54 if (rtpSession.mcSession) { |
|
55 System.out.println("-> RTPReceiverThread.run() starting on MC " |
|
56 + rtpSession.rtpMCSock.getLocalPort()); |
|
57 } else { |
|
58 System.out.println("-> RTPReceiverThread.run() starting on " |
|
59 + rtpSession.rtpSock.getLocalPort()); |
|
60 } |
|
61 } |
|
62 |
|
63 android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO); |
|
64 |
|
65 DatagramPacket packet = datagramPool.borrowPacket(); |
|
66 try { |
|
67 rtpSession.rtpSock.setSoTimeout(1); |
|
68 for (;;) |
|
69 rtpSession.rtpSock.receive(packet); |
|
70 } catch (SocketException e2) { |
|
71 |
|
72 } catch (IOException e) { |
|
73 } |
|
74 datagramPool.returnPacket(packet); |
|
75 try { |
|
76 rtpSession.rtpSock.setSoTimeout(1000); |
|
77 } catch (SocketException e2) { |
|
78 } |
|
79 } |
|
80 |
|
81 public void readPacketToBuffer() { |
|
82 if (RTPSession.rtpDebugLevel > 6) { |
|
83 if (rtpSession.mcSession) { |
|
84 System.out |
|
85 .println("-> RTPReceiverThread.run() waiting for MC packet on " |
|
86 + rtpSession.rtpMCSock.getLocalPort()); |
|
87 } else { |
|
88 System.out |
|
89 .println("-> RTPReceiverThread.run() waiting for packet on " |
|
90 + rtpSession.rtpSock.getLocalPort()); |
|
91 } |
|
92 } |
|
93 |
|
94 // Prepare a packet |
|
95 //DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length); |
|
96 DatagramPacket packet = datagramPool.borrowPacket(); |
|
97 // Wait for it to arrive |
|
98 if (!rtpSession.mcSession) { |
|
99 // Unicast |
|
100 try { |
|
101 rtpSession.rtpSock.receive(packet); |
|
102 } catch (IOException e) { |
|
103 if (!rtpSession.endSession) { |
|
104 e.printStackTrace(); |
|
105 } else { |
|
106 return; |
|
107 } |
|
108 } |
|
109 } else { |
|
110 // Multicast |
|
111 try { |
|
112 rtpSession.rtpMCSock.receive(packet); |
|
113 } catch (IOException e) { |
|
114 if (!rtpSession.endSession) { |
|
115 e.printStackTrace(); |
|
116 } else { |
|
117 return; |
|
118 } |
|
119 } |
|
120 } |
|
121 // Parse the received RTP (?) packet |
|
122 RtpPkt pkt = RtpPktPool.getInstance().borrowPkt(); |
|
123 pkt.initPacket(packet.getData(), packet.getLength(), packet); |
|
124 |
|
125 // Check whether it was valid. |
|
126 if (pkt == null) { |
|
127 System.out.println("Received invalid RTP packet. Ignoring"); |
|
128 return; |
|
129 } |
|
130 |
|
131 long pktSsrc = pkt.getSsrc(); |
|
132 |
|
133 // Check for loops and SSRC collisions |
|
134 if (rtpSession.ssrc == pktSsrc) |
|
135 rtpSession.resolveSsrcConflict(); |
|
136 |
|
137 long[] csrcArray = pkt.getCsrcArray(); |
|
138 if (csrcArray != null) { |
|
139 for (int i = 0; i < csrcArray.length; i++) { |
|
140 if (csrcArray[i] == rtpSession.ssrc) |
|
141 ; |
|
142 rtpSession.resolveSsrcConflict(); |
|
143 } |
|
144 } |
|
145 |
|
146 if (RTPSession.rtpDebugLevel > 17) { |
|
147 System.out |
|
148 .println("-> RTPReceiverThread.run() rcvd packet, seqNum " |
|
149 + pktSsrc); |
|
150 if (RTPSession.rtpDebugLevel > 10) { |
|
151 System.out.println("-> RTPReceiverThread.run() payload is " |
|
152 + pkt.getPayloadLength()); |
|
153 } |
|
154 } |
|
155 |
|
156 // Find the participant in the database based on SSRC |
|
157 //Participant part = rtpSession.partDb.getParticipant(pktSsrc); |
|
158 Participant part = rtpSession.firstPart; |
|
159 if (part == null) { |
|
160 InetSocketAddress nullSocket = null; |
|
161 part = new Participant((InetSocketAddress) packet |
|
162 .getSocketAddress(), nullSocket, pkt.getSsrc()); |
|
163 part.unexpected = true; |
|
164 rtpSession.partDb.addParticipant(1, part); |
|
165 } |
|
166 |
|
167 // Do checks on whether the datagram came from the expected source |
|
168 // for that SSRC. |
|
169 |
|
170 if (part.rtpAddress == null |
|
171 || packet.getAddress().equals(part.rtpAddress.getAddress())) { |
|
172 PktBuffer pktBuffer = part.pktBuffer; |
|
173 |
|
174 if (pktBuffer != null) { |
|
175 // A buffer already exists, append to it |
|
176 pktBuffer.addPkt(pkt); |
|
177 } else { |
|
178 // Create a new packet/frame buffer |
|
179 pktBuffer = new PktBuffer(this.rtpSession, part, pkt); |
|
180 part.pktBuffer = pktBuffer; |
|
181 } |
|
182 } else { |
|
183 System.out |
|
184 .println("RTPReceiverThread: Got an unexpected packet from " |
|
185 + pkt.getSsrc() |
|
186 + " the sending ip-address was " |
|
187 + packet.getAddress().toString() |
|
188 + ", we expected from " |
|
189 + part.rtpAddress.toString()); |
|
190 } |
|
191 |
|
192 // Statistics for receiver report. |
|
193 part.updateRRStats(packet.getLength(), pkt); |
|
194 // Upate liveness |
|
195 part.lastRtpPkt = System.currentTimeMillis(); |
|
196 |
|
197 if (RTPSession.rtpDebugLevel > 5) { |
|
198 System.out |
|
199 .println("<-> RTPReceiverThread signalling pktBufDataReady"); |
|
200 } |
|
201 |
|
202 // Signal the thread that pushes data to application |
|
203 /*rtpSession.pktBufLock.lock(); |
|
204 try { |
|
205 rtpSession.pktBufDataReady.signalAll(); |
|
206 } finally { |
|
207 rtpSession.pktBufLock.unlock(); |
|
208 }*/ |
|
209 } |
|
210 |
|
211 public void run() { |
|
212 if (RTPSession.rtpDebugLevel > 1) { |
|
213 if (rtpSession.mcSession) { |
|
214 System.out.println("-> RTPReceiverThread.run() starting on MC " |
|
215 + rtpSession.rtpMCSock.getLocalPort()); |
|
216 } else { |
|
217 System.out.println("-> RTPReceiverThread.run() starting on " |
|
218 + rtpSession.rtpSock.getLocalPort()); |
|
219 } |
|
220 } |
|
221 |
|
222 android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO); |
|
223 |
|
224 DatagramPacket packet = datagramPool.borrowPacket(); |
|
225 try { |
|
226 rtpSession.rtpSock.setSoTimeout(1); |
|
227 for (;;) |
|
228 rtpSession.rtpSock.receive(packet); |
|
229 } catch (SocketException e2) { |
|
230 |
|
231 } catch (IOException e) { |
|
232 } |
|
233 datagramPool.returnPacket(packet); |
|
234 try { |
|
235 rtpSession.rtpSock.setSoTimeout(0); |
|
236 } catch (SocketException e2) { |
|
237 } |
|
238 while (!rtpSession.endSession) { |
|
239 if (RTPSession.rtpDebugLevel > 6) { |
|
240 if (rtpSession.mcSession) { |
|
241 System.out |
|
242 .println("-> RTPReceiverThread.run() waiting for MC packet on " |
|
243 + rtpSession.rtpMCSock.getLocalPort()); |
|
244 } else { |
|
245 System.out |
|
246 .println("-> RTPReceiverThread.run() waiting for packet on " |
|
247 + rtpSession.rtpSock.getLocalPort()); |
|
248 } |
|
249 } |
|
250 |
|
251 // Prepare a packet |
|
252 //DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length); |
|
253 packet = datagramPool.borrowPacket(); |
|
254 // Wait for it to arrive |
|
255 if (!rtpSession.mcSession) { |
|
256 // Unicast |
|
257 try { |
|
258 rtpSession.rtpSock.receive(packet); |
|
259 } catch (IOException e) { |
|
260 if (!rtpSession.endSession) { |
|
261 e.printStackTrace(); |
|
262 } else { |
|
263 continue; |
|
264 } |
|
265 } |
|
266 } else { |
|
267 // Multicast |
|
268 try { |
|
269 rtpSession.rtpMCSock.receive(packet); |
|
270 } catch (IOException e) { |
|
271 if (!rtpSession.endSession) { |
|
272 e.printStackTrace(); |
|
273 } else { |
|
274 continue; |
|
275 } |
|
276 } |
|
277 } |
|
278 // Parse the received RTP (?) packet |
|
279 RtpPkt pkt = RtpPktPool.getInstance().borrowPkt(); |
|
280 pkt.initPacket(packet.getData(), packet.getLength(), packet); |
|
281 |
|
282 // Check whether it was valid. |
|
283 if (pkt == null) { |
|
284 System.out.println("Received invalid RTP packet. Ignoring"); |
|
285 continue; |
|
286 } |
|
287 |
|
288 long pktSsrc = pkt.getSsrc(); |
|
289 |
|
290 // Check for loops and SSRC collisions |
|
291 if (rtpSession.ssrc == pktSsrc) |
|
292 rtpSession.resolveSsrcConflict(); |
|
293 |
|
294 long[] csrcArray = pkt.getCsrcArray(); |
|
295 if (csrcArray != null) { |
|
296 for (int i = 0; i < csrcArray.length; i++) { |
|
297 if (csrcArray[i] == rtpSession.ssrc) |
|
298 ; |
|
299 rtpSession.resolveSsrcConflict(); |
|
300 } |
|
301 } |
|
302 |
|
303 if (RTPSession.rtpDebugLevel > 17) { |
|
304 System.out |
|
305 .println("-> RTPReceiverThread.run() rcvd packet, seqNum " |
|
306 + pktSsrc); |
|
307 if (RTPSession.rtpDebugLevel > 10) { |
|
308 System.out.println("-> RTPReceiverThread.run() payload is " |
|
309 + pkt.getPayloadLength()); |
|
310 } |
|
311 } |
|
312 |
|
313 // Find the participant in the database based on SSRC |
|
314 Participant part = rtpSession.partDb.getParticipant(pktSsrc); |
|
315 |
|
316 if (part == null) { |
|
317 InetSocketAddress nullSocket = null; |
|
318 part = new Participant((InetSocketAddress) packet |
|
319 .getSocketAddress(), nullSocket, pkt.getSsrc()); |
|
320 part.unexpected = true; |
|
321 rtpSession.partDb.addParticipant(1, part); |
|
322 } |
|
323 |
|
324 // Do checks on whether the datagram came from the expected source |
|
325 // for that SSRC. |
|
326 if (part.rtpAddress == null |
|
327 || packet.getAddress().equals(part.rtpAddress.getAddress())) { |
|
328 PktBuffer pktBuffer = part.pktBuffer; |
|
329 |
|
330 if (pktBuffer != null) { |
|
331 // A buffer already exists, append to it |
|
332 pktBuffer.addPkt(pkt); |
|
333 } else { |
|
334 // Create a new packet/frame buffer |
|
335 pktBuffer = new PktBuffer(this.rtpSession, part, pkt); |
|
336 part.pktBuffer = pktBuffer; |
|
337 } |
|
338 } else { |
|
339 System.out |
|
340 .println("RTPReceiverThread: Got an unexpected packet from " |
|
341 + pkt.getSsrc() |
|
342 + " the sending ip-address was " |
|
343 + packet.getAddress().toString() |
|
344 + ", we expected from " |
|
345 + part.rtpAddress.toString()); |
|
346 } |
|
347 |
|
348 // Statistics for receiver report. |
|
349 part.updateRRStats(packet.getLength(), pkt); |
|
350 // Upate liveness |
|
351 part.lastRtpPkt = System.currentTimeMillis(); |
|
352 |
|
353 if (RTPSession.rtpDebugLevel > 5) { |
|
354 System.out |
|
355 .println("<-> RTPReceiverThread signalling pktBufDataReady"); |
|
356 } |
|
357 |
|
358 // Signal the thread that pushes data to application |
|
359 rtpSession.pktBufLock.lock(); |
|
360 try { |
|
361 rtpSession.pktBufDataReady.signalAll(); |
|
362 } finally { |
|
363 rtpSession.pktBufLock.unlock(); |
|
364 } |
|
365 |
|
366 } |
|
367 } |
|
368 |
|
369 } |
|