src/jlibrtp/RTPReceiverThread.java
changeset 13 e684f11070d5
equal deleted inserted replaced
12:c9ff263c29ad 13:e684f11070d5
       
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.io.IOException;
       
    22 import java.net.DatagramPacket;
       
    23 import java.net.InetSocketAddress;
       
    24 
       
    25 /**
       
    26  * The RTP receiver thread waits on the designated UDP socket for new packets.
       
    27  * 
       
    28  * Once one arrives, it is parsed and tested. We also check the ip-address of the sender. 
       
    29  * If accepted, the packet is added onto the packet buffer of the participant.
       
    30  * 
       
    31  * A separate thread moves the packet from the packet buffer to the application.
       
    32  * 
       
    33  * @author Arne Kepp
       
    34  */
       
    35 public class RTPReceiverThread extends Thread {
       
    36 	/** Parent RTP Session */
       
    37 	RTPSession rtpSession = null;
       
    38 
       
    39 	RTPReceiverThread(RTPSession session) {
       
    40 		rtpSession = session;
       
    41 		if(RTPSession.rtpDebugLevel > 1) {
       
    42 			System.out.println("<-> RTPReceiverThread created");
       
    43 		} 
       
    44 	}
       
    45 
       
    46 	public void run() {
       
    47 		if(RTPSession.rtpDebugLevel > 1) {
       
    48 			if(rtpSession.mcSession) {
       
    49 				System.out.println("-> RTPReceiverThread.run() starting on MC " + rtpSession.rtpMCSock.getLocalPort() );
       
    50 			} else {
       
    51 				System.out.println("-> RTPReceiverThread.run() starting on " + rtpSession.rtpSock.getLocalPort() );
       
    52 			}
       
    53 		}
       
    54 
       
    55 		while(!rtpSession.endSession) {
       
    56 			if(RTPSession.rtpDebugLevel > 6) {
       
    57 				if(rtpSession.mcSession) {
       
    58 					System.out.println("-> RTPReceiverThread.run() waiting for MC packet on " + rtpSession.rtpMCSock.getLocalPort() );
       
    59 				} else {
       
    60 					System.out.println("-> RTPReceiverThread.run() waiting for packet on " + rtpSession.rtpSock.getLocalPort() );
       
    61 				}
       
    62 			}
       
    63 
       
    64 			// Prepare a packet
       
    65 			byte[] rawPkt = new byte[1500];
       
    66 			DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);
       
    67 			// Wait for it to arrive
       
    68 			if(! rtpSession.mcSession) {
       
    69 				//Unicast
       
    70 				try {
       
    71 					rtpSession.rtpSock.receive(packet);
       
    72 				} catch (IOException e) {
       
    73 					if(!rtpSession.endSession) {
       
    74 						e.printStackTrace();
       
    75 					} else {
       
    76 						continue;
       
    77 					}
       
    78 				}
       
    79 			} else {
       
    80 				//Multicast 
       
    81 				try {
       
    82 					rtpSession.rtpMCSock.receive(packet);
       
    83 				} catch (IOException e) {
       
    84 					if(!rtpSession.endSession) {
       
    85 						e.printStackTrace();
       
    86 					} else {
       
    87 						continue;
       
    88 					}
       
    89 				}
       
    90 			}
       
    91 
       
    92 			// Parse the received RTP (?) packet
       
    93 			RtpPkt pkt = new RtpPkt(rawPkt, packet.getLength());
       
    94 
       
    95 			// Check whether it was valid.
       
    96 			if(pkt == null) {
       
    97 				System.out.println("Received invalid RTP packet. Ignoring");
       
    98 				continue;
       
    99 			}
       
   100 			
       
   101 			long pktSsrc = pkt.getSsrc();
       
   102 			
       
   103 			// Check for loops and SSRC collisions
       
   104 			if( rtpSession.ssrc == pktSsrc )
       
   105 				rtpSession.resolveSsrcConflict();
       
   106 			
       
   107 			long[] csrcArray = pkt.getCsrcArray();
       
   108 			if( csrcArray != null) {
       
   109 				for(int i=0; i< csrcArray.length; i++) {
       
   110 					if(csrcArray[i] == rtpSession.ssrc);
       
   111 						rtpSession.resolveSsrcConflict();
       
   112 				}
       
   113 			}
       
   114 			
       
   115 			if(RTPSession.rtpDebugLevel > 17) {
       
   116 				System.out.println("-> RTPReceiverThread.run() rcvd packet, seqNum " + pktSsrc );
       
   117 				if(RTPSession.rtpDebugLevel > 10) {
       
   118 					String str = new String(pkt.getPayload());
       
   119 					System.out.println("-> RTPReceiverThread.run() payload is " + str );
       
   120 				}
       
   121 			}
       
   122 			
       
   123 			//Find the participant in the database based on SSRC
       
   124 			Participant part = rtpSession.partDb.getParticipant(pktSsrc);
       
   125 
       
   126 			if(part == null) {
       
   127 				InetSocketAddress nullSocket = null;
       
   128 				part = new Participant((InetSocketAddress) packet.getSocketAddress(), nullSocket, pkt.getSsrc());
       
   129 				part.unexpected = true;
       
   130 				rtpSession.partDb.addParticipant(1,part);
       
   131 			}
       
   132 
       
   133 			// Do checks on whether the datagram came from the expected source for that SSRC.
       
   134 			if(part.rtpAddress == null || packet.getAddress().equals(part.rtpAddress.getAddress())) {
       
   135 				PktBuffer pktBuffer = part.pktBuffer;
       
   136 
       
   137 				if(pktBuffer != null) {
       
   138 					//A buffer already exists, append to it
       
   139 					pktBuffer.addPkt(pkt);
       
   140 				} else {
       
   141 					// Create a new packet/frame buffer
       
   142 					pktBuffer = new PktBuffer(this.rtpSession, part,pkt);
       
   143 					part.pktBuffer = pktBuffer;
       
   144 				}
       
   145 			} else {
       
   146 				System.out.println("RTPReceiverThread: Got an unexpected packet from " + pkt.getSsrc() 
       
   147 						+ " the sending ip-address was " + packet.getAddress().toString() 
       
   148 						+ ", we expected from " + part.rtpAddress.toString());
       
   149 			}
       
   150 
       
   151 			// Statistics for receiver report.
       
   152 			part.updateRRStats(packet.getLength(), pkt);
       
   153 			// Upate liveness
       
   154 			part.lastRtpPkt = System.currentTimeMillis();
       
   155 
       
   156 			if(RTPSession.rtpDebugLevel > 5) {
       
   157 				System.out.println("<-> RTPReceiverThread signalling pktBufDataReady");
       
   158 			}
       
   159 			
       
   160 			// Signal the thread that pushes data to application
       
   161 			rtpSession.pktBufLock.lock();
       
   162 			try { rtpSession.pktBufDataReady.signalAll(); } finally {
       
   163 				rtpSession.pktBufLock.unlock();
       
   164 			}
       
   165 
       
   166 		}
       
   167 	}
       
   168 
       
   169 }