The Concurrency Choreographer: Making BLE’s Async Data Sync with OBS

Originally published at kitfucoda.medium.com

“What can we do with asynchronous programming?” This was one of the questions I received during the talk on asynchronous programming that I delivered at PyCon MY. While I briefly mentioned building a chatbot that day, I believe the project we are discussing today should resonate more. During my spare time (however rare it is these days), I stream my gameplay sessions to document my journey. One of the widgets in my streaming setup is a heart rate monitor, and it has been broken for some time.

Cute cartoon on the topic generated by Gemini

Seeking the Open Standard: From Fitbit’s Lock-in to Garmin’s BLE

I was doing some spring-cleaning for my OBS setup, and one component caught my eye — the heart rate monitor. For a time, it was a novel way to convey a streamer’s psychological state in a numerical form. Coincidentally, I was already wearing a fitness tracker: Fitbit Versa 3 and I decided to give it a go. Fortunately, there was a way to capture heart rate through the somewhat limited proprietary API, and have it streamed to my computer. Problem is, the fitness (not so) smartwatch died.

Funnily enough, a family member bought an almost identical Fitbit Sense a few months earlier, and her watch is still functioning as of now. In a way, my watch died rather prematurely. On the other hand, I was out of a job, and yet that didn’t translate to more gaming time as I was juggling multiple things at the time. Consequently, the replacement only came much later, not long after I started my current job. Instead of getting another Fitbit, I moved on to a Garmin Instinct 2, a tracker my peers highly recommended.

Unlike my past Fitbit devices, the Garmin watch does broadcast my heart rate via a standard Bluetooth interface. This makes replacing the workflow a much easier job. For that to work, I just needed to figure out how to get the data, and then supply that data to OBS via the WebSocket interface. I used bleak to communicate with the watch via Bluetooth LE, and obsws-python to publish the heart rate to OBS.

Previously, our chatbot project only dealt with asynchronous libraries. Yet, this time the library that communicates with OBS is synchronous. Therefore, we are taking this opportunity to discuss how to coordinate them to work with each other. Specifically, we are solving the problem where the producer (the heart rate monitor data) is asynchronous, and the consumer (the sending of heart rate to OBS) is synchronous.

Building the Async Producer: Connecting via Bluetooth Low Energy

My journey on tracking my fitness data started with reading about running. One of the books I read was titled Body, Mind and Sport by John Douillard. Some of the practices advocated may seem controversial, though the idea of being mindful about my physical state stuck in my mind. As a result, I started running with deep breaths, and soon noticed running could be made bearable or even enjoyable with some effort. At the same time, I went looking for a chest belt to pair with my fitness tracker installed on my phone.

That was before wearable tech went mainstream, so my choices were rather limited. Chest straps to measure heart rate were common among runners, but they weren’t built to work with Bluetooth. I had to search high and low to finally find one that would work with my smartphone application. It was slightly more expensive than an ordinary tracker, but it was worth it. I ran with it for a few years, and benefited from having my current heart rate displayed on my tracker in real time.

No more estimating by poking my fingers to my neck and doing multiplication while I was busy catching my breath.

Photo by Luiz Rogério Nunes on Unsplash

When wearable technology started to take off, Sony later released their smart band. The companion app also provided some limited insight on how my body reacts to activities in my day-to-day life. Conveniently, it also broadcasts my heart rate through Bluetooth and switching to it in my sports tracker took almost no effort. No more having to prepare my chest strap by wetting it before a run — that was a definite win.

Enough storytelling for now, let’s shift our focus back to the project. Like my old Sony smart band I bought second-hand, my Garmin Instinct 2 also broadcasts my heart rate on Bluetooth on demand. A quick search yielded blehrm, and it provides an abstraction to bleak. Unfortunately, it wasn’t the right library for the purpose, and I soon moved on to bleak directly after some failed attempts.

Firstly, we start by finding our device. Call the BleakScanner.discover method with a reasonable timeout, as shown below:

from bleak import BleakScanner

# Note: `await` must run inside a coroutine (or an asyncio-aware REPL).
ble_devices = await BleakScanner.discover(timeout=10.0)
print(f"--- Discovered {len(ble_devices)} BLE Devices ---")
for i, device in enumerate(ble_devices):
    print(
        f"#{i}: Name: {device.name}, Address: {device.address}, Details: {device}"
    )

Running that returns output like the following (device addresses are shown as [redacted]):

--- Discovered 8 BLE Devices ---
#0: Name: Instinct 2, Address: [redacted], Details: [redacted]: Instinct 2
#1: Name: None, Address: [redacted], Details: [redacted]: None
#2: Name: None, Address: [redacted], Details: [redacted]: None
#3: Name: U-WTD838C, Address: [redacted], Details: [redacted]: U-WTD838C
#4: Name: None, Address: [redacted], Details: [redacted]: None
#5: Name: None, Address: [redacted], Details: [redacted]: None
#6: Name: None, Address: [redacted], Details: [redacted]: None
#7: Name: None, Address: [redacted], Details: [redacted]: None

Jot down the address, as we will need it for the next step: establishing a connection to the device.

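from bleak import BleakClient

HR_MEASUREMENT_CHARACTERISTIC_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
async with BleakClient(
    DEVICE_ADDRESS,  # placeholder: the address noted from the scan above
    timeout=15.0,
) as client:
    await client.start_notify(
        HR_MEASUREMENT_CHARACTERISTIC_UUID,
        heart_rate_handler,
    )
    # Keep this block open (e.g. by awaiting a shutdown signal);
    # leaving it disconnects the client and stops the notifications.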

In the snippet, we pass the address we collected earlier to BleakClient with a reasonable timeout. Then we subscribe to notifications on the Heart Rate Measurement characteristic, and delegate the handling of each notification to a callback function.

With the help of Gemini, we saved time digging through the documentation, as it returned a ready-made snippet for the BLE connection and data parsing, which is shown below.

import logging
from struct import unpack

logger = logging.getLogger(__name__)


def heart_rate_handler(sender, data):
    """Callback to handle heart rate measurement data."""
    # The heart rate data structure is defined in the BLE spec:
    # Byte 0: Flags (contains format of HR value and if other fields are present)
    # Byte 1: Heart Rate Value (8-bit or 16-bit)

    # Check the first bit of the flags to see if HR is 16-bit (0x01) or 8-bit (0x00)
    flags = data[0]
    is_16_bit = flags & 0x01
    if is_16_bit:
        # 16-bit HR value is at index 1 and 2 (little-endian)
        hr_value = unpack("<H", data[1:3])[0]
    else:
        # 8-bit HR value is at index 1
        hr_value = data[1]
    logger.info(f"[{flags:04b}] Heart Rate: {hr_value} BPM")
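To make the parsing logic easier to check without a watch at hand, here is a minimal sketch that pulls the same flag handling into a pure function. The name parse_heart_rate and the length checks are my own additions for illustration, not part of the original project.

```python
from struct import unpack


def parse_heart_rate(data: bytes) -> int:
    """Return the BPM value from a Heart Rate Measurement payload."""
    if len(data) < 2:
        raise ValueError("payload too short for a heart rate measurement")
    flags = data[0]
    if flags & 0x01:
        # Bit 0 set: the value is an unsigned 16-bit little-endian integer.
        if len(data) < 3:
            raise ValueError("payload too short for a 16-bit heart rate")
        return unpack("<H", data[1:3])[0]
    # Bit 0 clear: the value is a single unsigned byte.
    return data[1]


print(parse_heart_rate(bytes([0x00, 72])))          # 8-bit format -> 72
print(parse_heart_rate(bytes([0x01, 0x2C, 0x01])))  # 16-bit format, 0x012C -> 300
```

Keeping the parsing separate from the logging means the callback itself can stay a thin wrapper around this function.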

There we have it: half the program is done, and it is able to receive heart rate data periodically from the device.

Bridging the Gap: The Producer-Consumer Pattern

Photo by Lenzil Gonsalves on Unsplash

Let’s continue the story from where I bought my second-hand Sony smart band. Electronic products inevitably die, and as Sony wasn’t making a viable successor to the product, I had to look elsewhere. That was when I first looked at Fitbit, particularly the Fitbit Charge 3. It was more than double the price of my smart band, though it came with more features. One feature I particularly liked was the sleep tracker, as I have a really bad sleep pattern. It worked fine, though with a major drawback: I lost the ability to read my heart rate through the sports tracker app.

Since the device came with a screen itself, it did mean there was no need to whip out my phone constantly to check my heart rate. Not being able to log my run with heart rate data attached created a minor annoyance. On the other hand, I moved on to other sports already, namely boxing and weightlifting. The shift in activity did change the need as I didn’t need the data as often. For instance, the short-burst nature of weightlifting did not require constant monitoring of my heart rate; the main focus was just on whether I could lift the dumbbell or otherwise.

Again, the tracker died after serving for several years. I moved on to a Versa 3 on a good deal, despite the mild annoyance. Other brands were considered, but I got confused by their numerous product lines and decided to stay with Fitbit one more time. As the Versa was a smartwatch, Fitbit did release an SDK for it, but they discontinued application development support when they released its successor, the Versa 4. Regrettably, I didn’t put in the time and effort to develop anything, as I was adequately happy with it.

As mentioned earlier, I found a way to broadcast my heart rate with an app offered in the Fitbit app store, and that led to the setup of a heart rate widget on my OBS setup. Coincidentally, I acquired Tetris Effect on my workstation, so I was curious to see if my heart rate changed throughout the game play. Surprisingly, it really did, and the recording clearly showed the heart rate fluctuations in action.

https://medium.com/media/018de8b0f30246afd93165f500a77b62/href

Granted, as the tracker was worn on the wrist, the response was somewhat delayed. Despite that, the stream showed that as the tetriminos dropped faster, my heart rate increased correspondingly. More often than not, the height of the stacked blocks waiting to be cleared also played a role in influencing the heart rate.

Now that we have the stream of heart rate data, how do we recreate the widget in OBS through our script?

Open Broadcaster Software, or OBS for short, offers a way to interact with the sources on screen through its WebSocket interface. In our case, we are interested in sending our heart rate data to a text source. By using the obsws-python library, we can send the data as soon as we capture it from the device. We can establish this connection to OBS using the following simple code:

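from obsws_python import ReqClient

# OBS_HOST, OBS_PORT and OBS_PASSWORD are placeholders for the values
# shown in your own OBS WebSocket settings.
obs = ReqClient(
    host=OBS_HOST,
    port=OBS_PORT,
    password=OBS_PASSWORD,
)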

All of these settings can be found in the OBS websocket setup page, as shown in the screenshot below:

Sending the heart rate to the source is simple, through the obs.set_input_settings method:

obs.set_input_settings(
    SOURCE_NAME,  # placeholder: the name of your text source in OBS
    {"text": f"{item:3d}"},
    True,
)

By assembling all of these together, we would have a complete program for the job. The only problem is that we are mixing asynchronous code calling bleak with the strictly synchronous, and likely not thread-safe, code of obsws-python. Fortunately, the data only flows one way; otherwise the whole setup would be significantly more complicated. Ideally, the library that deals with the OBS WebSocket would be asynchronous, but obsws-python was the only reliable library I could find in a weekend.

The project started as a weekend project, and I stopped right after achieving a running proof-of-concept. Only later, when I finally had the time, did I start refactoring this into a more reliable program that properly handles errors and graceful shutdown. Considering we discussed rather extensively in the past on those topics, let’s skip to how we turn this into a producer-consumer design.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(producer)
    executor.submit(consumer)

We want both the consumer and producer to remain isolated from each other, so we separate them into individual threads. Usually it is better to split them into different processes, but since we are only running one event loop for the producer part it is okay to stick with threads. In order to synchronize both of them, we need to pass a queue and an exit event to them as well. This can be achieved with functools.partial, or by writing a class with the dunder __call__ method, a topic we discussed previously. I will leave the implementation detail out to focus on the core concept itself.
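As a rough illustration of the functools.partial approach, the sketch below binds a shared queue and exit event to a stand-in producer and consumer before submitting them to the executor. The function bodies, the seen list, and the run helper are assumptions made up for this example; the real project talks to the watch and to OBS instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Empty, Queue
from threading import Event


def producer(queue: Queue, exit_event: Event) -> None:
    """Stand-in producer: emits a few fake readings, then signals shutdown."""
    for reading in (72, 75, 80):
        queue.put(reading)
        time.sleep(0.01)
    exit_event.set()


def consumer(queue: Queue, exit_event: Event, seen: list) -> None:
    """Stand-in consumer: collects readings until shutdown and an empty queue."""
    while not (exit_event.is_set() and queue.empty()):
        try:
            seen.append(queue.get(timeout=0.1))
        except Empty:
            continue


def run() -> list:
    queue, exit_event, seen = Queue(), Event(), []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(partial(producer, queue, exit_event)),
            executor.submit(partial(consumer, queue, exit_event, seen)),
        ]
        # result() re-raises any exception that happened inside a worker.
        for future in futures:
            future.result()
    return seen


print(run())  # [72, 75, 80]
```

Calling result() on each future is worth the extra lines, because an exception raised inside a submitted callable otherwise stays hidden in the future object.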

https://kitfucoda.medium.com/the-versatility-of-call-a-python-developers-secret-weapon-a6bff776971a

Now, back to our producer, where we collect the heart rate. In addition to logging the heart rate, we now put it on our queue:

logger.info(f"[{flags:04b}] Heart Rate: {hr_value} BPM")
asyncio.create_task(asyncio.to_thread(queue.put, hr_value))

In case you missed our series of articles on AsyncIO (go read them, they are awesome!), queue.put can block (for example, when a bounded queue is full), so we delegate it to a worker thread with asyncio.to_thread. For our purpose, we don’t exactly care about the outcome, but we want to ensure it is scheduled safely. Therefore, we pass the returned coroutine object to asyncio.create_task, which schedules it on the running event loop.
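Here is a small self-contained sketch of that hand-off, with a plain function standing in for the bleak callback. The names on_reading and tasks are hypothetical. One detail differs from the snippet above: the sketch keeps a reference to each task, since the asyncio documentation notes that the event loop only holds weak references to tasks. It assumes Python 3.9 or later for asyncio.to_thread.

```python
import asyncio
from queue import Queue


def on_reading(queue: Queue, tasks: set, hr_value: int) -> None:
    """Synchronous callback, invoked while the event loop is running."""
    # queue.put runs on a worker thread; the task wraps that hand-off.
    task = asyncio.create_task(asyncio.to_thread(queue.put, hr_value))
    tasks.add(task)                        # keep a strong reference
    task.add_done_callback(tasks.discard)  # drop it once finished


async def main() -> list:
    queue: Queue = Queue()
    tasks: set = set()
    for value in (72, 75, 80):
        on_reading(queue, tasks, value)
    # Wait for every pending hand-off before reading the queue back.
    await asyncio.gather(*tasks)
    # Worker threads may finish in any order, so sort for a stable result.
    return sorted(queue.get_nowait() for _ in range(3))


print(asyncio.run(main()))  # [72, 75, 80]
```

Because each put runs on its own worker thread, the order in which readings land on the queue is not strictly guaranteed. For a heart rate display that updates about once a second this hardly matters, but it is worth knowing.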

On the other hand, as the code dealing with OBS is synchronous, we need to plan carefully.

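from contextlib import suppress
from queue import Empty, Queue
from threading import Event

from obsws_python import ReqClient


def consumer(queue: Queue, exit_event: Event) -> None:
    # OBS_HOST, OBS_PORT, OBS_PASSWORD and SOURCE_NAME are placeholders
    # for your own OBS WebSocket settings and text source name.
    obs = ReqClient(
        host=OBS_HOST,
        port=OBS_PORT,
        password=OBS_PASSWORD,
    )

    logger.info("Starting consumption loop")
    while True:
        # task 1: handle the exit event
        if exit_event.is_set():
            logger.info("Exiting consumption loop")
            break

        # task 2: publish to obs
        with suppress(Empty):
            if item := queue.get(timeout=5.0):
                obs.set_input_settings(
                    SOURCE_NAME,
                    {"text": f"{item:3d}"},
                    True,
                )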

This simplified snippet sets up an infinite loop that does two things: first, it checks whether exit_event has been set, and second, it consumes the heart rate data from the queue and sends it to OBS. We need to set a timeout and suppress the resulting Empty exception when getting a value from the queue. Otherwise, the call would block until an item arrives, and the loop would never get the chance to notice an exit_event set elsewhere.
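To see the timeout-and-exit pattern run without OBS, the sketch below swaps the OBS client for a publish callable; here it is simply a list’s append method. The names consume, publish, readings, and published are made up for this illustration, and the short timeout is only there to keep the demo quick.

```python
from queue import Empty, Queue
from threading import Event, Thread
from typing import Callable


def consume(
    queue: Queue,
    exit_event: Event,
    publish: Callable[[str], None],
    timeout: float = 0.1,
) -> None:
    """Forward readings to publish until exit_event is set."""
    while not exit_event.is_set():
        try:
            item = queue.get(timeout=timeout)
        except Empty:
            continue  # nothing arrived in time; re-check the exit event
        publish(f"{item:3d}")  # right-aligned in three characters
        queue.task_done()


published: list = []
readings: Queue = Queue()
exit_event = Event()
worker = Thread(target=consume, args=(readings, exit_event, published.append))
worker.start()

for reading in (72, 101):
    readings.put(reading)

readings.join()   # block until both readings have been published
exit_event.set()  # then ask the loop to stop
worker.join()

print(published)  # [' 72', '101']
```

Catching Empty explicitly, rather than relying on the truthiness of the item, is a small design choice: it means a reading of 0 would still be forwarded instead of being silently skipped.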

Asyncio’s Role in Real-World, Non-Web I/O

Essentially, this is a fairly common producer-consumer application spanning two different components. While the consumer depends on the producer for the data it publishes to OBS, the producer should not have to wait for the consumer to finish. By separating them into different threads and using a queue for synchronization, we allow both components to work independently. Unlike the Telegram chatbot we built previously, this shows how we can fit synchronous code into an asynchronous environment.

I appreciate the fact that Garmin allows the heart rate data to be made available through an open interface. If I were to get a new Versa watch, this project would not have been possible, as the data is locked away behind a proprietary API. As mentioned earlier, Fitbit also removed support for application development for the newer models.

That would mean no more Tetris Effect streams with heart rate readings for me.

https://medium.com/media/6fd1166a20e17dcd450a99014354e282/href

Not like anyone cared anyway.

So, is this project a good example as a response to the question posed early in the article? Is this a good demonstration of what asynchronous programming is capable of? I would argue yes, as it shows that even when not all components are asynchronous, it is still possible to work around it. When we have multiple components that need to talk to each other, asyncio is definitely a way to go, albeit one that requires more thought and work.

That’s all I have for today! The code snippets are purposefully simplified for brevity, but if you are interested in the project, feel free to check it out on GitHub. Thanks for reading, I shall see you in writing again soon.

For the sake of transparency, this article was a collaborative effort with my editor, Gemini, who assisted me throughout the writing process. Beyond providing editorial suggestions to tighten the narrative and structure, the model was instrumental in accelerating the technical side of the project, specifically by generating and verifying the initial code snippet required to correctly extract and parse the raw heart rate data from the BLE characteristic — a small but critical step that saved me hours of digging through specification sheets. The final code and the story are mine.
