"""In-memory state: OTS, bridge_id from Cloud, Fail2Ban.

Everything here lives in module-level variables guarded by a single lock;
nothing is persisted. The accessors below are the only intended way to read
or change that state.
"""
import secrets
import time
from threading import Lock

# Current one-time secret (32 bytes as hex = 64 chars). Only in RAM.
_claim_token: str = ""
_claimed: bool = False  # True after successful claim; do not issue new token
_lock = Lock()

# Bridge ID we got from Cloud after WebSocket register
_bridge_id: str = ""
# Tenant name from Cloud after claim (config message), for pairing UI display
_tenant_name: str = ""
# WebSocket connected to Cloud (for pairing UI online/offline badge)
_ws_connected: bool = False

# Fail2Ban: after FAIL_THRESHOLD failed verify_claim responses, lock until
# _lock_until (a time.monotonic() deadline; 0.0 means "no lock set").
_fail_count = 0
_lock_until: float = 0.0
FAIL_THRESHOLD = 5
LOCK_SECONDS = 300  # 5 minutes


def _generate_ots() -> str:
    """Return a fresh one-time secret: 32 random bytes as 64 hex characters."""
    return secrets.token_hex(32)


def get_claim_token() -> str:
    """Return the current one-time secret, creating one if none exists yet.

    Returns an empty string once the bridge has been claimed.
    """
    global _claim_token
    with _lock:
        if _claimed:
            return ""
        if not _claim_token:
            _claim_token = _generate_ots()
        return _claim_token


def rotate_claim_token() -> str:
    """Replace the one-time secret with a new one and return it.

    Returns an empty string (and changes nothing) once the bridge has been
    claimed.
    """
    global _claim_token
    with _lock:
        if _claimed:
            return ""
        _claim_token = _generate_ots()
        return _claim_token


def invalidate_claim_token() -> None:
    """After successful claim, clear OTS and mark as claimed."""
    global _claim_token, _claimed
    with _lock:
        _claim_token = ""
        _claimed = True


def unpair() -> str:
    """Called when Cloud sends unpaired. Reset claimed state and return new OTS.

    The stored tenant name is cleared as well.
    """
    global _claim_token, _claimed, _tenant_name
    with _lock:
        _claimed = False
        _tenant_name = ""
        _claim_token = _generate_ots()
        # _ws_connected is set by cloud_client when connection closes
        return _claim_token


def set_bridge_id(bridge_id: str) -> None:
    """Store the bridge ID."""
    global _bridge_id
    with _lock:
        _bridge_id = bridge_id


def get_bridge_id() -> str:
    """Return the stored bridge ID ("" if none has been set)."""
    with _lock:
        return _bridge_id


def is_claimed() -> bool:
    """Return True while the bridge is marked as claimed."""
    with _lock:
        return _claimed


def set_tenant_name(name: str) -> None:
    """Store the tenant name; any falsy value is stored as ""."""
    global _tenant_name
    with _lock:
        _tenant_name = name or ""


def get_tenant_name() -> str:
    """Return the stored tenant name ("" if none)."""
    with _lock:
        return _tenant_name


def set_ws_connected(connected: bool) -> None:
    """Record whether the WebSocket connection is currently up."""
    global _ws_connected
    with _lock:
        _ws_connected = connected


def get_ws_connected() -> bool:
    """Return the last recorded WebSocket connection state."""
    with _lock:
        return _ws_connected


def check_token(claimed: str) -> bool:
    """Return True if *claimed* equals the current one-time secret.

    Always False when no secret is set. Input that is not a ``str`` is
    rejected rather than raising, so a malformed request cannot crash the
    caller.
    """
    if not isinstance(claimed, str):
        return False
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, so compare encoded bytes instead.
    # "surrogatepass" keeps encode() from raising on lone surrogates.
    candidate = claimed.encode("utf-8", "surrogatepass")
    with _lock:
        if not _claim_token:
            return False
        return secrets.compare_digest(_claim_token.encode("utf-8"), candidate)


def record_verify_failure() -> None:
    """Call when we responded ok: false to verify_claim. Fail2Ban logic.

    Increments the failure counter; once it reaches FAIL_THRESHOLD the lock
    deadline is set (or pushed out) to LOCK_SECONDS from now.
    """
    global _fail_count, _lock_until
    with _lock:
        _fail_count += 1
        if _fail_count >= FAIL_THRESHOLD:
            _lock_until = time.monotonic() + LOCK_SECONDS


def _clear_expired_lock() -> None:
    """Drop an elapsed lock and its failure count. Caller must hold _lock."""
    global _fail_count, _lock_until
    if _lock_until > 0 and time.monotonic() >= _lock_until:
        _fail_count = 0
        # Clear the deadline too, so later calls do not treat the stale
        # value as a freshly expired lock and wipe new failures.
        _lock_until = 0.0


def is_locked() -> bool:
    """Return True while the Fail2Ban lock is active.

    When a lock has expired, the failure counter is reset so counting starts
    over. If no lock was ever set, the counter is left untouched: resetting
    it here would let a caller that checks is_locked() before every attempt
    avoid ever reaching FAIL_THRESHOLD.
    """
    with _lock:
        _clear_expired_lock()
        return _lock_until > 0


def reset_lock_after_timeout() -> None:
    """Reset fail count after lock period expires.

    Does nothing while a lock is still active or when no lock is set.
    """
    with _lock:
        _clear_expired_lock()