Source code for fauxmo.protocols

"""protocols.py

Holds the asyncio protocol classes required for the Echo's UPnP / SSDP device
discovery.
"""

import asyncio
import socket
import struct
import uuid
from email.utils import formatdate
from xml.sax.saxutils import escape

from fauxmo import logger
from fauxmo.utils import make_serial


class Fauxmo(asyncio.Protocol):
    """Mimics a WeMo switch on the network.

    Asyncio protocol intended for use with BaseEventLoop.create_server.
    """

    def __init__(self, name, action_handler):
        """Initialize a Fauxmo device.

        Args:
            name (str): How you want to call the device, e.g. "bedroom light"
            action_handler (fauxmo.handler): Fauxmo action handler object
        """
        self.name = name
        self.serial = make_serial(name)
        self.action_handler = action_handler

    def connection_made(self, transport):
        """Log the peer of a new connection and keep its transport.

        Args:
            transport: Transport for the connection; it is stored on the
                instance and later used to write the response and close.
        """
        peername = transport.get_extra_info('peername')
        logger.debug("Connection made with: {}".format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode data and determine if it is a setup or action request.

        Requests that match neither known request line are logged and
        otherwise ignored.

        Args:
            data (bytes): Raw request received on the connection
        """
        msg = data.decode()
        logger.debug("Received message:\n{}".format(msg))

        if msg.startswith('GET /setup.xml HTTP/1.1'):
            logger.debug("setup.xml requested by Echo")
            self.handle_setup()
        elif msg.startswith('POST /upnp/control/basicevent1 HTTP/1.1'):
            self.handle_action(msg)

    def _build_setup_response(self, date_str):
        """Return the full HTTP response (headers and XML) for setup.xml.

        Args:
            date_str (str): Value to send in the DATE header

        Returns:
            str: Response text, ready to be encoded and written
        """
        # The name is free-form user text: escape it so characters such as
        # `&` or `<` cannot produce malformed XML.
        friendly_name = escape(str(self.name))

        setup_xml = '\r\n'.join([
            '<?xml version="1.0"?>',
            '<root>',
            '<device>',
            '<deviceType>urn:Fauxmo:device:controllee:1</deviceType>',
            '<friendlyName>{}</friendlyName>'.format(friendly_name),
            '<manufacturer>Belkin International Inc.</manufacturer>',
            '<modelName>Emulated Socket</modelName>',
            '<modelNumber>3.1415</modelNumber>',
            '<UDN>uuid:Socket-1_0-{}</UDN>'.format(self.serial),
            '</device>',
            '</root>']) + 2 * '\r\n'

        # CONTENT-LENGTH counts bytes on the wire, not characters; the two
        # differ as soon as the device name contains non-ASCII text.
        body_length = len(setup_xml.encode())

        # Made as a separate string because it requires the body length
        return '\r\n'.join([
            'HTTP/1.1 200 OK',
            'CONTENT-LENGTH: {}'.format(body_length),
            'CONTENT-TYPE: text/xml',
            'DATE: {}'.format(date_str),
            'LAST-MODIFIED: Sat, 01 Jan 2000 00:01:15 GMT',
            'SERVER: Unspecified, UPnP/1.0, Unspecified',
            'X-User-Agent: Fauxmo',
            'CONNECTION: close']) + 2 * '\r\n' + setup_xml

    def handle_setup(self):
        """Create a response to the Echo's setup request.

        Writes the device description to the transport and then closes it.
        """
        date_str = formatdate(timeval=None, localtime=False, usegmt=True)
        setup_response = self._build_setup_response(date_str)

        logger.debug("Fauxmo response to setup request:\n{}"
                     .format(setup_response))
        self.transport.write(setup_response.encode())
        self.transport.close()

    @staticmethod
    def _build_action_response(date_str):
        """Return the empty-bodied HTTP 200 response sent after an action.

        Args:
            date_str (str): Value to send in the DATE header

        Returns:
            str: Response text, ready to be encoded and written
        """
        return '\r\n'.join([
            'HTTP/1.1 200 OK',
            'CONTENT-LENGTH: 0',
            # Media type parameters must be separated by a semicolon.
            'CONTENT-TYPE: text/xml; charset="utf-8"',
            'DATE: {}'.format(date_str),
            'EXT:',
            'SERVER: Unspecified, UPnP/1.0, Unspecified',
            'X-User-Agent: Fauxmo',
            'CONNECTION: close']) + 2 * '\r\n'

    def handle_action(self, msg):
        """Execute `on` or `off` method of `action_handler`.

        A 200 response is written only when the handler reports success.
        The transport is closed in every case.

        Args:
            msg (str): Body of the Echo's HTTP request to trigger an action
        """
        success = False
        if '<BinaryState>0</BinaryState>' in msg:
            # `off()` method called
            success = self.action_handler.off()
        elif '<BinaryState>1</BinaryState>' in msg:
            # `on()` method called
            success = self.action_handler.on()
        else:
            logger.debug("Unrecognized request:\n{}".format(msg))

        if success:
            date_str = formatdate(timeval=None, localtime=False, usegmt=True)
            response = self._build_action_response(date_str)
            logger.debug(response)
            self.transport.write(response.encode())

        # Close even when nothing was written, so a failed or unrecognized
        # action does not leave the connection open.
        self.transport.close()
class SSDPServer(asyncio.DatagramProtocol):
    """Responds to the Echo's SSDP / UPnP requests"""

    def __init__(self, devices=None):
        """Initialize an SSDPServer instance.

        Kwargs:
            devices (list(dict)): List of devices to advertise when the
                                  Echo's SSDP search request is received.
        """
        if devices is None:
            devices = []
        self.devices = devices

    def add_device(self, name, ip_address, port):
        """Append a device to the list of advertised devices.

        Args:
            name: Device name, stored under the 'name' key
            ip_address: Stored under the 'ip_address' key
            port: Stored under the 'port' key
        """
        device_dict = {
            'name': name,
            'ip_address': ip_address,
            'port': port
        }
        self.devices.append(device_dict)

    @staticmethod
    def _membership_request():
        """Return the packed membership request for the SSDP group.

        Returns:
            bytes: The 4-byte multicast group address followed by the 4-byte
                INADDR_ANY interface address.
        """
        group = socket.inet_aton('239.255.255.250')
        # `!` selects standard sizes without alignment padding, so the result
        # is always 8 bytes. A native `4sL` is padded and widened to 16 bytes
        # on platforms where a C long is 8 bytes.
        return struct.pack('!4sL', group, socket.INADDR_ANY)

    def connection_made(self, transport):
        """Keep the transport and join the SSDP multicast group.

        Args:
            transport: Datagram transport; its underlying socket is fetched
                with `get_extra_info('socket')`.
        """
        self.transport = transport

        # Allow receiving multicast broadcasts
        sock = self.transport.get_extra_info('socket')
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
                        self._membership_request())

        # NOTE(review): the socket is presumably already bound when this
        # callback runs, in which case SO_REUSEPORT may have no effect here.
        # Confirm against the code that creates the endpoint.
        if hasattr(socket, 'SO_REUSEPORT'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

    def datagram_received(self, data, addr):
        """Check incoming UDP data for requests for Wemo devices.

        Args:
            data (bytes): Datagram payload
            addr: Sender address, passed on to `respond_to_search`
        """
        logger.debug("Received data below from {}:".format(addr))
        logger.debug(data)

        # NOTE(review): `respond_to_search` is not visible in this excerpt;
        # confirm it is defined on this class.
        if all(b in data for b in [b'"ssdp:discover"',
                                   b'urn:Belkin:device:**']):
            self.respond_to_search(addr)

    def connection_lost(self, exc):
        """Log a warning when the connection is lost with an exception.

        Args:
            exc: The exception, or a falsy value when there was none
        """
        if exc:
            logger.warning("SSDPServer closed with exception: {}".format(exc))