src/jlibrtp/RTPReceiverThread.java
changeset 823 2036ebfaccda
equal deleted inserted replaced
536:537ddd8aa407 823:2036ebfaccda
       
     1 /**
       
     2  * Java RTP Library (jlibrtp)
       
     3  * Copyright (C) 2006 Arne Kepp
       
     4  * 
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Lesser General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2.1 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Lesser General Public License for more details.
       
    14  * 
       
    15  * You should have received a copy of the GNU Lesser General Public
       
    16  * License along with this library; if not, write to the Free Software
       
    17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
       
    18  */
       
    19 package jlibrtp;
       
    20 
       
    21 import java.io.IOException;
       
    22 import java.net.DatagramPacket;
       
    23 import java.net.InetSocketAddress;
       
    24 import java.net.SocketException;
       
    25 
       
    26 import org.sipdroid.net.tools.DatagramPool;
       
    27 import org.sipdroid.net.tools.RtpPktPool;
       
    28 
       
    29 /**
       
    30  * The RTP receiver thread waits on the designated UDP socket for new packets.
       
    31  * 
       
    32  * Once one arrives, it is parsed and tested. We also check the ip-address of
       
    33  * the sender. If accepted, the packet is added onto the packet buffer of the
       
    34  * participant.
       
    35  * 
       
    36  * A separate thread moves the packet from the packet buffer to the application.
       
    37  * 
       
    38  * @author Arne Kepp
       
    39  */
       
    40 public class RTPReceiverThread extends Thread {
       
    41 	/** Parent RTP Session */
       
    42 	RTPSession rtpSession = null;
       
    43 	DatagramPool datagramPool = null;
       
    44 
       
    45 	RTPReceiverThread(RTPSession session) {
       
    46 		rtpSession = session;
       
    47 		datagramPool = DatagramPool.getInstance();
       
    48 		if (RTPSession.rtpDebugLevel > 1) {
       
    49 			System.out.println("<-> RTPReceiverThread created");
       
    50 		}
       
    51 	}
       
    52 	public void init() {
       
    53 		if (RTPSession.rtpDebugLevel > 1) {
       
    54 			if (rtpSession.mcSession) {
       
    55 				System.out.println("-> RTPReceiverThread.run() starting on MC "
       
    56 						+ rtpSession.rtpMCSock.getLocalPort());
       
    57 			} else {
       
    58 				System.out.println("-> RTPReceiverThread.run() starting on "
       
    59 						+ rtpSession.rtpSock.getLocalPort());
       
    60 			}
       
    61 		}
       
    62 
       
    63 		android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO);
       
    64 
       
    65 		DatagramPacket packet = datagramPool.borrowPacket();
       
    66 		try {
       
    67 			rtpSession.rtpSock.setSoTimeout(1);
       
    68 			for (;;)
       
    69 				rtpSession.rtpSock.receive(packet);
       
    70 		} catch (SocketException e2) {
       
    71 
       
    72 		} catch (IOException e) {
       
    73 		}	
       
    74 		datagramPool.returnPacket(packet);
       
    75 		try {
       
    76 			rtpSession.rtpSock.setSoTimeout(1000);
       
    77 		} catch (SocketException e2) {
       
    78 		}
       
    79 	}
       
    80 	
       
    81 	public void readPacketToBuffer() {
       
    82 		if (RTPSession.rtpDebugLevel > 6) {
       
    83 			if (rtpSession.mcSession) {
       
    84 				System.out
       
    85 				.println("-> RTPReceiverThread.run() waiting for MC packet on "
       
    86 						+ rtpSession.rtpMCSock.getLocalPort());
       
    87 			} else {
       
    88 				System.out
       
    89 				.println("-> RTPReceiverThread.run() waiting for packet on "
       
    90 						+ rtpSession.rtpSock.getLocalPort());
       
    91 			}
       
    92 		}
       
    93 
       
    94 		// Prepare a packet
       
    95 		//DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);
       
    96 		DatagramPacket packet = datagramPool.borrowPacket();
       
    97 		// Wait for it to arrive
       
    98 		if (!rtpSession.mcSession) {
       
    99 			// Unicast
       
   100 			try {
       
   101 				rtpSession.rtpSock.receive(packet);
       
   102 			} catch (IOException e) {
       
   103 				if (!rtpSession.endSession) {
       
   104 					e.printStackTrace();
       
   105 				} else {
       
   106 					return;
       
   107 				}
       
   108 			}
       
   109 		} else {
       
   110 			// Multicast
       
   111 			try {
       
   112 				rtpSession.rtpMCSock.receive(packet);
       
   113 			} catch (IOException e) {
       
   114 				if (!rtpSession.endSession) {
       
   115 					e.printStackTrace();
       
   116 				} else {
       
   117 					return;
       
   118 				}
       
   119 			}
       
   120 		}
       
   121 		// Parse the received RTP (?) packet
       
   122 		RtpPkt pkt = RtpPktPool.getInstance().borrowPkt();
       
   123 		pkt.initPacket(packet.getData(), packet.getLength(), packet);
       
   124 
       
   125 		// Check whether it was valid.
       
   126 		if (pkt == null) {
       
   127 			System.out.println("Received invalid RTP packet. Ignoring");
       
   128 			return;
       
   129 		}
       
   130 
       
   131 		long pktSsrc = pkt.getSsrc();
       
   132 
       
   133 		// Check for loops and SSRC collisions
       
   134 		if (rtpSession.ssrc == pktSsrc)
       
   135 			rtpSession.resolveSsrcConflict();
       
   136 
       
   137 		long[] csrcArray = pkt.getCsrcArray();
       
   138 		if (csrcArray != null) {
       
   139 			for (int i = 0; i < csrcArray.length; i++) {
       
   140 				if (csrcArray[i] == rtpSession.ssrc)
       
   141 					;
       
   142 				rtpSession.resolveSsrcConflict();
       
   143 			}
       
   144 		}
       
   145 
       
   146 		if (RTPSession.rtpDebugLevel > 17) {
       
   147 			System.out
       
   148 			.println("-> RTPReceiverThread.run() rcvd packet, seqNum "
       
   149 					+ pktSsrc);
       
   150 			if (RTPSession.rtpDebugLevel > 10) {
       
   151 				System.out.println("-> RTPReceiverThread.run() payload is "
       
   152 						+ pkt.getPayloadLength());
       
   153 			}
       
   154 		}
       
   155 
       
   156 		// Find the participant in the database based on SSRC
       
   157 		//Participant part = rtpSession.partDb.getParticipant(pktSsrc);
       
   158 		Participant part = rtpSession.firstPart;
       
   159 		if (part == null) {
       
   160 			InetSocketAddress nullSocket = null;
       
   161 			part = new Participant((InetSocketAddress) packet
       
   162 					.getSocketAddress(), nullSocket, pkt.getSsrc());
       
   163 			part.unexpected = true;
       
   164 			rtpSession.partDb.addParticipant(1, part);
       
   165 		}
       
   166 
       
   167 		// Do checks on whether the datagram came from the expected source
       
   168 		// for that SSRC.
       
   169 		
       
   170 		if (part.rtpAddress == null
       
   171 				|| packet.getAddress().equals(part.rtpAddress.getAddress())) {
       
   172 			PktBuffer pktBuffer = part.pktBuffer;
       
   173 
       
   174 			if (pktBuffer != null) {
       
   175 				// A buffer already exists, append to it
       
   176 				pktBuffer.addPkt(pkt);
       
   177 			} else {
       
   178 				// Create a new packet/frame buffer
       
   179 				pktBuffer = new PktBuffer(this.rtpSession, part, pkt);
       
   180 				part.pktBuffer = pktBuffer;
       
   181 			}
       
   182 		} else {
       
   183 			System.out
       
   184 			.println("RTPReceiverThread: Got an unexpected packet from "
       
   185 					+ pkt.getSsrc()
       
   186 					+ " the sending ip-address was "
       
   187 					+ packet.getAddress().toString()
       
   188 					+ ", we expected from "
       
   189 					+ part.rtpAddress.toString());
       
   190 		}
       
   191 
       
   192 		// Statistics for receiver report.
       
   193 		part.updateRRStats(packet.getLength(), pkt);
       
   194 		// Upate liveness
       
   195 		part.lastRtpPkt = System.currentTimeMillis();
       
   196 
       
   197 		if (RTPSession.rtpDebugLevel > 5) {
       
   198 			System.out
       
   199 			.println("<-> RTPReceiverThread signalling pktBufDataReady");
       
   200 		}
       
   201 
       
   202 		// Signal the thread that pushes data to application
       
   203 		/*rtpSession.pktBufLock.lock();
       
   204 		try {
       
   205 			rtpSession.pktBufDataReady.signalAll();
       
   206 		} finally {
       
   207 			rtpSession.pktBufLock.unlock();
       
   208 		}*/
       
   209 	}
       
   210 
       
   211 	public void run() {
       
   212 		if (RTPSession.rtpDebugLevel > 1) {
       
   213 			if (rtpSession.mcSession) {
       
   214 				System.out.println("-> RTPReceiverThread.run() starting on MC "
       
   215 						+ rtpSession.rtpMCSock.getLocalPort());
       
   216 			} else {
       
   217 				System.out.println("-> RTPReceiverThread.run() starting on "
       
   218 						+ rtpSession.rtpSock.getLocalPort());
       
   219 			}
       
   220 		}
       
   221 
       
   222 		android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO);
       
   223 
       
   224 		DatagramPacket packet = datagramPool.borrowPacket();
       
   225 		try {
       
   226 			rtpSession.rtpSock.setSoTimeout(1);
       
   227 			for (;;)
       
   228 				rtpSession.rtpSock.receive(packet);
       
   229 		} catch (SocketException e2) {
       
   230 
       
   231 		} catch (IOException e) {
       
   232 		}	
       
   233 		datagramPool.returnPacket(packet);
       
   234 		try {
       
   235 			rtpSession.rtpSock.setSoTimeout(0);
       
   236 		} catch (SocketException e2) {
       
   237 		}
       
   238 		while (!rtpSession.endSession) {
       
   239 			if (RTPSession.rtpDebugLevel > 6) {
       
   240 				if (rtpSession.mcSession) {
       
   241 					System.out
       
   242 					.println("-> RTPReceiverThread.run() waiting for MC packet on "
       
   243 							+ rtpSession.rtpMCSock.getLocalPort());
       
   244 				} else {
       
   245 					System.out
       
   246 					.println("-> RTPReceiverThread.run() waiting for packet on "
       
   247 							+ rtpSession.rtpSock.getLocalPort());
       
   248 				}
       
   249 			}
       
   250 
       
   251 			// Prepare a packet
       
   252 			//DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);
       
   253 			packet = datagramPool.borrowPacket();
       
   254 			// Wait for it to arrive
       
   255 			if (!rtpSession.mcSession) {
       
   256 				// Unicast
       
   257 				try {
       
   258 					rtpSession.rtpSock.receive(packet);
       
   259 				} catch (IOException e) {
       
   260 					if (!rtpSession.endSession) {
       
   261 						e.printStackTrace();
       
   262 					} else {
       
   263 						continue;
       
   264 					}
       
   265 				}
       
   266 			} else {
       
   267 				// Multicast
       
   268 				try {
       
   269 					rtpSession.rtpMCSock.receive(packet);
       
   270 				} catch (IOException e) {
       
   271 					if (!rtpSession.endSession) {
       
   272 						e.printStackTrace();
       
   273 					} else {
       
   274 						continue;
       
   275 					}
       
   276 				}
       
   277 			}
       
   278 			// Parse the received RTP (?) packet
       
   279 			RtpPkt pkt = RtpPktPool.getInstance().borrowPkt();
       
   280 			pkt.initPacket(packet.getData(), packet.getLength(), packet);
       
   281 
       
   282 			// Check whether it was valid.
       
   283 			if (pkt == null) {
       
   284 				System.out.println("Received invalid RTP packet. Ignoring");
       
   285 				continue;
       
   286 			}
       
   287 
       
   288 			long pktSsrc = pkt.getSsrc();
       
   289 
       
   290 			// Check for loops and SSRC collisions
       
   291 			if (rtpSession.ssrc == pktSsrc)
       
   292 				rtpSession.resolveSsrcConflict();
       
   293 
       
   294 			long[] csrcArray = pkt.getCsrcArray();
       
   295 			if (csrcArray != null) {
       
   296 				for (int i = 0; i < csrcArray.length; i++) {
       
   297 					if (csrcArray[i] == rtpSession.ssrc)
       
   298 						;
       
   299 					rtpSession.resolveSsrcConflict();
       
   300 				}
       
   301 			}
       
   302 
       
   303 			if (RTPSession.rtpDebugLevel > 17) {
       
   304 				System.out
       
   305 				.println("-> RTPReceiverThread.run() rcvd packet, seqNum "
       
   306 						+ pktSsrc);
       
   307 				if (RTPSession.rtpDebugLevel > 10) {
       
   308 					System.out.println("-> RTPReceiverThread.run() payload is "
       
   309 							+ pkt.getPayloadLength());
       
   310 				}
       
   311 			}
       
   312 
       
   313 			// Find the participant in the database based on SSRC
       
   314 			Participant part = rtpSession.partDb.getParticipant(pktSsrc);
       
   315 
       
   316 			if (part == null) {
       
   317 				InetSocketAddress nullSocket = null;
       
   318 				part = new Participant((InetSocketAddress) packet
       
   319 						.getSocketAddress(), nullSocket, pkt.getSsrc());
       
   320 				part.unexpected = true;
       
   321 				rtpSession.partDb.addParticipant(1, part);
       
   322 			}
       
   323 
       
   324 			// Do checks on whether the datagram came from the expected source
       
   325 			// for that SSRC.
       
   326 			if (part.rtpAddress == null
       
   327 					|| packet.getAddress().equals(part.rtpAddress.getAddress())) {
       
   328 				PktBuffer pktBuffer = part.pktBuffer;
       
   329 
       
   330 				if (pktBuffer != null) {
       
   331 					// A buffer already exists, append to it
       
   332 					pktBuffer.addPkt(pkt);
       
   333 				} else {
       
   334 					// Create a new packet/frame buffer
       
   335 					pktBuffer = new PktBuffer(this.rtpSession, part, pkt);
       
   336 					part.pktBuffer = pktBuffer;
       
   337 				}
       
   338 			} else {
       
   339 				System.out
       
   340 				.println("RTPReceiverThread: Got an unexpected packet from "
       
   341 						+ pkt.getSsrc()
       
   342 						+ " the sending ip-address was "
       
   343 						+ packet.getAddress().toString()
       
   344 						+ ", we expected from "
       
   345 						+ part.rtpAddress.toString());
       
   346 			}
       
   347 
       
   348 			// Statistics for receiver report.
       
   349 			part.updateRRStats(packet.getLength(), pkt);
       
   350 			// Upate liveness
       
   351 			part.lastRtpPkt = System.currentTimeMillis();
       
   352 
       
   353 			if (RTPSession.rtpDebugLevel > 5) {
       
   354 				System.out
       
   355 				.println("<-> RTPReceiverThread signalling pktBufDataReady");
       
   356 			}
       
   357 
       
   358 			// Signal the thread that pushes data to application
       
   359 			rtpSession.pktBufLock.lock();
       
   360 			try {
       
   361 				rtpSession.pktBufDataReady.signalAll();
       
   362 			} finally {
       
   363 				rtpSession.pktBufLock.unlock();
       
   364 			}
       
   365 
       
   366 		}
       
   367 	}
       
   368 
       
   369 }