Client
Time for the other half…
This is going to be a lot, not gonna lie this is going to go pretty deep.
We need to write code that does the following:
- Renders the UI
- Handles the serial port
- Connects the UI and serial together
This may seem trivial at first, but you must realize that Python - by default - is single threaded. This means if we make a Button that sends a message over serial, by default the UI will be frozen until that serial code is done executing.
This is bad, we don’t like this, so we will need to take care to design our app with concurrency in mind.
UI
Let’s start easy and just set up a simple UI.
We will be using Tkinter for this app.
We start by creating the object that represents our Tkinter session:
import tkinter as tk
import tkinter.ttk as ttk
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("LED Blinker")
if __name__ == '__main__':
app = App()
app.mainloop()
Ok this is pretty neat, when you run this program you should see an empty window appear.
Let’s add our UI elements (called “widgets” in Tkinter):
def __init__(self):
super().__init__()
self.title("LED Blinker")
ttk.Checkbutton(self, text='Toggle LED').pack()
ttk.Button(self, text='Send Invalid').pack()
ttk.Button(self, text='Disconnect', default='active').pack()
What a nice simple app.
We have:
- a checkbox to toggle the LED
- a button to test sending an invalid byte
- a disconnect button
Backend
We now need to implement the missing backend components.
How do we run code when the checkbox is checked or unchecked?
Well, ttk.CheckButton
has two more kwargs: variable
and command
.
It will set the passed variable
to the state of the checkbox on change, and call the passed command
on change.
We need to create both of those though, let’s start with the variable
:
self.led = tk.BooleanVar()
ttk.Checkbutton(self, text='Toggle LED', variable=self.led, command=self.update_led).pack()
ttk.Button(self, text='Send Invalid').pack()
ttk.Button(self, text='Disconnect', default='active').pack()
We create a member variable led
that represents the state of the checkbox. It is a special Tkinter variable type that can be mutated and observed by Tkinter widgets.
led
to be a member variable, so we can access it throughout the lifetime of our App
instance, even after the constructor finishes.We also need to create the function self.update_led()
:
def update_led(self):
value = self.led.get()
# send `value` somehow??
Hmm… so how can we send our message?
We need to have access to the serial port in our app. Well, just like led
, we can make a member variable of type Serial
.
We can change our App
class:
class App(tk.Tk):
ser: Serial
def __init__(self):
...
Oh… but, how do we connect to the serial device? We can’t just hardcode /your/port
because it changes all the time.
We need to create a menu the user can select their port from and then connect.
Let’s make another class called SerialPortal
that provides this functionality:
from serial import Serial
from serial.tools.list_ports import comports
class SerialPortal(tk.Toplevel):
def __init__(self, parent: App):
super().__init__(parent)
self.parent = parent
self.parent.withdraw() # hide App until connected
ttk.OptionMenu(self, parent.port, '', *[d.device for d in comports()]).pack()
ttk.Button(self, text='Connect', command=self.connect, default='active').pack()
def connect(self):
self.parent.connect()
self.destroy()
self.parent.deiconify() # reveal App
…and add the new popup window, a connect
function and port
variable to App
:
class App(tk.Tk):
ser: Serial
def __init__(self):
super().__init__()
self.title("LED Blinker")
self.port = tk.StringVar() # add this
self.led = tk.BooleanVar()
ttk.Checkbutton(self, text='Toggle LED', variable=self.led, command=self.update_led).pack()
ttk.Button(self, text='Send Invalid').pack()
ttk.Button(self, text='Disconnect', default='active').pack()
SerialPortal(self) # and this
# and finally this
def connect(self):
self.ser = Serial(self.port.get())
def update_led(self):
value = self.led.get()
# send `value` somehow??
Ok cool, now we can fill in update_led
:
def update_led(self):
self.ser.write(bytes([self.led.get()]))
At this point, checking the LED checkbox should toggle the LED!
But there is still more to do…
Let’s finish the backend for the Send Invalid and Disconnect buttons:
def disconnect(self):
self.ser.close()
SerialPortal(self) # display portal to reconnect
def send_invalid(self):
self.ser.write(bytes([0x10]))
This is pretty great, but we haven’t used the response from the DevBoard yet, you’ll notice clicking the Send Invalid button does nothing.
We should standardize writing to the serial port by making one write
function that all callbacks use.
First add this import for displaying an alert:
from tkinter.messagebox import showerror
Then define the same constants as in our firmware on the DevBoard:
S_OK: int = 0xaa
S_ERR: int = 0xff
And finally add the write
function to App
:
def write(self, b: bytes):
try:
self.ser.write(b)
if int.from_bytes(self.ser.read(), 'big') == S_ERR:
showerror('Device Error', 'The device reported an invalid command.')
except SerialException:
showerror('Serial Error', 'Write failed.')
Now we can change our callbacks that wrote to the serial port to use this:
def update_led(self):
self.write(bytes([self.led.get()]))
def send_invalid(self):
self.write(bytes([0x10]))
And now these will properly handle an S_ERR
response.
Safe Resource Acquisition
Another thing we need to do is make sure our serial port is properly closed when our app exits.
We can achieve this by allowing App
to be constructed in a managed context.
This is what the with keyword is for in Python.
To add this support to App
, we add these functions:
def __enter__(self):
return self
def __exit__(self, *_):
self.disconnect()
And change our main code to guarantee resource release:
if __name__ == '__main__':
with App() as app:
app.mainloop()
Threading
Right now, if any serial code gets stuck or just takes a long time, our UI will freeze for that time.
To avoid this, we need to make sure all registered callbacks spawn as a detached thread.
We can make a helper decorator that we can use to mark any functions we want to be spawned in a detached thread:
from threading import Thread, Lock # we'll use Lock later ;)
def detached_callback(f):
return lambda *args, **kwargs: Thread(target=f, args=args, kwargs=kwargs).start()
Don’t worry too hard about understanding this decorator, just know it coerces the function it’s applied to into being spawned into a thread upon invocation.
This decorator can be applied to our callbacks like so:
@detached_callback
def update_led(self):
self.write(bytes([self.led.get()]))
We have one final problem to solve…
It is now possible that multiple threads could try to use the serial port at once, this would cause undefined behavior in the form of a race condition.
To solve this, we need to wrap the Serial
object in a lock.
We can do this by creating a new type, LockedSerial
:
class LockedSerial(Serial):
_lock: Lock = Lock()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def read(self, size=1) -> bytes:
with self._lock:
return super().read(size)
def write(self, b: bytes, /) -> int | None:
with self._lock:
super().write(b)
def close(self):
with self._lock:
super().close()
Our custom type inherits from Serial
and overrides the member functions we use with a lock acquisition of the super’s implementation.
Effectively, our type behaves exactly like the Serial
type but with a lock around every function call.
Now replace every use of the Serial
type with LockedSerial
.