Source code for pisense.stick

# vim: set et sw=4 sts=4 fileencoding=utf-8:
#
# Alternative API for the Sense HAT
# Copyright (c) 2016-2018 Dave Jones <dave@waveform.org.uk>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the copyright holder nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Defines the :class:`SenseStick` class representing the Sense HAT's joystick.
"""

from __future__ import (
    unicode_literals,
    absolute_import,
    print_function,
    division,
)

import io
import os
import glob
import errno
import struct
import select
import warnings
import termios
from datetime import datetime
from collections import namedtuple
from threading import Thread, Event, Lock
try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty

from .exc import SenseStickBufferFull, SenseStickCallbackRead

# native_str represents the "native" str type (bytes in Py 2, unicode in Py 3)
# of the interpreter; str is then redefined to always represent unicode
native_str = str  # pylint: disable=invalid-name
str = type('')  # pylint: disable=redefined-builtin,invalid-name


[docs]class StickEvent(namedtuple('StickEvent', ('timestamp', 'direction', 'pressed', 'held'))): """ Represents a joystick event as a :func:`~collections.namedtuple`. The fields of the event are defined below: .. attribute:: timestamp A :class:`~datetime.datetime` object specifying when the event took place. This timestamp is derived from the kernel event so it should be accurate even when callbacks have taken time reacting to events. The timestamp is a naive :class:`~datetime.datetime` object in local time. .. attribute:: direction A string indicating which direction the event pertains to. Can be one of "up", "down", "leftʺ, "right", or "enter" (the last event refers to the joystick being pressed inward). .. attribute:: pressed A bool which is ``True`` when the event indicates that the joystick is being pressed *or held* in the specified direction. When this is ``False``, the event indicates that the joystick has been released from the specified direction. .. attribute:: held A bool which is ``True`` when the event indicates that the joystick direction is currently held down (when :attr:`pressed` is also ``True``) or that the direction was *previously* held down (when :attr:`pressed` is ``False`` buut :attr:`held` is still ``True``). """ __slots__ = ()
[docs]class SenseStick(object): """ The :class:`SenseStick` class represents the joystick on the Sense HAT. Users can either instantiate this class themselves, or can access an instance from :attr:`SenseHAT.stick`. The :meth:`read` method can be called to obtain :class:`StickEvent` instances, or the instance can be treated as an iterator in which case events will be yielded as they come in:: hat = SenseHAT() for event in hat.stick: if event.pressed and not event.held: print('%s pressed!' % event.direction) Alternatively, handler functions can be assigned to the attributes :attr:`when_up`, :attr:`when_down`, :attr:`when_left`, :attr:`when_right`, :attr:`when_enter`. The assigned functions will be called when any matching event occurs. Finally, the attributes :attr:`up`, :attr:`down`, :attr:`left`, :attr:`right`, and attr:`enter` can be polled to determine the current state of the joystick. The :attr:`rotation` attribute can be modified to alter the orientation of events, and the aforementioned attributes. The *max_events* parameter controls the size of the internal queue used to buffer joystick events. This defaults to 100 which should be more than sufficient to ensure events are not lost. The *flush_input* parameter, which defaults to ``True`` controls whether, when the instance is closed, it attempts to flush the stdin of the owning terminal. This is useful as the joystick also acts as a keyboard. On the command line, this can mean that joystick movements (buffered during a script's execution) can inadvertently execute historical commands (e.g. Up a few times followed by Enter). Finally, if the *emulate* parameter is ``True``, the instance will connect to the joystick in the `desktop Sense HAT emulator`_ instead of the "real" Sense HAT joystick. .. _desktop Sense HAT emulator: https://sense-emu.readthedocs.io/ """ # pylint: disable=too-many-instance-attributes __slots__ = ( '_flush', '_callbacks_lock', '_callbacks_close', '_callbacks', '_callbacks_thread', '_closing', '_stream', '_buffer', '_read_thread', '_pressed', '_held', '_rotation', '_rot_map', ) SENSE_HAT_EVDEV_NAME = 'Raspberry Pi Sense HAT Joystick' EVENT_FORMAT = native_str('llHHI') EVENT_SIZE = struct.calcsize(EVENT_FORMAT) EV_KEY = 0x01 STATE_RELEASE = 0 STATE_PRESS = 1 STATE_HOLD = 2 KEY_UP = 103 KEY_LEFT = 105 KEY_RIGHT = 106 KEY_DOWN = 108 KEY_ENTER = 28 def __init__(self, max_events=100, flush_input=True, emulate=False): self._flush = flush_input self._callbacks_lock = Lock() self._callbacks_close = Event() self._callbacks = {} self._callbacks_thread = None self._closing = Event() self._stream = False self._buffer = Queue(maxsize=max_events) if emulate: from sense_emu.stick import init_stick_client stick_file = init_stick_client() else: stick_file = io.open(self._stick_device(), 'rb', buffering=0) self._read_thread = Thread(target=self._read_stick, args=(stick_file,)) self._read_thread.daemon = True self._read_thread.start() # This is just a guess; we can't know the actual joystick state at # initialization. However, if it's incorrect, future events should # correct this self._pressed = set() self._held = set() self._rotation = 0 self._rot_map = { SenseStick.KEY_UP: SenseStick.KEY_RIGHT, SenseStick.KEY_LEFT: SenseStick.KEY_UP, SenseStick.KEY_DOWN: SenseStick.KEY_LEFT, SenseStick.KEY_RIGHT: SenseStick.KEY_DOWN, SenseStick.KEY_ENTER: SenseStick.KEY_ENTER, }
[docs] def close(self): """ Call the :meth:`close` method to close the joystick interface and free up any background resources. The method is idempotent (you can call it multiple times without error) and after it is called, any operations on the joystick may return an error (but are not guaranteed to do so). """ if self._read_thread is not None: self._closing.set() self._read_thread.join() if self._callbacks_thread: self._callbacks_thread.join() self._read_thread = None self._callbacks_thread = None self._buffer = None if self._flush: self._flush = False try: termios.tcflush(0, termios.TCIFLUSH) except termios.error: # If stdin isn't a tty (or isn't available), ignore the error pass
def __iter__(self): while True: yield self.read(0 if self._stream else None) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() @staticmethod def _stick_device(): for evdev in glob.glob('/sys/class/input/event*'): try: with io.open(os.path.join(evdev, 'device', 'name'), 'r') as f: if f.read().strip() == SenseStick.SENSE_HAT_EVDEV_NAME: return os.path.join('/dev', 'input', os.path.basename(evdev)) except IOError as exc: if exc.errno != errno.ENOENT: raise raise RuntimeError('unable to locate SenseHAT joystick device') def _read_stick(self, stick_file): try: while not self._closing.wait(0): if select.select([stick_file], [], [], 0.1)[0]: event = stick_file.read(SenseStick.EVENT_SIZE) if event == b'': # This is mostly to ease testing, but also deals with # some edge cases break ( tv_sec, tv_usec, evt_type, code, value, ) = struct.unpack(SenseStick.EVENT_FORMAT, event) if evt_type == SenseStick.EV_KEY: if self._buffer.full(): warnings.warn(SenseStickBufferFull( "The internal SenseStick buffer is full; " "try reading some events!")) self._buffer.get() r = self._rotation while r: code = self._rot_map[code] r -= 90 evt = StickEvent( timestamp=datetime.fromtimestamp( tv_sec + (tv_usec / 1000000) ), direction={ SenseStick.KEY_UP: 'up', SenseStick.KEY_DOWN: 'down', SenseStick.KEY_LEFT: 'left', SenseStick.KEY_RIGHT: 'right', SenseStick.KEY_ENTER: 'enter', }[code], pressed=(value != SenseStick.STATE_RELEASE), held=(value == SenseStick.STATE_HOLD or ( value == SenseStick.STATE_RELEASE and code in self._held)) ) if not evt.pressed: self._pressed -= {code} self._held -= {code} elif evt.held: self._pressed |= {code} # to correct state self._held |= {code} else: # pressed self._pressed |= {code} self._held -= {code} # to correct state # Only push event onto the queue once the internal # state is updated; this ensures the various read-only # properties will be accurate for event handlers that # subsequently fire (although if they take too long the # state may change again before the next handler fires) self._buffer.put(evt) finally: stick_file.close() def _run_callbacks(self): while not self._callbacks_close.wait(0) and not self._closing.wait(0): try: event = self._buffer.get(timeout=0.1) except Empty: pass else: with self._callbacks_lock: try: callback = self._callbacks[event.direction] except KeyError: callback = None if callback is not None: callback(event) def _start_stop_callbacks(self): with self._callbacks_lock: if self._callbacks and not self._callbacks_thread: self._callbacks_close.clear() self._callbacks_thread = Thread(target=self._run_callbacks) self._callbacks_thread.daemon = True self._callbacks_thread.start() elif not self._callbacks and self._callbacks_thread: self._callbacks_close.set() self._callbacks_thread.join() self._callbacks_thread = None @property def rotation(self): """ Specifies the rotation (really, the orientation) of the joystick as a multiple of 90 degrees. When rotation is 0 (the default), "up" is toward the GPIO pins: .. image:: images/rotation_0.* When rotation is 90, "up" is towards the LEDs, and so on: .. image:: images/rotation_90.* The other two rotations are trivial to derive from this. .. note:: This property is updated by the unifying :attr:`SenseHAT.rotation` attribute. """ return self._rotation @rotation.setter def rotation(self, value): # TODO If rotation is modified while _pressed and _held aren't empty # then we potentially have bad state (unless it's just ENTER); # technically we should anti-rotate their current values here... if value % 90: raise ValueError('rotation must be a multiple of 90') self._rotation = value % 360
[docs] def read(self, timeout=None): """ Wait up to *timeout* seconds for another joystick event. If one occurs, return it as a :class:`StickEvent`, otherwise return ``None``. .. note:: Attempting to call this method when callbacks are assigned to attributes like :attr:`when_left` will trigger a :exc:`SenseStickCallbackRead` warning. This is because using the callback mechanism causes a background thread to continually read joystick events (removing them from the queue that :meth:`read` accesses). Mixing these programming styles can result in missing events. """ if self._callbacks_thread is not None: warnings.warn(SenseStickCallbackRead( 'read called while when_* callbacks are assigned')) try: return self._buffer.get(timeout=timeout) except Empty: return None
@property def stream(self): """ When ``True``, treating the joystick as an iterator will always yield immediately (yielding ``None`` if no event has occurred). When ``False`` (the default), the iterator will only yield when an event has occurred. .. note:: This property can be set while an iterator is active, but if the current value is ``False``, the iterator will wait indefinitely for the next event *before* it will start returning ``None``. It is better to set this property *prior* to obtaining the iterator. """ return self._stream @stream.setter def stream(self, value): self._stream = bool(value) @property def up(self): """ Returns ``True`` if the joystick is currently pressed upward. """ # pylint: disable=invalid-name return SenseStick.KEY_UP in self._pressed @property def up_held(self): """ Returns ``True`` if the joystick is currently held upward. """ return SenseStick.KEY_UP in self._held @property def when_up(self): """ The function to call when the joystick is moved upward. """ with self._callbacks_lock: return self._callbacks.get('up') @when_up.setter def when_up(self, value): with self._callbacks_lock: if value: self._callbacks['up'] = value else: self._callbacks.pop('up', None) self._start_stop_callbacks() @property def down(self): """ Returns ``True`` if the joystick is currently pressed downward. """ return SenseStick.KEY_DOWN in self._pressed @property def down_held(self): """ Returns ``True`` if the joystick is currently held downward. """ return SenseStick.KEY_DOWN in self._held @property def when_down(self): """ The function to call when the joystick is moved downward. """ with self._callbacks_lock: return self._callbacks.get('down') @when_down.setter def when_down(self, value): with self._callbacks_lock: if value: self._callbacks['down'] = value else: self._callbacks.pop('down', None) self._start_stop_callbacks() @property def left(self): """ Returns ``True`` if the joystick is currently pressed leftward. """ return SenseStick.KEY_LEFT in self._pressed @property def left_held(self): """ Returns ``True`` if the joystick is currently held leftward. """ return SenseStick.KEY_LEFT in self._held @property def when_left(self): """ The function to call when the joystick is moved leftward. """ with self._callbacks_lock: return self._callbacks.get('left') @when_left.setter def when_left(self, value): with self._callbacks_lock: if value: self._callbacks['left'] = value else: self._callbacks.pop('left', None) self._start_stop_callbacks() @property def right(self): """ Returns ``True`` if the joystick is currently pressed rightward. """ return SenseStick.KEY_RIGHT in self._pressed @property def right_held(self): """ Returns ``True`` if the joystick is currently held rightward. """ return SenseStick.KEY_RIGHT in self._held @property def when_right(self): """ The function to call when the joystick is moved rightward. """ with self._callbacks_lock: return self._callbacks.get('right') @when_right.setter def when_right(self, value): with self._callbacks_lock: if value: self._callbacks['right'] = value else: self._callbacks.pop('right', None) self._start_stop_callbacks() @property def enter(self): """ Returns ``True`` if the joystick is currently pressed inward. """ return SenseStick.KEY_ENTER in self._pressed @property def enter_held(self): """ Returns ``True`` if the joystick is currently held inward. """ return SenseStick.KEY_ENTER in self._held @property def when_enter(self): """ The function to call when the joystick is pressed in or released. """ with self._callbacks_lock: return self._callbacks.get('enter') @when_enter.setter def when_enter(self, value): with self._callbacks_lock: if value: self._callbacks['enter'] = value else: self._callbacks.pop('enter', None) self._start_stop_callbacks()