Blog der Heimetli Software AG

Regensender TFA 30.3161 decodieren

Dieser Sender ist ein Teil der TFA Monsun Wetterstation. Er misst die Temperatur und die Regenmenge.

Regensensor der Monsun-Station

Der Regenmesser ist als "Tipping Bucket Rain Gauge" realisiert. Das bedeutet dass ein Behälter so lange mit Wasser gefüllt wird bis er kippt. Der Sensor zählt die Kippvorgänge und meldet sie an den Bedienteil der Station.

Tipping Bucket im Regensensor

Die Hardware

Die Hardware ist ähnlich aufgebaut wie im Post zum Prologue-Decoder beschrieben, nur habe ich die Module auf eine Lochrasterplatte gelötet. Das ist sehr viel stabiler, sowohl mechanisch wie auch empfangstechnisch.

Der Code für den Pico

Hier wird nur kontrolliert dass das Timing und die Anzahl der Pulse stimmen. Wenn beides passt wird das empfangene Signal über USB an den PC geschickt.

""" Decodes the signal from the tfa 30.3222.02 sensor

Setup:
    GPIO 16 is connected to tho output pin of an AM-RX12E-433P

    The output is sent to the serial port on USB.
"""

import time
from machine import Pin

# Limits for the start pulse
MIN_START = 8500
MAX_START = 9600

# Limits for the separator
MIN_SEP   = 400
MAX_SEP   = 480

# Limits for the short pulse
MIN_SHORT = 1960
MAX_SHORT = 2200

# Limits for the long pulse
MIN_LONG  = 3900
MAX_LONG  = 4250

p = Pin( 16, Pin.IN )

def measure_pulse( ts, level ):
    """ Measures the pulse length """
    while p.value() == level:
        pass

    te = time.ticks_us()

    return te - ts, te

def wait_for_start():
    """ Waits for the start sequence """

    while True:
        # Wait for the first pulse
        diff, te = measure_pulse( time.ticks_us(), 0 )

        if diff > MIN_START and diff < MAX_START:
            return te

       
def rain():
    """ Decodes the signal and writes it to the standard output """

    while True:
        te = wait_for_start()

        signal = 0

        bits   = 0
        while bits < 36:
            diff, te = measure_pulse( te, 1 )

            if diff < MIN_SEP or diff > MAX_SEP:
                break

            diff, te = measure_pulse( te, 0 )
            
            if diff > MIN_SHORT and diff < MAX_SHORT:
                signal <<= 1
            elif diff > MIN_LONG and diff < MAX_LONG:
                signal = (signal << 1) | 1
            else:
                break

            bits += 1

        if bits == 36:
            print( hex(signal) )           

# Start the decoder
rain()

Der Logger auf dem PC

Wie oben beschrieben bekommt der Logger nur das empfangene Signal. Die Prüfung der Checksumme und das Extrahieren der Daten passiert auf dem PC.

Der Sensor schickt mehrere gleiche Signale die vom Pico auch empfangen une weitergegeben werden. Der Logger unterdrückt gleiche Signale die kurz aufeinander folgen.

Da bei mir schon genug Temperatursensoren laufen wird die Temperatur einfach mitgeloggt. Da die Temperatur in der letzten Zeit nie unter 0 Grad gesunken ist weiss ich nicht was der Sensor dabei schickt. Wahrscheinlich sollten Werte über 2048 als negativ betrachtet werden.

from datetime import datetime, timedelta

def reverse( n ):
    return ((n & 1) << 3) | ((n & 2) << 1) | ((n & 4) >> 1) | ((n & 8) >> 3)

def valid( signal ):
    sum = 0
    for shift in range( 32, 0, -4):
        sum += reverse( (signal >> shift) & 0xF )

    return (sum & 0xF) == reverse(signal & 0xF)

def write_log( now, filename, temperature, rain ):
    timestamp = now.strftime( "%Y-%m-%dT%H:%M:%S" )

    with open(filename,"a") as log:
        log.write( f"{timestamp},{temperature},{rain}\n" )

def decode_signal( line, now ):
    print( line.strip() )
    signal = int( line, 16 )

    if valid( signal ):
        temperature = (reverse((signal >> 12) & 0xF) << 8) | (reverse((signal >> 16) & 0xF) << 4) | reverse((signal >> 20) & 0xF)
        rain        = ((reverse((signal >> 28) & 0xF) & 0xC) << 8) | ((reverse((signal >> 24) & 0xF) & 0x6) << 7) | (reverse((signal >> 4) & 0xF) << 4) | reverse((signal >> 8) & 0xF)
        write_log( now, "rain.log", temperature / 10, rain )
        print( temperature / 10, rain )


def main():
    delta     = timedelta( seconds=20 )
    previous  = ""
    next      = datetime.now()

    with open( "/dev/ttyACM0" ) as input:
        for line in input:

            now = datetime.now()
            if (line != previous) or (now > next):
                decode_signal( line, now )
                previous = line
                next     = now + delta


if __name__ == "__main__":
   main()