brandmeister-lastheard-tele.../main.py
2026-06-08 23:23:42 +02:00

184 lines
7.4 KiB
Python

import html
import json
import os
import time

import requests
import socketio
# Configuration is taken from the process environment. The first four
# settings are mandatory: a missing variable raises KeyError at startup.
tg_to_watch = int(os.environ["TG_TO_WATCH"])
cluster_to_watch, telegram_channel, telegram_api_key = (
    os.environ[name]
    for name in ("CLUSTER_TO_WATCH", "TELEGRAM_CHANNEL", "TELEGRAM_API_KEY")
)
# Optional switch, enabled unless set to something other than "true"
# (compared case-insensitively).
cluster_watch = "true" == os.getenv("CLUSTER_WATCH", "true").lower()
def _as_int(value):
# The LastHeard stream is inconsistent about whether IDs/slots arrive as
# ints or strings, so normalise everything we compare to int.
try:
return int(value)
except (TypeError, ValueError):
return value
def _parse_pairs(raw):
# "262399:2,262383:2" -> {(262399, 2), (262383, 2)}
pairs = set()
for item in raw.split(","):
item = item.strip()
if not item:
continue
rid, _, slot = item.partition(":")
pairs.add((_as_int(rid), _as_int(slot)))
return pairs
def _parse_ids(raw):
# "262399,262383" -> {262399, 262383}
return {_as_int(item.strip()) for item in raw.split(",") if item.strip()}
# Optional filters that replace repeater IDs formerly hardcoded in the
# event handler:
#   SLOT_EXCLUDE          - repeater/slot combinations that must never count
#                           as a slot match
#   TG8_EXCLUDE_REPEATERS - repeater IDs whose TG8 traffic is ignored
slot_exclusions = _parse_pairs(os.getenv("SLOT_EXCLUDE", ""))
tg8_exclude_repeaters = _parse_ids(os.getenv("TG8_EXCLUDE_REPEATERS", ""))

# Startup banner with the effective configuration (the API key is
# deliberately not printed).
print(
    f"TG to watch: {tg_to_watch}",
    f"Cluster to watch: {cluster_to_watch}",
    f"Telegram channel: {telegram_channel}",
    f"Cluster watch: {cluster_watch}",
    f"Slot exclusions: {slot_exclusions or 'none'}",
    f"TG8 exclude repeaters: {tg8_exclude_repeaters or 'none'}",
    sep="\n",
)
########################################################################
def send_telegram_message(text):
    """Post ``text`` to the configured Telegram channel via the Bot API.

    The message is sent with ``parse_mode=HTML``, so ``text`` must already
    be valid Telegram HTML. Delivery is best effort: a network failure or
    timeout is logged and the function returns normally, so one failed
    notification does not raise into the caller.

    Args:
        text: The HTML-formatted message body.
    """
    url = f"https://api.telegram.org/bot{telegram_api_key}/sendMessage"
    payload = {"chat_id": telegram_channel, "text": text, "parse_mode": "HTML"}
    try:
        # requests has no default timeout; without one a stalled connection
        # would block the caller indefinitely.
        r = requests.post(url, data=payload, timeout=10)
    except requests.RequestException as exc:
        # Log only the exception type: the message of a requests error can
        # embed the request URL, which contains the bot token.
        print(f"Telegram API request failed: {type(exc).__name__}")
        return
    print(f"Telegram API response: {r.status_code} {r.text}")
# Read the BM cluster database.
# Note: the former endpoint /v2/cluster/byName?name=... no longer exists.
# Instead /v2/cluster returns the full list of all clusters, from which we
# pick the matching one by clusterName.
bm_clusterinfo_uri = "https://api.brandmeister.network/v2/cluster"
# A timeout keeps a stalled API from blocking startup forever, and
# raise_for_status() reports an HTTP error as such instead of as a
# confusing JSON/parsing failure further down.
response = requests.get(bm_clusterinfo_uri, timeout=30)
response.raise_for_status()
bm_clusters_json = response.json()
# Filter out the cluster(s) with a matching name; .get() tolerates entries
# that carry no clusterName at all.
bm_clustermasters_json = [c for c in bm_clusters_json if c.get("clusterName") == cluster_to_watch]
if not bm_clustermasters_json:
    print(f"Cluster '{cluster_to_watch}' not found in the Brandmeister API")
# Initialise the lists used while iterating over the cluster members
list_of_repeater = []
repeater_list_with_cluster = []
# Read the members of every matching cluster entry and collect them in
# list_of_repeater, which the event handler uses for its cluster check.
for master in bm_clustermasters_json:
    bm_master_repeaters_with_cluster_uri = f"https://api.brandmeister.network/v2/cluster/{master['id']}/members"
    response_repeater_in_cluster = requests.get(bm_master_repeaters_with_cluster_uri, timeout=30)
    try:
        repeater_list_with_cluster = response_repeater_in_cluster.json()[0]["members"]
        list_of_repeater.extend(repeater_list_with_cluster)
    except (ValueError, LookupError, TypeError) as exc:
        # Best effort: an unparsable or unexpectedly shaped member response
        # skips this cluster entry but does not abort startup.
        print(f"Could not read members of cluster {master['id']}: {exc!r}")
