4. Project: Environment Monitor

Here’s our first “full” project for the Sense HAT: make an environmental monitor that can display the temperature, humidity, and pressure in a variety of forms. We’ve already seen a demo thermometer in Environment Sensing. First we’ll construct variants of this for the humidity and pressure sensors. Then we’ll combine all three into an application. Finally, we’ll add interactivity using the joystick to select the required functionality, record the data to a database, and add a trivial web interface.

4.1. Hygrometer

Firstly, let’s adapt our thermometer script for sensing humidity. Here’s the thermometer script again:

examples/thermometer.py
from __future__ import division  # for py2.x compatibility
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red
from time import sleep
import numpy as np

def thermometer(reading):
    t = max(0, min(50, reading.temperature)) / 50 * 64
    screen = array([
        Color('red') if i < int(t) else
        Color('red') * Red(t - int(t)) if i < t else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.temperature))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)

with SenseHAT() as hat:
    for reading in hat.environ:
        hat.screen.array = thermometer(reading)
        sleep(0.5)

We’ll use a very similar structure for our hygrometer. This time we don’t need to clamp the range (we’ll use the full 0% to 100%, but we’ll scale it to 0 <= n < 64 again). We’ll use a reasonably dark blue (“#000088” in HTML terms) for the chart, but everything else should look fairly familiar:

examples/hygrometer.py
from __future__ import division  # for py2.x compatibility
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Blue
from time import sleep
import numpy as np

def hygrometer(reading):
    h = reading.humidity / 100 * 64
    screen = array([
        Color('#008') if i < int(h) else
        Color('#008') * Blue(h - int(h)) if i < h else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
                                  str(int(round(reading.humidity))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)

with SenseHAT() as hat:
    for reading in hat.environ:
        hat.screen.array = hygrometer(reading)
        sleep(0.5)

The one other subtle change is in the caption. We can’t fit “100” on our display; it’s too wide (this wasn’t a problem for the thermometer where we clamped the temperature range from 0°C to 50°C; if you guessed this was for simplicity, you were right!). Instead, whenever the humidity is >99% we display “^^” to indicate the maximum value.

Test this script out by running it and then breathing gently on the humidity sensor. You should see the humidity reading rise rapidly (possibly to “^^”) then slowly fall back down.
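The scaling arithmetic shared by the thermometer and hygrometer can be tried out without a Sense HAT attached. The following is a minimal sketch in plain Python (3.x); the helper name bar_levels is hypothetical (it is not part of pisense), and it returns plain brightness values rather than colors:

```python
def bar_levels(value, minimum, maximum, cells=64):
    """Return per-cell brightness (0.0 to 1.0) for a bar chart of *value*."""
    # Clamp to the permitted range, then scale to 0 <= n <= cells
    clamped = max(minimum, min(maximum, value))
    n = (clamped - minimum) / (maximum - minimum) * cells
    return [
        1.0 if i < int(n) else      # fully lit cell
        n - int(n) if i < n else    # partially lit cell at the tip of the bar
        0.0                         # unlit cell
        for i in range(cells)
    ]

levels = bar_levels(50.5, 0, 100)
print(sum(1 for level in levels if level == 1.0))  # 32 fully lit cells
print(levels[32])  # brightness of the tip (roughly 0.32)
```

The fractional part of the scaled value is what dims the last cell of the bar, which is the job done by the multiplication with Red() or Blue() in the real scripts.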

4.2. Barometer

Next we’ll tackle the pressure sensor. This will have a very familiar structure by now:

  1. Clamp the pressure readings to a sensible range (in this case we’ll use 950mbar to 1050mbar).
  2. Scale this to the range 0 <= n < 64.
  3. Draw a rudimentary chart (we’ll use green to distinguish it from our thermometer and hygrometer scripts).
  4. Draw the pressure as a number superimposed on the chart.

Oh dear, there’s a problem! All the valid pressure values are too large to fit on the display, so we can’t use an easy hack like displaying “^^” as we did in the hygrometer above.

It’d be nice if the pressure reading could scroll back and forth on the display, still superimposed on the chart. It turns out, using iterators again, this is actually quite easy to achieve. What we want is a sliding window over our rendered text, like so:

_images/sliding_window.svg

Hence our first requirement is an infinite iterator which produces the “bouncing” X offset for the sliding window:

def bounce(it):
    # bounce('ABC') --> A B C C B A A B C ...
    return cycle(chain(it, reversed(it)))

Well, that was simple!

The cycle() and chain() functions come from the standard library’s fantastic itertools module which I urge anyone using iterators to check out. The reversed() function is a standard built-in function in Python.
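To see what bounce produces, we can take a finite slice of its infinite output with islice() (also from itertools). This is just a quick check that can be run on any machine:

```python
from itertools import chain, cycle, islice

def bounce(it):
    # bounce('ABC') --> A B C C B A A B C ...
    return cycle(chain(it, reversed(it)))

# islice() takes a finite slice of the infinite iterator
first_offsets = list(islice(bounce(range(4)), 10))
print(first_offsets)  # [0, 1, 2, 3, 3, 2, 1, 0, 0, 1]
```

Note that the values at each end are repeated (3, 3 and 0, 0), so the scrolling text pauses for one extra frame before reversing direction. Also note that reversed() needs a sequence (like a range, list, or string), so bounce can't be handed an arbitrary iterator.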

How do we combine the offsets produced by bounce with the readings from the sensor? We simply use the built-in zip() function:

examples/barometer.py
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Green
from time import sleep
from itertools import cycle, chain
import numpy as np

def bounce(it):
    # bounce('ABC') --> A B C C B A A B C ...
    return cycle(chain(it, reversed(it)))

def barometer(offset, reading):
    p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
    screen = array([
        Color('green') if i < int(p) else
        Color('green') * Green(p - int(p)) if i < p else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.pressure))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 8, 3)))
    screen[:text.shape[0], :] += text[:, offset:offset + 8]
    return screen.clip(0, 1)

with SenseHAT() as hat:
    for offset, reading in zip(bounce(range(8)), hat.environ):
        hat.screen.array = barometer(offset, reading)
        sleep(0.2)

Note

This example will only work in Python 3, where zip() is evaluated lazily. In Python 2, it will crash as zip() attempts to construct a list from an infinite iterator (use izip from itertools in Python 2 instead).
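The combination of the sliding window and the lazily zipped offsets can also be explored without any hardware. In this rough sketch a string stands in for one row of the rendered text, a short list of made-up numbers stands in for hat.environ, and the window helper is a hypothetical name used purely for illustration:

```python
from itertools import chain, cycle

def bounce(it):
    return cycle(chain(it, reversed(it)))

def window(row, offset, width=8):
    """Return the *width* columns of *row* starting at *offset*."""
    return row[offset:offset + width]

# One row of rendered "text", wider than the 8-column display
row = '1013' + ' ' * 8
# Stand-in for hat.environ: a finite list of fake pressure readings
fake_readings = [1013.2, 1013.1, 1013.3, 1013.0, 1012.9]

frames = [
    (window(row, offset), reading)
    for offset, reading in zip(bounce(range(5)), fake_readings)
]
print(len(frames))  # 5: zip() stops when the shortest iterator runs out
```

Because zip() stops as soon as any of its iterators is exhausted, pairing the infinite bounce with a finite list of readings is perfectly safe. With the real sensor, both iterators are infinite and the loop simply runs until the script is interrupted.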

Exercise

Can you adjust the hygrometer script so that it scrolls “100” when that is the reading, but smaller values stay static on the display?

4.3. Combining Screens

We now have the three scripts that we want for our environmental monitor, but how do we combine them into a single application? Our first step will be a simple one: to make a function that will rotate between each of our transformations periodically, first showing the thermometer for a few seconds, then the hygrometer, then the barometer.

The easiest way to do this is to modify our thermometer and hygrometer transforms to take a (useless) offset parameter just like the barometer transform. Then (because our functions all now have a common prototype, and functions are first class objects in Python) we can construct a cycle() of transforms and just loop around them. The result looks like this:

examples/monitor_auto.py
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red, Green, Blue
from time import sleep
from itertools import cycle, chain
import numpy as np


def thermometer(offset, reading):
    t = max(0, min(50, reading.temperature)) / 50 * 64
    screen = array([
        Color('red') if i < int(t) else
        Color('red') * Red(t - int(t)) if i < t else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.temperature))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)


def hygrometer(offset, reading):
    h = reading.humidity / 100 * 64
    screen = array([
        Color('#008') if i < int(h) else
        Color('#008') * Blue(h - int(h)) if i < h else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
                                  str(int(round(reading.humidity))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)


def barometer(offset, reading):
    p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
    screen = array([
        Color('green') if i < int(p) else
        Color('green') * Green(p - int(p)) if i < p else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.pressure))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 8, 3)))
    screen[:text.shape[0], :] += text[:, offset:offset + 8]
    return screen.clip(0, 1)


def bounce(it):
    # bounce('ABC') --> A B C C B A A B C ...
    return cycle(chain(it, reversed(it)))


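def switcher(readings):
    for transform in cycle((thermometer, hygrometer, barometer)):
        # One finite bounce (16 frames, about 3 seconds) per screen; an
        # infinite bounce() here would never let the outer loop advance
        offsets = chain(range(8), reversed(range(8)))
        for offset, reading in zip(offsets, readings):
            yield transform(offset, reading)
            sleep(0.2)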


def main():
    with SenseHAT() as hat:
        for a in switcher(hat.environ):
            hat.screen.array = a


if __name__ == '__main__':
    main()
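The rotation logic can be checked on its own by swapping the real transforms for trivial stand-ins that share the same (offset, reading) prototype. This is only a sketch: the stand-in functions, the steps parameter, and the fake readings are all assumptions made for the demonstration. It uses a finite run of offsets for each screen so that the inner loop ends and the outer cycle() can move on to the next transform:

```python
from itertools import chain, cycle, islice

# Stand-ins for the real transforms: same (offset, reading) prototype
def thermometer(offset, reading):
    return 'T', offset, reading

def hygrometer(offset, reading):
    return 'H', offset, reading

def barometer(offset, reading):
    return 'P', offset, reading

def switcher(readings, steps=2):
    for transform in cycle((thermometer, hygrometer, barometer)):
        # A finite run of offsets, so control returns to the outer loop
        offsets = chain(range(steps), reversed(range(steps)))
        for offset, reading in zip(offsets, readings):
            yield transform(offset, reading)

readings = iter(range(100))  # stand-in for hat.environ
names = [name for name, offset, reading in islice(switcher(readings), 12)]
print(''.join(names))  # TTTTHHHHPPPP
```

Placing the offsets first in the zip() call matters: zip() asks its iterators for values in order, so when the offsets run out it stops before consuming (and discarding) a reading.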

4.4. Interactivity!

Switching automatically between things is okay, but it would be nicer if we could control the switching with the joystick. For example, we could lay out our screens side-by-side with the thermometer at the far left, then the hygrometer, then the barometer at the far right, and when the user presses left or right we could scroll between the displays.

To do this we just need to refine our switcher function so that it depends on both the readings (which it will pass to whatever the current transformation is), and events from the joystick.

def switcher(events, readings):
    screens = {
        (thermometer, 'right'): hygrometer,
        (hygrometer, 'left'): thermometer,
        (hygrometer, 'right'): barometer,
        (barometer, 'left'): hygrometer,
    }
    screen = thermometer
    for event, offset, reading in zip(events, bounce(range(8)), readings):
        yield screen(offset, reading)
        if event is not None and event.pressed:
            try:
                screen = screens[screen, event.direction]
            except KeyError:
                break
        sleep(0.2)

However, we have a problem: the joystick only yields events when something happens. If we use it as is, our display will only update when the joystick emits an event (because zip() only yields a tuple of values once every iterator it covers has yielded a value).

Thankfully, there’s a simple solution: the SenseStick.stream attribute. When this is set to True the joystick will immediately yield a value whenever one is requested. If no event has occurred it will simply yield None. So all our script needs to do is remember to set SenseStick.stream to True at the start and everything will work happily. Just to make the exit a bit prettier we’ll fade the screen to black too:

def main():
    with SenseHAT() as hat:
        hat.stick.stream = True
        for a in switcher(hat.stick, hat.environ):
            hat.screen.array = a
        hat.screen.fade_to(array(Color('black')))
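The dictionary in switcher is really a small state machine: the key is the current screen plus the direction pressed, and the value is the screen to move to. A hardware-free sketch of the same idea follows, assuming plain strings in place of the transform functions and a simple list of directions (with None meaning "no event") in place of the joystick stream; the run helper is a hypothetical name:

```python
# Screens are plain strings here; the real script uses the functions
SCREENS = {
    ('thermometer', 'right'): 'hygrometer',
    ('hygrometer', 'left'): 'thermometer',
    ('hygrometer', 'right'): 'barometer',
    ('barometer', 'left'): 'hygrometer',
}

def run(events, start='thermometer'):
    """Return the list of screens shown for a sequence of events."""
    screen = start
    shown = []
    for direction in events:
        shown.append(screen)
        if direction is not None:  # None means "no joystick event"
            try:
                screen = SCREENS[screen, direction]
            except KeyError:
                break  # no such transition: leave the application
    return shown

print(run([None, 'right', None, 'right', 'left', 'up', None]))
```

Any combination missing from the table (such as "up", or "left" from the thermometer) raises KeyError, which is how the script decides to exit.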

4.5. Finishing Touches

The fade is a nice touch, but it would be nicer if the screens would “slide” between each other. And we’ve still got to add the database output too!

Thankfully this is all pretty easy to arrange. The main procedure is the ideal place to handle transitions like fading and sliding; it just needs to be told when to perform them. The switcher function can tell it when to do this by yielding two values: the array to copy to the display, and the transition animation to perform (if any). While we’re at it, we may as well move the fade to black to the end of the loop in switcher.

def switcher(events, readings):
    screens = {
        (thermometer, 'right'): hygrometer,
        (hygrometer, 'left'): thermometer,
        (hygrometer, 'right'): barometer,
        (barometer, 'left'): hygrometer,
    }
    screen = thermometer
    for event, offset, reading in zip(events, bounce(range(8)), readings):
        anim = 'draw'
        if event is not None and event.pressed:
            try:
                screen = screens[screen, event.direction]
                anim = event.direction
            except KeyError:
                yield array(Color('black')), 'fade'
                break
        yield screen(offset, reading), anim
        sleep(0.2)

Now we enhance the main function to perform various transitions:

def main():
    with SenseHAT() as hat:
        hat.stick.stream = True
        for a, anim in switcher(hat.stick, hat.environ):
            if anim == 'fade':
                hat.screen.fade_to(a, duration=0.5)
            elif anim == 'right':
                hat.screen.slide_to(a, direction='left', duration=0.5)
            elif anim == 'left':
                hat.screen.slide_to(a, direction='right', duration=0.5)
            else:
                hat.screen.array = a
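Because switcher now yields plain (array, animation) pairs, the dispatch logic can be exercised with a fake screen object. The sketch below is an illustration only: FakeScreen and show are hypothetical names, and the fake merely records which method was called rather than mimicking the full signatures of the real screen methods:

```python
class FakeScreen:
    """Records the calls that main() would make on hat.screen."""
    def __init__(self):
        self.calls = []

    def fade_to(self, array, duration=1):
        self.calls.append(('fade_to', duration))

    def slide_to(self, array, direction='left', duration=1):
        self.calls.append(('slide_to', direction, duration))

def show(screen, frames):
    for a, anim in frames:
        if anim == 'fade':
            screen.fade_to(a, duration=0.5)
        elif anim == 'right':
            # Moving right means the new screen slides in towards the left
            screen.slide_to(a, direction='left', duration=0.5)
        elif anim == 'left':
            screen.slide_to(a, direction='right', duration=0.5)
        else:
            screen.calls.append(('draw',))

fake = FakeScreen()
show(fake, [('t', 'draw'), ('h', 'right'), ('t', 'left'), ('black', 'fade')])
print(fake.calls)
```

The slide direction is the opposite of the joystick direction: pressing right moves us to the screen on the right, so the display contents must travel left.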

Finally, we did promise that we’re going to store the data in a database. Ideally, we want a round-robin database for which we can use the excellent rrdtool project (if you wish to understand the rrdtool calls below, I’d strongly recommend reading its documentation). This provides all sorts of facilities beyond just recording the data, including averaging it over convenient time periods and producing good-looking charts of the data.

Note

Unfortunately, the Python 3 bindings for rrdtool don’t appear to be packaged at the moment so we’ll need to install them manually. On Raspbian you can do this like so:

$ sudo apt install rrdtool librrd-dev python3-pip
$ sudo pip3 install rrdtool

On other platforms the pip command will likely be similar, but the pre-requisites installed with apt may well differ.

We’ll add a little code to construct the round-robin database if it doesn’t already exist, then add a tiny amount of code to record readings into the database. The final result is as follows (the additions are the create_database and update_database functions, plus the code in switcher that calls them):

examples/monitor_final.py
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red, Green, Blue
from time import time, sleep
from itertools import cycle, chain
import numpy as np
import io
import rrdtool


def thermometer(offset, reading):
    t = max(0, min(50, reading.temperature)) / 50 * 64
    screen = array([
        Color('red') if i < int(t) else
        Color('red') * Red(t - int(t)) if i < t else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.temperature))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)


def hygrometer(offset, reading):
    h = reading.humidity / 100 * 64
    screen = array([
        Color('#008') if i < int(h) else
        Color('#008') * Blue(h - int(h)) if i < h else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
                                  str(int(round(reading.humidity))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 0, 3)))
    screen[:text.shape[0], :text.shape[1]] += text
    return screen.clip(0, 1)


def barometer(offset, reading):
    p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
    screen = array([
        Color('green') if i < int(p) else
        Color('green') * Green(p - int(p)) if i < p else
        Color('black')
        for i in range(64)
    ])
    screen = np.flipud(screen)
    text = image_to_rgb(draw_text(str(int(round(reading.pressure))),
                                  'small.pil', foreground=Color('gray'),
                                  padding=(0, 0, 8, 3)))
    screen[:text.shape[0], :] += text[:, offset:offset + 8]
    return screen.clip(0, 1)


def bounce(it):
    # bounce('ABC') --> A B C C B A A B C ...
    return cycle(chain(it, reversed(it)))


def create_database(database):
    try:
        rrdtool.create(
            database,            # Filename of the database
            '--no-overwrite',    # Don't overwrite the file if it exists
            '--step', '5s',      # Data will be fed at least every 5 seconds
            'DS:temperature:GAUGE:1m:-70:70',  # Primary store for temperatures
            'DS:humidity:GAUGE:1m:0:100',      # Primary store for humidities
            'DS:pressure:GAUGE:1m:900:1100',   # Primary store for pressures
            'RRA:AVERAGE:0.5:5s:1d',  # Keep 1 day's worth of full-res data
            'RRA:AVERAGE:0.5:5m:1M',  # Keep 1 month of 5-minute-res data
            'RRA:AVERAGE:0.5:1h:1y',  # Keep 1 year of hourly data
            'RRA:MIN:0.5:1h:1y',      # ... including minimums
            'RRA:MAX:0.5:1h:1y',      # ... and maximums
            'RRA:AVERAGE:0.5:1d:10y', # Keep 10 years of daily data
            'RRA:MIN:0.5:1d:10y',     # ... including minimums
            'RRA:MAX:0.5:1d:10y',     # ... and maximums
        )
    except rrdtool.OperationalError:
        pass # file exists; ignore the error


def update_database(database, reading):
    data = 'N:{r.temperature}:{r.humidity}:{r.pressure}'.format(r=reading)
    rrdtool.update(database, data)


def switcher(events, readings, database='environ.rrd'):
    create_database(database)
    screens = {
        (thermometer, 'right'): hygrometer,
        (hygrometer, 'left'): thermometer,
        (hygrometer, 'right'): barometer,
        (barometer, 'left'): hygrometer,
    }
    screen = thermometer
    last_update = None
    for event, offset, reading in zip(events, bounce(range(8)), readings):
        anim = 'draw'
        if event is not None and event.pressed:
            try:
                screen = screens[screen, event.direction]
                anim = event.direction
            except KeyError:
                yield array(Color('black')), 'fade'
                break
        now = time()
        if last_update is None or now - last_update > 5:
            # Only update the database every 5 seconds
            last_update = now
            update_database(database, reading)
        yield screen(offset, reading), anim
        sleep(0.2)


def main():
    with SenseHAT() as hat:
        hat.stick.stream = True
        for a, anim in switcher(hat.stick, hat.environ):
            if anim == 'fade':
                hat.screen.fade_to(a, duration=0.5)
            elif anim == 'right':
                hat.screen.slide_to(a, direction='left', duration=0.5)
            elif anim == 'left':
                hat.screen.slide_to(a, direction='right', duration=0.5)
            else:
                hat.screen.array = a


if __name__ == '__main__':
    main()
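The two database-related pieces of logic (building the update string, and only writing every five seconds or so) are easy to try out without rrdtool or a Sense HAT. In this sketch the Reading tuple is a hypothetical stand-in for the readings produced by hat.environ, and due_updates is a made-up helper that replays the timing test from switcher against a list of timestamps:

```python
from collections import namedtuple

# Hypothetical stand-in for the readings produced by hat.environ
Reading = namedtuple('Reading', ('pressure', 'humidity', 'temperature'))

def format_update(reading):
    # 'N' asks rrdtool to timestamp the sample with the current time; the
    # values must follow the order in which the data sources were declared
    return 'N:{r.temperature}:{r.humidity}:{r.pressure}'.format(r=reading)

def due_updates(timestamps, interval=5):
    """Return the timestamps at which a database update would occur."""
    last_update = None
    due = []
    for now in timestamps:
        if last_update is None or now - last_update > interval:
            last_update = now
            due.append(now)
    return due

print(format_update(Reading(1013.2, 45.5, 21.3)))  # N:21.3:45.5:1013.2
print(due_updates([0, 2, 4, 6, 8, 10, 12]))        # [0, 6, 12]
```

Because the test is "more than 5 seconds", updates land slightly further apart than the 5 second step of the database. That is fine here, as the one minute heartbeat given in each DS definition tolerates gaps much longer than that.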

Exercise

At the moment, it’s too easy to accidentally exit the script. Can you make the application rotate around the screens (i.e. moving right from the barometer screen takes the user back to the thermometer screen, and vice-versa) so that pressing the joystick is required to exit the application?

Finally, let’s whip up a little web-server that we can run alongside the Sense HAT script to allow remote clients to query our environmental data and see some pretty graphs of the history:

examples/monitor_server.py
import rrdtool
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from threading import Lock
from pathlib import PurePosixPath


class SensorData():
    def __init__(self, db):
        self._db = db
        self._data = rrdtool.lastupdate(db)
        self._images = {}

    @property
    def date(self):
        return self._data['date']

    def __format__(self, format_spec):
        element, units = format_spec.split(':')
        template = """
<div class="sensor">
    <h2>{title}</h2>
    <span class="reading">{current:.1f}{units}</span>
    <img class="recent" src="{element}_recent.svg" />
    <img class="history" src="{element}_history.svg" />
</div>
"""
        return template.format(
            element=element,
            title=element.title(),
            units=units,
            current=self._data['ds'][element])

    def image(self, path):
        try:
            image = self._images[path]
        except KeyError:
            # generate it
            p = PurePosixPath(path)
            try:
                element, duration = p.stem.split('_', 1)
            except ValueError:
                raise KeyError(path)
            start = {
                'recent':  '1d',
                'history': '1M',
            }[duration]
            color = {
                'temperature': '#FF0000',
                'humidity':    '#0000FF',
                'pressure':    '#00FF00',
            }[element]
            self._images[path] = image = rrdtool.graphv(
                '-',
                '--imgformat', 'SVG',
                '--border', '0',
                '--color', 'BACK#00000000', # transparent
                '--start', 'now-' + start,
                '--end', 'now',
                'DEF:v={db}:{element}:AVERAGE'.format(db=self._db, element=element),
                'LINE2:v{color}'.format(color=color)
            )['image']
        return image


class RequestHandler(BaseHTTPRequestHandler):
    database = 'environ.rrd'
    data = None
    index_template = """
<html>
    <head>
        <title>Sense HAT Environment Sensors</title>
        <link href="https://fonts.googleapis.com/css?family=Raleway" rel="stylesheet">
        <style>{style_sheet}</style>
    </head>
    <body>
        <h1>Sense HAT Environment Sensors</h1>
        <div id="timestamp">{data.date:%A, %d %b %Y %H:%M:%S}</div>
        {data:temperature:°C}
        {data:humidity:%RH}
        {data:pressure:mbar}
        <script>
        setTimeout(() => location.reload(true), 10000);
        </script>
    </body>
</html>
"""
    style_sheet = """
body {
    font-family: "Raleway", sans-serif;
    max-width: 700px;
    margin: 1em auto;
}

h1 { text-align: center; }

div {
    padding: 8px;
    margin: 1em 0;
    border-radius: 8px;
}

div#timestamp {
    font-size: 16pt;
    background-color: #bbf;
    text-align: center;
}

div.sensor { background-color: #ddd; }

div.sensor h2 {
    font-size: 20pt;
    margin-top: 0;
    padding-top: 0;
    float: left;
}

span.reading {
    font-size: 20pt;
    float: right;
    background-color: #ccc;
    border-radius: 8px;
    box-shadow: inset 0 0 4px black;
    padding: 4px 8px;
}
"""

    def get_sensor_data(self):
        # Keep a copy of the latest SensorData around for efficiency
        old_data = RequestHandler.data
        new_data = SensorData(RequestHandler.database)
        if old_data is None or new_data.date > old_data.date:
            RequestHandler.data = new_data
        return RequestHandler.data

    def do_HEAD(self):
        self.do_GET()

    def do_GET(self):
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            data = self.get_sensor_data()
            content = RequestHandler.index_template.format(
                style_sheet=RequestHandler.style_sheet,
                data=data).encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', len(content))
            self.send_header('Last-Modified', self.date_time_string(
                data.date.timestamp()))
            self.end_headers()
            self.wfile.write(content)
        elif self.path.endswith('.svg'):
            data = self.get_sensor_data()
            try:
                content = data.image(self.path)
            except KeyError:
                self.send_error(404)
            else:
                self.send_response(200)
                self.send_header('Content-Type', 'image/svg+xml')
                self.send_header('Content-Length', len(content))
                self.end_headers()
                self.wfile.write(content)
        else:
            self.send_error(404)


def main():
    httpd = HTTPServer(('', 8000), RequestHandler)
    httpd.serve_forever()


if __name__ == '__main__':
    main()

Run this alongside the monitor script, make sure your Pi is accessible on your local network and then visit http://your-pis-address-here:8000/ in a web-browser.
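The index page relies on a slightly unusual trick: SensorData defines __format__, so everything after the first colon in a replacement field such as {data:humidity:%RH} is handed to the object as its format specification. Here is a cut-down sketch of the same idea; the Readings class and its sample values are assumptions made for the demonstration, and it produces plain text rather than HTML:

```python
class Readings:
    """Formats one named value; a simplified stand-in for SensorData."""
    def __init__(self, values):
        self._values = values

    def __format__(self, format_spec):
        # Everything after the first ':' in the field arrives here
        element, units = format_spec.split(':')
        return '{title}: {current:.1f}{units}'.format(
            title=element.title(), current=self._values[element], units=units)

data = Readings({'pressure': 1013.24, 'humidity': 45.0})
line = '{data:pressure:mbar} / {data:humidity:%RH}'.format(data=data)
print(line)  # Pressure: 1013.2mbar / Humidity: 45.0%RH
```

This keeps the page template compact: each sensor is a single field in the template, and the object decides how to render itself.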

Note

We could have added this to the monitor script, but frankly there’s no point as rrdtool includes all the locking we need to have something reading the database while something else writes to it. This also ensures that a bug in one script doesn’t affect the operation of the other, and means web requests are far less likely to affect the operation of the Sense HAT interface.
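Returning to the server script for a moment: the chart requests are decoded purely from the requested file name, with anything unrecognized converted into a KeyError (which the request handler turns into a 404). A small sketch of that parsing step follows; parse_image_path is a hypothetical helper name, but the logic mirrors the start of SensorData.image:

```python
from pathlib import PurePosixPath

def parse_image_path(path):
    """Split '/temperature_recent.svg' into ('temperature', 'recent')."""
    try:
        element, duration = PurePosixPath(path).stem.split('_', 1)
    except ValueError:
        # No underscore in the name, so it cannot be one of our charts
        raise KeyError(path)
    return element, duration

print(parse_image_path('/temperature_recent.svg'))  # ('temperature', 'recent')
```

In the server, the element and duration are then looked up in dictionaries, so an unknown sensor name or time span also ends up as a KeyError.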

4.6. Auto-start

This is the sort of application it would be nice to start automatically upon boot up. Thankfully, this is easy to arrange with a few systemd files. Create the following file as /etc/systemd/system/monitor_app.service:

examples/monitor_app.service
[Unit]
Description=An environment monitoring application
After=local-fs.target

[Service]
ExecStart=/usr/bin/python3 /home/pi/monitor_final.py
WorkingDirectory=/home/pi
User=pi

[Install]
WantedBy=multi-user.target

Note

You’ll need to modify the path for ExecStart to point to the location of your monitor_final.py script. You may want to modify WorkingDirectory too if you want the database to be stored in another location.

Then for the web-service (if you want it), create the following file as /etc/systemd/system/monitor_web.service:

examples/monitor_web.service
[Unit]
Description=Web server for the environment monitoring application
After=local-fs.target network.target

[Service]
ExecStart=/usr/bin/python3 /home/pi/monitor_server.py
WorkingDirectory=/home/pi
User=pi

[Install]
WantedBy=multi-user.target

Note

Remember to modify ExecStart (and optionally WorkingDirectory) as above.

Finally, inform systemd of the changes and tell it we want to start these new services on boot-up. For example, the following commands create the two service files, copy them into place, and enable both services:

$ cd /home/pi
$ nano monitor_app.service
$ nano monitor_web.service
$ sudo cp monitor_*.service /etc/systemd/system/
$ sudo systemctl daemon-reload
$ sudo systemctl enable monitor_app
$ sudo systemctl enable monitor_web

To start the services immediately:

$ sudo systemctl start monitor_app
$ sudo systemctl start monitor_web

To stop the services immediately:

$ sudo systemctl stop monitor_app
$ sudo systemctl stop monitor_web

If you want to disable these from starting at boot time you can simply run the following commands:

$ sudo systemctl disable monitor_app
$ sudo systemctl disable monitor_web

Naturally, you could disable the web service but leave the main application running too.