diff -r 537ddd8aa407 -r 2036ebfaccda src/jlibrtp/RTPReceiverThread.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jlibrtp/RTPReceiverThread.java Fri Nov 20 19:29:42 2009 +0100 @@ -0,0 +1,369 @@ +/** + * Java RTP Library (jlibrtp) + * Copyright (C) 2006 Arne Kepp + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ +package jlibrtp; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketException; + +import org.sipdroid.net.tools.DatagramPool; +import org.sipdroid.net.tools.RtpPktPool; + +/** + * The RTP receiver thread waits on the designated UDP socket for new packets. + * + * Once one arrives, it is parsed and tested. We also check the ip-address of + * the sender. If accepted, the packet is added onto the packet buffer of the + * participant. + * + * A separate thread moves the packet from the packet buffer to the application. + * + * @author Arne Kepp + */ +public class RTPReceiverThread extends Thread { + /** Parent RTP Session */ + RTPSession rtpSession = null; + DatagramPool datagramPool = null; + + RTPReceiverThread(RTPSession session) { + rtpSession = session; + datagramPool = DatagramPool.getInstance(); + if (RTPSession.rtpDebugLevel > 1) { + System.out.println("<-> RTPReceiverThread created"); + } + } + public void init() { + if (RTPSession.rtpDebugLevel > 1) { + if (rtpSession.mcSession) { + System.out.println("-> RTPReceiverThread.run() starting on MC " + + rtpSession.rtpMCSock.getLocalPort()); + } else { + System.out.println("-> RTPReceiverThread.run() starting on " + + rtpSession.rtpSock.getLocalPort()); + } + } + + android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO); + + DatagramPacket packet = datagramPool.borrowPacket(); + try { + rtpSession.rtpSock.setSoTimeout(1); + for (;;) + rtpSession.rtpSock.receive(packet); + } catch (SocketException e2) { + + } catch (IOException e) { + } + datagramPool.returnPacket(packet); + try { + rtpSession.rtpSock.setSoTimeout(1000); + } catch (SocketException e2) { + } + } + + public void readPacketToBuffer() { + if (RTPSession.rtpDebugLevel > 6) { + if (rtpSession.mcSession) { + System.out + .println("-> RTPReceiverThread.run() waiting for MC packet on " + + rtpSession.rtpMCSock.getLocalPort()); + } else { + System.out + .println("-> RTPReceiverThread.run() waiting for packet on " + + rtpSession.rtpSock.getLocalPort()); + } + } + + // Prepare a packet + //DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length); + DatagramPacket packet = datagramPool.borrowPacket(); + // Wait for it to arrive + if (!rtpSession.mcSession) { + // Unicast + try { + rtpSession.rtpSock.receive(packet); + } catch (IOException e) { + if (!rtpSession.endSession) { + e.printStackTrace(); + } else { + return; + } + } + } else { + // Multicast + try { + rtpSession.rtpMCSock.receive(packet); + } catch (IOException e) { + if (!rtpSession.endSession) { + e.printStackTrace(); + } else { + return; + } + } + } + // Parse the received RTP (?) packet + RtpPkt pkt = RtpPktPool.getInstance().borrowPkt(); + pkt.initPacket(packet.getData(), packet.getLength(), packet); + + // Check whether it was valid. + if (pkt == null) { + System.out.println("Received invalid RTP packet. Ignoring"); + return; + } + + long pktSsrc = pkt.getSsrc(); + + // Check for loops and SSRC collisions + if (rtpSession.ssrc == pktSsrc) + rtpSession.resolveSsrcConflict(); + + long[] csrcArray = pkt.getCsrcArray(); + if (csrcArray != null) { + for (int i = 0; i < csrcArray.length; i++) { + if (csrcArray[i] == rtpSession.ssrc) + ; + rtpSession.resolveSsrcConflict(); + } + } + + if (RTPSession.rtpDebugLevel > 17) { + System.out + .println("-> RTPReceiverThread.run() rcvd packet, seqNum " + + pktSsrc); + if (RTPSession.rtpDebugLevel > 10) { + System.out.println("-> RTPReceiverThread.run() payload is " + + pkt.getPayloadLength()); + } + } + + // Find the participant in the database based on SSRC + //Participant part = rtpSession.partDb.getParticipant(pktSsrc); + Participant part = rtpSession.firstPart; + if (part == null) { + InetSocketAddress nullSocket = null; + part = new Participant((InetSocketAddress) packet + .getSocketAddress(), nullSocket, pkt.getSsrc()); + part.unexpected = true; + rtpSession.partDb.addParticipant(1, part); + } + + // Do checks on whether the datagram came from the expected source + // for that SSRC. + + if (part.rtpAddress == null + || packet.getAddress().equals(part.rtpAddress.getAddress())) { + PktBuffer pktBuffer = part.pktBuffer; + + if (pktBuffer != null) { + // A buffer already exists, append to it + pktBuffer.addPkt(pkt); + } else { + // Create a new packet/frame buffer + pktBuffer = new PktBuffer(this.rtpSession, part, pkt); + part.pktBuffer = pktBuffer; + } + } else { + System.out + .println("RTPReceiverThread: Got an unexpected packet from " + + pkt.getSsrc() + + " the sending ip-address was " + + packet.getAddress().toString() + + ", we expected from " + + part.rtpAddress.toString()); + } + + // Statistics for receiver report. + part.updateRRStats(packet.getLength(), pkt); + // Upate liveness + part.lastRtpPkt = System.currentTimeMillis(); + + if (RTPSession.rtpDebugLevel > 5) { + System.out + .println("<-> RTPReceiverThread signalling pktBufDataReady"); + } + + // Signal the thread that pushes data to application + /*rtpSession.pktBufLock.lock(); + try { + rtpSession.pktBufDataReady.signalAll(); + } finally { + rtpSession.pktBufLock.unlock(); + }*/ + } + + public void run() { + if (RTPSession.rtpDebugLevel > 1) { + if (rtpSession.mcSession) { + System.out.println("-> RTPReceiverThread.run() starting on MC " + + rtpSession.rtpMCSock.getLocalPort()); + } else { + System.out.println("-> RTPReceiverThread.run() starting on " + + rtpSession.rtpSock.getLocalPort()); + } + } + + android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO); + + DatagramPacket packet = datagramPool.borrowPacket(); + try { + rtpSession.rtpSock.setSoTimeout(1); + for (;;) + rtpSession.rtpSock.receive(packet); + } catch (SocketException e2) { + + } catch (IOException e) { + } + datagramPool.returnPacket(packet); + try { + rtpSession.rtpSock.setSoTimeout(0); + } catch (SocketException e2) { + } + while (!rtpSession.endSession) { + if (RTPSession.rtpDebugLevel > 6) { + if (rtpSession.mcSession) { + System.out + .println("-> RTPReceiverThread.run() waiting for MC packet on " + + rtpSession.rtpMCSock.getLocalPort()); + } else { + System.out + .println("-> RTPReceiverThread.run() waiting for packet on " + + rtpSession.rtpSock.getLocalPort()); + } + } + + // Prepare a packet + //DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length); + packet = datagramPool.borrowPacket(); + // Wait for it to arrive + if (!rtpSession.mcSession) { + // Unicast + try { + rtpSession.rtpSock.receive(packet); + } catch (IOException e) { + if (!rtpSession.endSession) { + e.printStackTrace(); + } else { + continue; + } + } + } else { + // Multicast + try { + rtpSession.rtpMCSock.receive(packet); + } catch (IOException e) { + if (!rtpSession.endSession) { + e.printStackTrace(); + } else { + continue; + } + } + } + // Parse the received RTP (?) packet + RtpPkt pkt = RtpPktPool.getInstance().borrowPkt(); + pkt.initPacket(packet.getData(), packet.getLength(), packet); + + // Check whether it was valid. + if (pkt == null) { + System.out.println("Received invalid RTP packet. Ignoring"); + continue; + } + + long pktSsrc = pkt.getSsrc(); + + // Check for loops and SSRC collisions + if (rtpSession.ssrc == pktSsrc) + rtpSession.resolveSsrcConflict(); + + long[] csrcArray = pkt.getCsrcArray(); + if (csrcArray != null) { + for (int i = 0; i < csrcArray.length; i++) { + if (csrcArray[i] == rtpSession.ssrc) + ; + rtpSession.resolveSsrcConflict(); + } + } + + if (RTPSession.rtpDebugLevel > 17) { + System.out + .println("-> RTPReceiverThread.run() rcvd packet, seqNum " + + pktSsrc); + if (RTPSession.rtpDebugLevel > 10) { + System.out.println("-> RTPReceiverThread.run() payload is " + + pkt.getPayloadLength()); + } + } + + // Find the participant in the database based on SSRC + Participant part = rtpSession.partDb.getParticipant(pktSsrc); + + if (part == null) { + InetSocketAddress nullSocket = null; + part = new Participant((InetSocketAddress) packet + .getSocketAddress(), nullSocket, pkt.getSsrc()); + part.unexpected = true; + rtpSession.partDb.addParticipant(1, part); + } + + // Do checks on whether the datagram came from the expected source + // for that SSRC. + if (part.rtpAddress == null + || packet.getAddress().equals(part.rtpAddress.getAddress())) { + PktBuffer pktBuffer = part.pktBuffer; + + if (pktBuffer != null) { + // A buffer already exists, append to it + pktBuffer.addPkt(pkt); + } else { + // Create a new packet/frame buffer + pktBuffer = new PktBuffer(this.rtpSession, part, pkt); + part.pktBuffer = pktBuffer; + } + } else { + System.out + .println("RTPReceiverThread: Got an unexpected packet from " + + pkt.getSsrc() + + " the sending ip-address was " + + packet.getAddress().toString() + + ", we expected from " + + part.rtpAddress.toString()); + } + + // Statistics for receiver report. + part.updateRRStats(packet.getLength(), pkt); + // Upate liveness + part.lastRtpPkt = System.currentTimeMillis(); + + if (RTPSession.rtpDebugLevel > 5) { + System.out + .println("<-> RTPReceiverThread signalling pktBufDataReady"); + } + + // Signal the thread that pushes data to application + rtpSession.pktBufLock.lock(); + try { + rtpSession.pktBufDataReady.signalAll(); + } finally { + rtpSession.pktBufLock.unlock(); + } + + } + } + +}