# print(f"Repeaters in cluster: {len(list_of_repeater)}")
# for rep in list_of_repeater:
#     print(f"  {rep['repeaterid']} slot {rep['slot']}")
# Socket.IO client that receives the MQTT-backed LastHeard events.
# Automatic reconnection is enabled with a back-off between 5 and 60 seconds
# (reconnection_attempts=0 presumably means "retry forever" - see the
# python-socketio Client documentation).
sio = socketio.Client(
    reconnection=True,
    reconnection_attempts=0,
    reconnection_delay=5,
    reconnection_delay_max=60,
)
@sio.event
def connect():
    """Subscribe to the LastHeard stream once the connection is up."""
    print("connected to server")
    # Since the API change the server keeps the connection open but sends
    # no data unless the stream is joined explicitly.
    room = "everything"
    sio.emit("join", room)
    print("subscribed to LastHeard stream (join/everything)")
@sio.event
def disconnect():
    """Log that the connection to the server was lost."""
    print("disconnected from server, reconnecting...")
def _build_telegram_message(jsondata):
    """Format one LastHeard event as a Telegram HTML message.

    Every value taken from the stream is HTML-escaped before it is inserted:
    the message is sent with ``parse_mode=HTML``, so an unescaped ``<`` or
    ``&`` in e.g. the talker alias would produce invalid markup. ``None``
    values are rendered as an empty string.
    """
    def esc(value):
        return "" if value is None else html.escape(str(value), quote=True)

    source_call = esc(jsondata["SourceCall"])
    lines = (
        "Callsign: <b><a href='https://brandmeister.network/index.php?page=profile&call="
        f"{source_call}'>{source_call}</a></b> ID: <b>{esc(jsondata['SourceID'])} </b> ",
        f"Name: {esc(jsondata['SourceName'])} ",
        f"Talkeralias: <b>{esc(jsondata['TalkerAlias'])}</b> ",
        "-----------------------------------",
        f"Access via: <b>{esc(jsondata['LinkName'])} </b>",
        "Via repeater: <b><a href='https://brandmeister.network/?page=device&id="
        f"{esc(jsondata['ContextID'])}'>{esc(jsondata['LinkCall'])}</a></b>",
        f"Slot: {esc(jsondata['Slot'])} TG: {esc(jsondata['DestinationID'])}",
        f"Master: {esc(jsondata['Master'])}",
        f"RSSI: <b>{esc(jsondata['RSSI'])} dBm</b> BER: <b>{esc(round(jsondata['BER'], 2))}%</b>",
    )
    return "\n".join(lines)


def _tg8_cluster_match(jsondata):
    """Return True if a TG8 event comes from a watched cluster repeater slot.

    The event's repeater (ContextID) and slot must match a member entry of
    the watched cluster, the repeater must not be listed in
    ``tg8_exclude_repeaters`` and the repeater/slot pair must not be listed
    in ``slot_exclusions``.
    """
    context_id = _as_int(jsondata["ContextID"])
    event_slot = _as_int(jsondata["Slot"])
    if context_id in tg8_exclude_repeaters:
        return False
    if (context_id, event_slot) in slot_exclusions:
        return False
    # Compare against every member entry, not just the first one with a
    # matching repeater ID, so a repeater that is listed more than once
    # (e.g. with different slots) is matched on any of its entries.
    return any(
        _as_int(rep["repeaterid"]) == context_id and _as_int(rep["slot"]) == event_slot
        for rep in list_of_repeater
    )


@sio.on("mqtt")
def on_mqtt(data):
    """Handle one LastHeard event and notify Telegram if it is relevant.

    Only ``Session-Stop`` events whose stop time is at most 20 seconds old
    are considered. Such an event triggers a notification when it is either

    * TG8 traffic on a matching repeater slot of the watched cluster (only
      while ``cluster_watch`` is enabled), or
    * traffic on the mirrored talkgroup ``tg_to_watch``.

    At most one notification is sent per event, even if both rules apply.

    Args:
        data: The event as delivered by the server; ``data["payload"]`` is a
            JSON-encoded string describing the transmission.
    """
    jsondata = json.loads(data["payload"])

    # Filter first and format later: the stream carries every event, and
    # nearly all of them are irrelevant here.
    if jsondata["Event"] != "Session-Stop":
        return

    # Discard stale events. IDs and timestamps are normalised with _as_int
    # because the stream delivers them as ints or strings inconsistently.
    max_old_time = int(time.time()) - 20
    if _as_int(jsondata["Stop"]) <= max_old_time:
        return

    payload_dst_id = _as_int(jsondata["DestinationID"])
    if payload_dst_id == 8 and cluster_watch and _tg8_cluster_match(jsondata):
        matched_via = "TG8"
    elif payload_dst_id == tg_to_watch:
        matched_via = "Mirror-TG"
    else:
        return

    print(jsondata)
    print(f"matched via {matched_via}")
    send_telegram_message(_build_telegram_message(jsondata))
# Endpoint of the Brandmeister LastHeard Socket.IO stream
bm_url = 'https://api.brandmeister.network'
bm_path = "/lh/socket.io"
# Keep the client running forever: connect, block until the connection ends,
# then start over. The pause applies after errors AND after a normal return
# of sio.wait(), so a server that accepts and immediately drops connections
# cannot drive this loop into reconnecting without any delay.
while True:
    try:
        sio.connect(url=bm_url, socketio_path=bm_path, transports="websocket")
        sio.wait()
        print("Connection closed, reconnecting in 10s...")
    except Exception as e:
        print(f"Connection error: {e}, retrying in 10s...")
    time.sleep(10)