4. Project: Environment Monitor¶
Here’s our first “full” project for the Sense HAT: make an environmental monitor that can display the temperature, humidity, and pressure in a variety of forms. We’ve already seen a demo thermometer in Environment Sensing. First we’ll construct variants of this for the humidity and pressure sensors. Then we’ll combine all three into an application. Finally, we’ll add interactivity using the joystick to select the required functionality, recording the data to a database, and a trivial web interface.
4.1. Hygrometer¶
Firstly, let’s adapt our thermometer script for sensing humidity. Here’s the thermometer script again:
from __future__ import division # for py2.x compatibility
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red
from time import sleep
import numpy as np
def thermometer(reading):
t = max(0, min(50, reading.temperature)) / 50 * 64
screen = array([
Color('red') if i < int(t) else
Color('red') * Red(t - int(t)) if i < t else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(str(int(round(reading.temperature))),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
with SenseHAT() as hat:
for reading in hat.environ:
hat.screen.array = thermometer(reading)
sleep(0.5)
We’ll use a very similar structure for our hygrometer. This time we don’t need to clamp the range (we’ll use the full 0% to 100%, but we’ll scale it to 0 <= n < 64 again). We’ll use a reasonably dark blue (“#000088” in HTML terms) for the chart, but everything else should look fairly familiar:
from __future__ import division # for py2.x compatibility
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Blue
from time import sleep
import numpy as np
def hygrometer(reading):
h = reading.humidity / 100 * 64
screen = array([
Color('#008') if i < int(h) else
Color('#008') * Blue(h - int(h)) if i < h else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
str(int(round(reading.humidity))),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
with SenseHAT() as hat:
for reading in hat.environ:
hat.screen.array = hygrometer(reading)
sleep(0.5)
The one other subtle change is in the caption. We can’t fit “100” on our display; it’s too wide (this wasn’t a problem for the thermometer where we clamped the temperature range from 0°C to 50°C; if you guessed this was for simplicity, you were right!). Instead, whenever the humidity is >99% we display “^^” to indicate the maximum value.
Test this script out by running it and then breathing gently on the humidity sensor. You should see the humidity reading rise rapidly (possibly to “^^”) then slowly fall back down.
4.2. Barometer¶
Next we’ll tackle the pressure sensor. This will have a very familiar structure by now:
- Clamp the pressure readings to a sensible range (in this case we’ll use 950mbar to 1050mbar).
- Scale this to the range 0 <= n < 64.
- Draw a rudimentary chart (we’ll use green to distinguish it from our thermometer and hygrometer scripts).
- Draw the pressure as a number superimposed on the chart.
Oh dear, there’s a problem! All the valid pressure values are too large to fit on the display, so we can’t use an easy hack like displaying “^^” as we did in the hygrometer above.
It’d be nice if the pressure reading could scroll back and forth on the display, still superimposed on the chart. It turns out, using iterators again, this is actually quite easy to achieve. What we want is a sliding window over our rendered text, like so:
Hence our first requirement is an infinite iterator which produces the “bouncing” X offset for the sliding window:
def bounce(it):
# bounce('ABC') --> A B C C B A A B C ...
return cycle(chain(it, reversed(it)))
Well, that was simple!
The cycle()
and chain()
functions come from
the standard library’s fantastic itertools
module which I urge anyone
using iterators to check out. The reversed()
function is a standard
built-in function in Python.
How do we combine the offsets produced by bounce
with the readings from the
sensor? We simply use the built-in zip()
function:
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Green
from time import sleep
from itertools import cycle, chain
import numpy as np
def bounce(it):
# bounce('ABC') --> A B C C B A A B C ...
return cycle(chain(it, reversed(it)))
def barometer(offset, reading):
p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
screen = array([
Color('green') if i < int(p) else
Color('green') * Green(p - int(p)) if i < p else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(str(int(round(reading.pressure))),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 8, 3)))
screen[:text.shape[0], :] += text[:, offset:offset + 8]
return screen.clip(0, 1)
with SenseHAT() as hat:
for offset, reading in zip(bounce(range(8)), hat.environ):
hat.screen.array = barometer(offset, reading)
sleep(0.2)
Note
This example will only work in Python 3 because it evaluates zip()
lazily. In Python 2, this will crash as zip attempts to construct a list
for an infinite iterator (use izip
from itertools
in Python 2).
Exercise
Can you adjust the hygrometer script so that it scrolls “100” when that is the reading, but smaller values stay static on the display?
4.3. Combining Screens¶
We now have the three scripts that we want for our environmental monitor, but how do we combine them into a single application? Our first step will be a simple one: to make a function that will rotate between each of our transformations periodically, first showing the thermometer for a few seconds, then the hygrometer, then the barometer.
The easiest way to do this is to modify our thermometer and hygrometer
transforms to take a (useless) offset parameter just like the barometer
transform. Then (because our functions all now have a common prototype, and
functions are first class objects in Python) we can construct a
cycle()
of transforms and just loop around them. The result
looks like this:
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red, Green, Blue
from time import sleep
from itertools import cycle, chain
import numpy as np
def thermometer(offset, reading):
t = max(0, min(50, reading.temperature)) / 50 * 64
screen = array([
Color('red') if i < int(t) else
Color('red') * Red(t - int(t)) if i < t else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(int(round(reading.temperature)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
def hygrometer(offset, reading):
h = reading.humidity / 100 * 64
screen = array([
Color('#008') if i < int(h) else
Color('#008') * Blue(h - int(h)) if i < h else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
int(round(reading.humidity)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
def barometer(offset, reading):
p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
screen = array([
Color('green') if i < int(p) else
Color('green') * Green(p - int(p)) if i < p else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(int(round(reading.pressure)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 8, 3)))
screen[:text.shape[0], :] += text[:, offset:offset + 8]
return screen.clip(0, 1)
def bounce(it):
# bounce('ABC') --> A B C C B A A B C ...
return cycle(chain(it, reversed(it)))
def switcher(readings):
for transform in cycle((thermometer, hygrometer, barometer)):
for offset, reading in zip(bounce(range(8)), readings):
yield transform(offset, reading)
sleep(0.2)
def main():
with SenseHAT() as hat:
for a in switcher(hat.environ):
hat.screen.array = a
if __name__ == '__main__':
main()
4.4. Interactivity!¶
Switching automatically between things is okay, but it would be nicer if we could control the switching with the joystick. For example, we could lay out our screens side-by-side with thermometer at the far left, then hygrometer, then pressure at the far right, and when the user presses left or right we could scroll between the displays.
To do this we just need to refine our switcher
function so that it depends
on both the readings (which it will pass to whatever the current transformation
is), and events from the joystick.
def switcher(events, readings):
screens = {
(thermometer, 'right'): hygrometer,
(hygrometer, 'left'): thermometer,
(hygrometer, 'right'): barometer,
(barometer, 'left'): hygrometer,
}
screen = thermometer
for event, offset, reading in zip(events, bounce(range(8)), readings):
yield screen(offset, reading)
if event is not None and event.pressed:
try:
screen = screens[screen, event.direction]
except KeyError:
break
sleep(0.2)
However, we have a problem: the joystick only yields events when something
happens so if we use this, our display will only update when the joystick emits
an event (because zip()
will only yield a tuple of values when all
iterators it covers have each yielded a value).
Thankfully, there’s a simple solution: the SenseStick.stream
attribute.
When this is set to True
the joystick will immediately yield a value
whenever one is requested. If no event has occurred it will simply yield
None
. So all our script needs to do is remember to set
SenseStick.stream
to True
at the start and everything will work
happily. Just to make the exit a bit prettier we’ll fade the screen to black
too:
def main():
with SenseHAT() as hat:
hat.stick.stream = True
for a in switcher(hat.stick, hat.environ):
hat.screen.array = a
hat.screen.fade_to(array(Color('black')))
4.5. Finishing Touches¶
The fade is a nice touch, but it would be nicer if the screens would “slide” between each other. And we’ve still got to add the database output too!
Thankfully this is all pretty easy to arrange. The main
procedure is the
ideal place to handle transitions like fading and sliding; it just needs to be
told when to perform them. The switcher
function can tell it when to do
this by yielding two values: the array to copy to the display, and the
transition animation to perform (if any). While we’re at it, we may as well
move the fade to black to the end of the loop in switcher
.
def switcher(events, readings):
screens = {
(thermometer, 'right'): hygrometer,
(hygrometer, 'left'): thermometer,
(hygrometer, 'right'): barometer,
(barometer, 'left'): hygrometer,
}
screen = thermometer
for event, offset, reading in zip(events, bounce(range(8)), readings):
anim = 'draw'
if event is not None and event.pressed:
try:
screen = screens[screen, event.direction]
anim = event.direction
except KeyError:
yield array(Color('black')), 'fade'
break
yield screen(offset, reading), anim
sleep(0.2)
Now we enhance the main
function to perform various transitions:
def main():
with SenseHAT() as hat:
hat.stick.stream = True
for a, anim in switcher(hat.stick, hat.environ):
if anim == 'fade':
hat.screen.fade_to(a, duration=0.5)
elif anim == 'right':
hat.screen.slide_to(a, direction='left', duration=0.5)
elif anim == 'left':
hat.screen.slide_to(a, direction='right', duration=0.5)
else:
hat.screen.array = a
Finally, we did promise that we’re going to store the data in a database. Ideally, we want a round-robin database for which we can use the excellent rrdtool project (if you wish to understand the rrdtool calls below, I’d strongly recommend reading its documentation). This provides all sorts of facilities beyond just recording the data, including averaging it over convenient time periods and producing good-looking charts of the data.
Note
Unfortunately, the Python 3 bindings for rrdtool don’t appear to be packaged at the moment so we’ll need to install them manually. On Raspbian you can do this like so:
$ sudo apt install rrdtool librrd-dev python3-pip
$ sudo pip3 install rrdtool
On other platforms the pip
command will likely be similar, but the
pre-requisites installed with apt
may well differ.
We’ll add a little code to construct the round-robin database if it doesn’t already exist, then add a tiny amount of code to record readings into the database. The final result (with the lines we’ve added highlighted) is as follows:
# NB: this script is not compatible with py2.x
from pisense import SenseHAT, array, draw_text, image_to_rgb
from colorzero import Color, Red, Green, Blue
from time import time, sleep
from itertools import cycle, chain
import numpy as np
import io
import rrdtool
def thermometer(offset, reading):
t = max(0, min(50, reading.temperature)) / 50 * 64
screen = array([
Color('red') if i < int(t) else
Color('red') * Red(t - int(t)) if i < t else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(int(round(reading.temperature)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
def hygrometer(offset, reading):
h = reading.humidity / 100 * 64
screen = array([
Color('#008') if i < int(h) else
Color('#008') * Blue(h - int(h)) if i < h else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text('^^' if reading.humidity > 99 else
int(round(reading.humidity)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 0, 3)))
screen[:text.shape[0], :text.shape[1]] += text
return screen.clip(0, 1)
def barometer(offset, reading):
p = (max(950, min(1050, reading.pressure)) - 950) / 100 * 64
screen = array([
Color('green') if i < int(p) else
Color('green') * Green(p - int(p)) if i < p else
Color('black')
for i in range(64)
])
screen = np.flipud(screen)
text = image_to_rgb(draw_text(int(round(reading.pressure)),
'small.pil', foreground=Color('gray'),
padding=(0, 0, 8, 3)))
screen[:text.shape[0], :] += text[:, offset:offset + 8]
return screen.clip(0, 1)
def bounce(it):
# bounce('ABC') --> A B C C B A A B C ...
return cycle(chain(it, reversed(it)))
def create_database(database):
try:
rrdtool.create(
database, # Filename of the database
'--no-overwrite', # Don't overwrite the file if it exists
'--step', '5s', # Data will be fed at least every 5 seconds
'DS:temperature:GAUGE:1m:-70:70', # Primary store for temperatures
'DS:humidity:GAUGE:1m:0:100', # Primary store for humidities
'DS:pressure:GAUGE:1m:900:1100', # Primary store for pressures
'RRA:AVERAGE:0.5:5s:1d', # Keep 1 day's worth of full-res data
'RRA:AVERAGE:0.5:5m:1M', # Keep 1 month of 5-minute-res data
'RRA:AVERAGE:0.5:1h:1y', # Keep 1 year of hourly data
'RRA:MIN:0.5:1h:1y', # ... including minimums
'RRA:MAX:0.5:1h:1y', # ... and maximums
'RRA:AVERAGE:0.5:1d:10y', # Keep 10 years of daily data
'RRA:MIN:0.5:1d:10y', # ... including minimums
'RRA:MAX:0.5:1d:10y', # ... and maximums
)
except rrdtool.OperationalError:
pass # file exists; ignore the error
def update_database(database, reading):
data = 'N:{r.temperature}:{r.humidity}:{r.pressure}'.format(r=reading)
rrdtool.update(database, data)
def switcher(events, readings, database='environ.rrd'):
create_database(database)
screens = {
(thermometer, 'right'): hygrometer,
(hygrometer, 'left'): thermometer,
(hygrometer, 'right'): barometer,
(barometer, 'left'): hygrometer,
}
screen = thermometer
last_update = None
for event, offset, reading in zip(events, bounce(range(8)), readings):
anim = 'draw'
if event is not None and event.pressed:
try:
screen = screens[screen, event.direction]
anim = event.direction
except KeyError:
yield array(Color('black')), 'fade'
break
now = time()
if last_update is None or now - last_update > 5:
# Only update the database every 5 seconds
last_update = now
update_database(database, reading)
yield screen(offset, reading), anim
sleep(0.2)
def main():
with SenseHAT() as hat:
hat.stick.stream = True
for a, anim in switcher(hat.stick, hat.environ):
if anim == 'fade':
hat.screen.fade_to(a, duration=0.5)
elif anim == 'right':
hat.screen.slide_to(a, direction='left', duration=0.5)
elif anim == 'left':
hat.screen.slide_to(a, direction='right', duration=0.5)
else:
hat.screen.array = a
if __name__ == '__main__':
main()
Exercise
At the moment, it’s too easy to accidentally exit the script. Can you make the application rotate around the screens (i.e. moving right from the barometer screen takes the user back to the thermometer screen, and vice-versa) and pressing the joystick is required to exit the application?
Finally, let’s whip up a little web-server that we can run alongside the Sense HAT script to allow remote clients to query our environmental data and see some pretty graphs of the history:
import rrdtool
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from threading import Lock
from pathlib import PurePosixPath
class SensorData():
def __init__(self, db):
self._db = db
self._data = rrdtool.lastupdate(db)
self._images = {}
@property
def date(self):
return self._data['date']
def __format__(self, format_spec):
element, units = format_spec.split(':')
template = """
<div class="sensor">
<h2>{title}</h2>
<span class="reading">{current:.1f}{units}</span>
<img class="recent" src="{element}_recent.svg" />
<img class="history" src="{element}_history.svg" />
</div>
"""
return template.format(
element=element,
title=element.title(),
units=units,
current=self._data['ds'][element])
def image(self, path):
try:
image = self._images[path]
except KeyError:
# generate it
p = PurePosixPath(path)
try:
element, duration = p.stem.split('_', 1)
except ValueError:
raise KeyError(path)
start = {
'recent': '1d',
'history': '1M',
}[duration]
color = {
'temperature': '#FF0000',
'humidity': '#0000FF',
'pressure': '#00FF00',
}[element]
self._images[path] = image = rrdtool.graphv(
'-',
'--imgformat', 'SVG',
'--border', '0',
'--color', 'BACK#00000000', # transparent
'--start', 'now-' + start,
'--end', 'now',
'DEF:v={db}:{element}:AVERAGE'.format(db=self._db, element=element),
'LINE2:v{color}'.format(color=color)
)['image']
return image
class RequestHandler(BaseHTTPRequestHandler):
database = 'environ.rrd'
data = None
index_template = """
<html>
<head>
<title>Sense HAT Environment Sensors</title>
<link href="https://fonts.googleapis.com/css?family=Raleway" rel="stylesheet">
<style>{style_sheet}</style>
</head>
<body>
<h1>Sense HAT Environment Sensors</h1>
<div id="timestamp">{data.date:%A, %d %b %Y %H:%M:%S}</div>
{data:temperature:°C}
{data:humidity:%RH}
{data:pressure:mbar}
<script>
setTimeout(() => location.reload(true), 10000);
</script>
</body>
</html>
"""
style_sheet = """
body {
font-family: "Raleway", sans-serif;
max-width: 700px;
margin: 1em auto;
}
h1 { text-align: center; }
div {
padding: 8px;
margin: 1em 0;
border-radius: 8px;
}
div#timestamp {
font-size: 16pt;
background-color: #bbf;
text-align: center;
}
div.sensor { background-color: #ddd; }
div.sensor h2 {
font-size: 20pt;
margin-top: 0;
padding-top: 0;
float: left;
}
span.reading {
font-size: 20pt;
float: right;
background-color: #ccc;
border-radius: 8px;
box-shadow: inset 0 0 4px black;
padding: 4px 8px;
}
"""
def get_sensor_data(self):
# Keep a copy of the latest SensorData around for efficiency
old_data = RequestHandler.data
new_data = SensorData(RequestHandler.database)
if old_data is None or new_data.date > old_data.date:
RequestHandler.data = new_data
return RequestHandler.data
def do_HEAD(self):
self.do_GET()
def do_GET(self):
if self.path == '/':
self.send_response(301)
self.send_header('Location', '/index.html')
self.end_headers()
elif self.path == '/index.html':
data = self.get_sensor_data()
content = RequestHandler.index_template.format(
style_sheet=RequestHandler.style_sheet,
data=data).encode('utf-8')
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.send_header('Content-Length', len(content))
self.send_header('Last-Modified', self.date_time_string(
data.date.timestamp()))
self.end_headers()
self.wfile.write(content)
elif self.path.endswith('.svg'):
data = self.get_sensor_data()
try:
content = data.image(self.path)
except KeyError:
self.send_error(404)
else:
self.send_response(200)
self.send_header('Content-Type', 'image/svg+xml')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
else:
self.send_error(404)
def main():
httpd = HTTPServer(('', 8000), RequestHandler)
httpd.serve_forever()
if __name__ == '__main__':
main()
Run this alongside the monitor script, make sure your Pi is accessible on your local network and then visit http://your-pis-address-here:8000/ in a web-browser.
Note
We could have added this to the monitor script, but frankly there’s no point as rrdtool includes all the locking we need to have something reading the database while something else writes to it. This also ensures that a bug in one script doesn’t affect the operation of the other, and means web requests are far less likely to affect the operation of the Sense HAT interface.
4.6. Auto-start¶
This is the sort of application it would be nice to start automatically upon
boot up. Thankfully, this is easy to arrange with a few systemd files.
Create the following under /etc/systemd/system/monitor_app.service
:
[Unit]
Description=An environment monitoring application
After=local-fs.target
[Service]
ExecStart=/usr/bin/python3 /home/pi/monitor_final.py
WorkingDirectory=/home/pi
User=pi
[Install]
WantedBy=multi-user.target
Note
You’ll need to modify the path for ExecStart
to point to the location
of your monitor_final.py
script. You may want to modify
WorkingDirectory
too if you want the database to be stored in another
location.
Then for the web-service (if you want it), create the following under
/etc/systemd/system/monitor_web.service
:
[Unit]
Description=Web server for the environment monitoring application
After=local-fs.target network.target
[Service]
ExecStart=/usr/bin/python3 /home/pi/monitor_server.py
WorkingDirectory=/home/pi
User=pi
[Install]
WantedBy=multi-user.target
Note
Remember to modify ExecStart
(and optionally WorkingDirectory
) as
above.
Finally, inform systemd of the changes and tell it we want to start these new services on boot-up as follows. For example, the following commands might be used to achieve all of this:
$ cd /home/pi
$ nano monitor_app.service
$ nano monitor_web.service
$ sudo cp monitor_*.service /etc/systemd/system/
$ sudo systemctl daemon-reload
$ sudo systemctl enable monitor_app
$ sudo systemctl enable monitor_web
To start the services immediately:
$ sudo systemctl start monitor_app
$ sudo systemctl start monitor_web
To stop the services immediately:
$ sudo systemctl stop monitor_app
$ sudo systemctl stop monitor_web
If you want to disable these from starting at boot time you can simply run the following commands:
$ sudo systemctl disable monitor_app
$ sudo systemctl disable monitor_web
Naturally, you could disable the web service but leave the main application running too.