Blog der Heimetli Software AG

Decoder für den TFA 30.3222.02

Der TFA 30.3222.02 ist ein kombinierter Sensor der die Temperatur, die Luftfeuchte und die Windgeschwindigkeit misst. Er sendet auf 433MHz, mit einem Protokoll das bei rtl_433 dokumentiert ist.

Sensor für Temperatur, Feuchtigkeit und Wind

Bei mir hat der Empfang mit rtl_433 nicht recht geklappt. Ab und zu erkannte er das Signal und decodierte es auch, aber meistens meinte das Programm "no clue". Möglicherweise liegt das am DVB-Stick, denn er schien übersteuert zu sein. Mit der manuellen Einstellung des Empfangspegels ging es etwas besser.

Interessanterweise hat der Decoder einen weiteren Sensor mit diesem Protokoll entdeckt. Wo der genau steht habe ich bisher nicht herausgefunden.

Aufbau des Empfängers

Der Decoder läuft auf einem Raspberry Pico mit einem Empfängermodul für 433MHz. Der Aufbau ist hier beschrieben.

Dieser Empfänger hängt an der seriellen Schnittstelle des Webservers. Der Pico extrahiert das Signal des Sensors aus all den Meldungen die auf diesem Band gesendet werden. Um den Decoder möglichst schnell zu machen wird die CRC erst auf dem PC geprüft.

Der Code auf dem Pico

""" Decodes the signal from the tfa 30.3222.02 sensor

Setup:
    GPIO 16 is connected to tho output pin of an AM-RX12E-433P

    The output is sent to the serial port on USB.
"""

import time
from machine import Pin

MIN_SHORT = 180
MAX_SHORT = 325
MIN_LONG  = 410
MAX_LONG  = 540

p = Pin( 16, Pin.IN )

def measure_pulse( ts, level ):
    """ Measures the pulse length """
    while p.value() == level:
        pass

    te = time.ticks_us()

    return te - ts, te

def wait_for_start():
    """ Waits for the start sequence """

    while True:
        # Wait for the first pulse
        diff, te = measure_pulse( time.ticks_us(), 0 )

        if diff < 700 or diff > 750:
            continue

        # Ensure that it is followed by a correct pulse
        diff, te = measure_pulse( te, 1 )

        if diff < 700 or diff > 750:
            continue

        while True:
            # Wait for the next pulse
            diff, te = measure_pulse( time.ticks_us(), 0 )

            if diff < 700 or diff > 750:
                break

            # Look out for the data bit
            diff, te = measure_pulse( te, 1 )

            if MIN_SHORT < diff < MAX_SHORT:
                return 0, te 

            if MIN_LONG < diff < MAX_LONG:
                return 1, te 

            if diff < 700 or diff > 750:
                break

def tfa():
    """ Decodes the signal and writes it to the standard output """

    while True:
        signal, te = wait_for_start()

        if signal == 0:
            min = MIN_LONG
            max = MAX_LONG
        else:
            min = MIN_SHORT
            max = MAX_SHORT

        diff, te = measure_pulse( te, 0 )

        if diff < min or diff > max:
            continue

        bits = 1
        while bits < 64:
            diff, te = measure_pulse( te, 1 )

            if MIN_SHORT < diff < MAX_SHORT:
                signal = signal << 1
                min    = MIN_LONG
                max    = MAX_LONG
            elif MIN_LONG < diff < MAX_LONG:
                signal = signal << 1 | 1
                min    = MIN_SHORT
                max    = MAX_SHORT
            else:
                print( "diff start", diff )
                break

            diff, te = measure_pulse( te, 0 )

            if diff < min or diff > max:
                print( "diff end", diff )
                break

            bits += 1

        # Ensure that all bits were received         
        if bits == 64:
            print( hex(signal) )

# Start the decoder
tfa()

Der Sensor codiert jedes Bit in einer Kombination von je einem kurzen und einem langen Pegel unterschiedlicher Länge. Auf einen kurzen Puls muss eine lange Pause folgen und umgekehrt. min und max definieren die Länge der erwarteten Pause zwischen den Pulsen.

Das Script auf dem Server

Dieses Programm prüft das empfangene Signal und decodiert die Messwerte. Der Sensor schickt zwei verschiedene Frames, einen für Temperatur und Feuchtigkeit und einen für die Windgeschwindigkeit.

Der Sensor sendet in unregelmässigen Abständen, aber viel zu oft für eine vernünftige Auswertung. Deshalb werden die Mittelwerte der Messungen nur alle 5 Minuten ins CSV geschrieben.

Im Logfile gibt es Messwerte mit einem Abstand von bis zu 8 Minuten. Das könnte am Sensor liegen oder auch daran dass nicht jedes Signal vom Sensor korrekt empfangen wird. Für meine Zwecke ist das aber genau genug, deshalb bin ich dem Effekt nicht nachgegangen.

from datetime import datetime, timedelta

class Sensor:
    dt = timedelta( minutes=5 ) 

    def __init__( self, name ):
        self.name     = name
        self.log      = datetime.now()
        self.sum      = 0
        self.count    = 0
        self.filename = f"{name}.csv"

    def update( self, timestamp, value ):
        self.sum   += value
        self.count += 1

        if self.log + self.dt < timestamp:
            write_log( timestamp, self.filename, self.sum / self.count )

            self.log   = timestamp
            self.sum   = 0
            self.count = 0

class State:
       temperature = Sensor( "temperature" )
       humidity    = Sensor( "humidity" )
       wind        = Sensor( "wind" )

def CRC8( value ):
    mask =     1 << 63
    poly = 0x131 << 55

    while mask > 255:
        if value & mask != 0:
            value ^= poly

        mask >>= 1
        poly >>= 1

    return value

def write_log( now, filename, value ):
    timestamp = now.strftime( "%Y-%m-%dT%H:%M:%S" )

    with open(filename,"a") as logfile:
        logfile.write( f"{timestamp},{value:.2f}\n" )


def process_frame( line ):
    print( line, end="" )

    # Process only the frames of my sensor
    if "0x9" in line:
        value = int( line, 16 )

        if CRC8(value) == 0:
            t = (value >> 32) & 0x0F

            now = datetime.now()

            if t == 1:
                temperature = (((value >> 20) & 0x0FFF) - 500) / 10
                humidity = (value >> 8) & 0x0FFF
                print( "temperature", temperature, "humidity", humidity )

                State.temperature.update( now, temperature )
                State.humidity.update( now, humidity )

            if t == 2:
                wind = ((value >> 20) & 0x0FFF) / 10
                print( "wind", wind )

                State.wind.update( now, wind )

with open( "/dev/ttyACM1" ) as input:
    for line in input:
        process_frame( line )

Die Messwerte

Werden hier visualisiert: https://blog.heimetli.ch/temperature-humidity-wind.html