/* * ============================================================================ * The Apache Software License, Version 1.1 * ============================================================================ * * Copyright (C) 2002 The Apache Software Foundation. All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- * tion, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by SuperBonBon Industries (http://www.sbbi.net/)." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "UPNPLib" and "SuperBonBon Industries" must not be * used to endorse or promote products derived from this software without * prior written permission. For written permission, please contact * info@sbbi.net. * * 5. Products derived from this software may not be called * "SuperBonBon Industries", nor may "SBBI" appear in their name, * without prior written permission of SuperBonBon Industries. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT,INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- * DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This software consists of voluntary contributions made by many individuals * on behalf of SuperBonBon Industries. For more information on * SuperBonBon Industries, please see . */ package net.sbbi.upnp; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.StringReader; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.URL; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import net.sbbi.upnp.services.UPNPService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.xml.sax.InputSource; /** * This class can be used with the ServiceEventHandler interface * to recieve notifications about state variables changes on * a given UPNP service. * @author SuperBonBon * @version 1.0 */ @SuppressWarnings("unchecked") public class ServicesEventing implements Runnable { final static Log log = LogFactory.getLog( ServicesEventing.class ); private final static ServicesEventing singleton = new ServicesEventing(); private boolean inService = false; private boolean daemon = true; private int daemonPort = 9999; private ServerSocket server = null; private List registered = new ArrayList(); private ServicesEventing() { } public final static ServicesEventing getInstance() { return singleton; } /** * Set the listeniner thread as a daemon, default to true. * Only works when no more objects are registered. * @param daemon the new thread type. */ public void setDaemon( boolean daemon ) { this.daemon = daemon; } /** * Sets the listener thread port, default to 9999. * Only works when no more objects are registered. * @param daemonPort the new listening port */ public void setDaemonPort( int daemonPort ) { this.daemonPort = daemonPort; } /** * Register state variable events notification for a device service * @param service the service to register with * @param handler the registrant object * @param subscriptionDuration subscription time in seconds, -1 for infinite time * @return the subscription duration returned by the device, 0 for an infinite duration or -1 if no subscription done * @throws IOException if some IOException error happens during coms with the device */ public int register( UPNPService service, ServiceEventHandler handler, int subscriptionDuration ) throws IOException { ServiceEventSubscription sub = registerEvent( service, handler, subscriptionDuration ); if ( sub != null ) { return sub.getDurationTime(); } return -1; } /** * Register state variable events notification for a device service * @param service the service to register with * @param handler the registrant object * @param subscriptionDuration subscription time in seconds, -1 for infinite time * @return an ServiceEventSubscription object instance containing all the required info or null if no subscription done * @throws IOException if some IOException error happens during coms with the device */ public ServiceEventSubscription registerEvent( UPNPService service, ServiceEventHandler handler, int subscriptionDuration ) throws IOException { URL eventingLoc = service.getEventSubURL(); if ( eventingLoc != null ) { if ( !inService ) startServicesEventingThread(); String duration = Integer.toString( subscriptionDuration ); if ( subscriptionDuration == -1 ) { duration = "infinite"; } Subscription sub = lookupSubscriber( service, handler ); if ( sub != null ) { // allready registered let's try to unregister it unRegister( service, handler ); } StringBuffer packet = new StringBuffer( 64 ); packet.append( "SUBSCRIBE " ).append( eventingLoc.getFile() ).append( " HTTP/1.1\r\n" ); packet.append( "HOST: " ).append( eventingLoc.getHost() ).append( ":" ).append( eventingLoc.getPort() ).append( "\r\n" ); packet.append( "CALLBACK: \r\n" ); packet.append( "NT: upnp:event\r\n" ); packet.append( "Connection: close\r\n" ); packet.append( "TIMEOUT: Second-" ).append( duration ).append( "\r\n\r\n" ); Socket skt = new Socket( eventingLoc.getHost(), eventingLoc.getPort() ); skt.setSoTimeout( 30000 ); // 30 secs timeout according to the specs if ( log.isDebugEnabled() ) log.debug( packet ); OutputStream out = skt.getOutputStream(); out.write( packet.toString().getBytes() ); out.flush(); InputStream in = skt.getInputStream(); StringBuffer data = new StringBuffer(); int readen = 0; byte[] buffer = new byte[256]; while ( ( readen = in.read( buffer ) ) != -1 ) { data.append( new String( buffer, 0, readen ) ); } in.close(); out.close(); skt.close(); if ( log.isDebugEnabled() ) log.debug( data.toString() ); if ( data.toString().trim().length() > 0 ) { HttpResponse resp = new HttpResponse( data.toString() ); if ( resp.getHeader().startsWith( "HTTP/1.1 200 OK" ) ) { String sid = resp.getHTTPHeaderField( "SID" ); String actualTimeout = resp.getHTTPHeaderField( "TIMEOUT" ); int durationTime = 0; // actualTimeout = Second-xxx or Second-infinite if ( !actualTimeout.equalsIgnoreCase( "Second-infinite" ) ) { durationTime = Integer.parseInt( actualTimeout.substring( 7 ) ); } sub = new Subscription(); sub.handler = handler; sub.sub = new ServiceEventSubscription( service.getServiceType(), service.getServiceId(), service.getEventSubURL(), sid, skt.getInetAddress(), durationTime ); synchronized( registered ) { registered.add( sub ); } return sub.sub; } } } return null; } private Subscription lookupSubscriber( UPNPService service, ServiceEventHandler handler ) { synchronized( registered ) { for ( Iterator i = registered.iterator(); i.hasNext(); ) { Subscription sub = (Subscription)i.next(); if ( sub.handler == handler && sub.sub.getServiceId().hashCode() == service.getServiceId().hashCode() && sub.sub.getServiceType().hashCode() == service.getServiceType().hashCode() && sub.sub.getServiceURL().equals( service.getEventSubURL() ) ) { return sub; } } } return null; } Subscription lookupSubscriber( String sid, InetAddress deviceIp ) { synchronized( registered ) { for ( Iterator i = registered.iterator(); i.hasNext(); ) { Subscription sub = (Subscription)i.next(); if ( sub.sub.getSID().equals( sid ) && sub.sub.getDeviceIp().equals( deviceIp ) ) { return sub; } } } return null; } Subscription lookupSubscriber( String sid ) { synchronized( registered ) { for ( Iterator i = registered.iterator(); i.hasNext(); ) { Subscription sub = (Subscription)i.next(); if ( sub.sub.getSID().equals( sid ) ) { return sub; } } } return null; } /** * Unregisters events notifications from a service * @param service the service that need to be unregistered * @param handler the handler that registered for this service * @return true if unregistered false otherwise ( the given handler never registred for the given service ) * @throws IOException if some IOException error happens during coms with the device */ public boolean unRegister( UPNPService service, ServiceEventHandler handler ) throws IOException { URL eventingLoc = service.getEventSubURL(); if ( eventingLoc != null ) { Subscription sub = lookupSubscriber( service, handler ); if ( sub != null ) { synchronized( registered ) { registered.remove( sub ); } if ( registered.size() == 0 ) { stopServicesEventingThread(); } StringBuffer packet = new StringBuffer( 64 ); packet.append( "UNSUBSCRIBE " ).append( eventingLoc.getFile() ).append( " HTTP/1.1\r\n" ); packet.append( "HOST: " ).append( eventingLoc.getHost() ).append( ":" ).append( eventingLoc.getPort() ).append( "\r\n" ); packet.append( "SID: " ).append( sub.sub.getSID() ).append( "\r\n\r\n" ); Socket skt = new Socket( eventingLoc.getHost(), eventingLoc.getPort() ); skt.setSoTimeout( 30000 ); // 30 secs timeout according to the specs if ( log.isDebugEnabled() ) log.debug( packet ); OutputStream out = skt.getOutputStream(); out.write( packet.toString().getBytes() ); out.flush(); InputStream in = skt.getInputStream(); StringBuffer data = new StringBuffer(); int readen = 0; byte[] buffer = new byte[256]; while ( ( readen = in.read( buffer ) ) != -1 ) { data.append( new String( buffer, 0, readen ) ); } in.close(); out.close(); skt.close(); if ( log.isDebugEnabled() ) log.debug( data.toString() ); if ( data.toString().trim().length() > 0 ) { HttpResponse resp = new HttpResponse( data.toString() ); if ( resp.getHeader().startsWith( "HTTP/1.1 200 OK" ) ) { return true; } } } } return false; } private void startServicesEventingThread() { synchronized( singleton ) { if ( !inService ) { Thread deamon = new Thread( singleton, "ServicesEventing daemon" ); deamon.setDaemon( daemon ); inService = true; deamon.start(); } } } private void stopServicesEventingThread() { synchronized( singleton ) { inService = false; try { server.close(); } catch ( IOException ex ) { // should not happen } } } public void run() { // only the deamon thread is allowed to call such method if ( !Thread.currentThread().getName().equals( "ServicesEventing daemon" ) ) return; try { server = new ServerSocket( daemonPort ); } catch ( IOException ex ) { log.error( "Error during daemon server socket on port " + daemonPort + " creation", ex ); return; } while ( inService ) { try { Socket skt = server.accept(); new Thread( new RequestProcessor( skt ) ).start(); } catch ( IOException ioEx ) { if ( inService ) { log.error( "IO Exception during UPNP messages listening thread", ioEx ); } } } } class Subscription { ServiceEventSubscription sub = null; ServiceEventHandler handler = null; } class RequestProcessor implements Runnable { private Socket client; RequestProcessor( Socket client ) { this.client = client; } public void run() { try { client.setSoTimeout( 30000 ); InputStream in = client.getInputStream(); OutputStream out = client.getOutputStream(); int readen = 0; StringBuffer data = new StringBuffer(); byte[] buffer = new byte[256]; boolean EOF = false; while ( !EOF && ( readen = in.read( buffer ) ) != -1 ) { data.append( new String( buffer, 0, readen ) ); // avoid a strange behaviour with some impls.. the -1 is never reached and a sockettimeout occurs // and a 0 byte is sent as the last byte if ( data.charAt( data.length()-1 ) == (char)0 ) { EOF = true; } } String packet = data.toString(); if ( packet.trim().length() > 0 ) { if ( packet.indexOf( (char)0 ) != -1 ) packet = packet.replace( (char)0, ' ' ); HttpResponse resp = new HttpResponse( packet ); if ( resp.getHeader().startsWith( "NOTIFY" ) ) { String sid = resp.getHTTPHeaderField( "SID" ); InetAddress deviceIp = client.getInetAddress(); String postURL = resp.getHTTPHeaderField( "SID" ); Subscription subscription = null; if ( sid != null && postURL != null ) { subscription = lookupSubscriber( sid, deviceIp ); if ( subscription == null ) { // not found maybe that the IP is not the same subscription = lookupSubscriber( sid ); } } if ( subscription != null ) { // respond ok out.write( "HTTP/1.1 200 OK\r\n".getBytes() ); } else { // unknown sid respond ko out.write( "HTTP/1.1 412 Precondition Failed\r\n".getBytes() ); } out.flush(); in.close(); out.close(); client.close(); if ( subscription != null ) { // let's parse it SAXParserFactory saxParFact = SAXParserFactory.newInstance(); saxParFact.setValidating( false ); saxParFact.setNamespaceAware( true ); SAXParser parser = saxParFact.newSAXParser(); ServiceEventMessageParser msgParser = new ServiceEventMessageParser(); StringReader stringReader = new StringReader( resp.getBody() ); InputSource src = new InputSource( stringReader ); parser.parse( src, msgParser ); Map changedStateVars = msgParser.getChangedStateVars(); for ( Iterator i = changedStateVars.keySet().iterator(); i.hasNext(); ) { String stateVarName = (String)i.next(); String stateVarNewVal = (String)changedStateVars.get( stateVarName ); subscription.handler.handleStateVariableEvent( stateVarName, stateVarNewVal ); } } } } } catch ( IOException ioEx ) { log.error( "IO Exception during client processing thread", ioEx ); } catch( Exception ex ) { log.error( "Unexpected error during client processing thread", ex ); } } } }