"""WebSocket client to OrderPy Cloud. Sends pubKey, receives bridgeId; handles verify_claim, print_test, config, printer_status, print_order."""
import asyncio
import base64
import json
import logging
import os
from collections.abc import Coroutine
from typing import Any
from urllib.parse import quote, urlencode

from bridge_core.state import (
    check_token,
    get_bridge_id,
    invalidate_claim_token,
    record_verify_failure,
    set_bridge_id,
    set_tenant_name,
    set_ws_connected,
    unpair,
)

logger = logging.getLogger(__name__)

# Port used when a printer entry does not specify one.
_DEFAULT_PRINTER_PORT = 9100

# Printers for healthcheck: [{id, address, port}], updated on config
_printers: list[dict[str, Any]] = []
_health_task: asyncio.Task[None] | None = None
# API key from config; used to fetch print payload when backend sends minimal print_order
_api_key: str | None = None
_cloud_http_url: str = ""
# Strong references to fire-and-forget print tasks. The event loop only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: set[asyncio.Task[Any]] = set()


def _coerce_port(value: Any, default: int = _DEFAULT_PRINTER_PORT) -> int | None:
    """Convert a port value received from the cloud to ``int``.

    Returns ``default`` when ``value`` is ``None`` and ``None`` when the value
    cannot be converted. No range check is done here; callers validate
    ``1 <= port <= 65535`` where they need to.
    """
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _normalize_printers(printers_list: list[Any]) -> list[dict[str, Any]]:
    """Build ``{id, address, port}`` dicts from a cloud-supplied printer list.

    Entries without an id or address are dropped (as before). Entries that are
    not dicts or whose port cannot be parsed are skipped with a warning instead
    of raising and tearing down the connection.
    """
    normalized: list[dict[str, Any]] = []
    for entry in printers_list:
        if not isinstance(entry, dict):
            logger.warning("Ignoring malformed printer entry: %r", entry)
            continue
        if not entry.get("id") or not entry.get("address"):
            continue
        port = _coerce_port(entry.get("port"))
        if port is None:
            logger.warning("Ignoring printer %s: invalid port %r", entry.get("id"), entry.get("port"))
            continue
        normalized.append({"id": entry.get("id"), "address": entry.get("address", ""), "port": port})
    return normalized


def _on_background_done(task: asyncio.Task[Any]) -> None:
    """Drop the reference to a finished background task and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.warning("Background print task failed: %s", exc)


def _spawn_background(coro: Coroutine[Any, Any, None]) -> None:
    """Schedule ``coro`` as a task and keep a reference until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_done)


def _ensure_health_task(ws: Any) -> None:
    """Start the printer healthcheck if printers are configured and none is running."""
    global _health_task
    if _printers and (_health_task is None or _health_task.done()):
        _health_task = asyncio.create_task(_run_printer_healthcheck(ws))


async def _stop_health_task() -> None:
    """Cancel the running healthcheck task (if any) and clear ``_health_task``."""
    global _health_task
    task, _health_task = _health_task, None
    if task is not None and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


def _resolve_pairing(future: "asyncio.Future | None", ok: bool, reason: Any) -> None:
    """Set the pairing result on ``future`` unless it is missing or already done."""
    if future is not None and not future.done():
        future.set_result({"ok": ok, "reason": reason})


async def _report_order_result(ws: Any, order_id: Any, printed: bool) -> None:
    """Tell the cloud whether an order was printed; does nothing without order_id or ws."""
    if not order_id or not ws:
        return
    action = "order_printed" if printed else "order_print_failed"
    try:
        await ws.send(json.dumps({"action": action, "order_id": str(order_id)}))
    except Exception as e:
        # Best effort: the connection may already be gone.
        logger.warning("Could not send print result to cloud: %s", e)


async def _print_service_call_to_all_printers(payload: dict) -> None:
    """Send receipt bytes to all configured printers (no ack to cloud). Payload must have receipt_bytes_base64."""
    b64 = payload.get("receipt_bytes_base64")
    if not b64:
        logger.warning("print_service_call missing receipt_bytes_base64")
        return
    try:
        data = base64.b64decode(b64)
    except Exception as e:
        logger.warning("print_service_call invalid receipt_bytes_base64: %s", e)
        return
    # Iterate a snapshot: a config message may mutate _printers while we await.
    for p in list(_printers):
        address = p.get("address", "")
        port = int(p.get("port", 9100))
        if address and 1 <= port <= 65535:
            ok = await _send_bytes_to_printer(address, port, data)
            if ok:
                logger.info("Service call sent to printer %s:%s", address, port)


async def _send_bytes_to_printer(address: str, port: int, data: bytes) -> bool:
    """Send raw ESC/POS bytes to printer. Returns True on success."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(address, port),
            timeout=5.0,
        )
        try:
            writer.write(data)
            await asyncio.wait_for(writer.drain(), timeout=15.0)
        finally:
            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=5.0)
            except asyncio.TimeoutError:
                pass
        return True
    except (OSError, asyncio.TimeoutError) as e:
        logger.warning("Send to %s:%s failed: %s", address, port, e)
        return False
    except Exception as e:
        logger.warning("Send to %s:%s unexpected error: %s", address, port, e)
        return False


async def _fetch_order_print_payload(order_id: str, printer_id: str | None) -> str | None:
    """GET receipt payload from backend. Returns receipt_bytes_base64 or None.

    Requires the API key and cloud HTTP URL set by a previous ``config``
    message; returns None (with a warning) if either is missing, on any
    non-200 response, or on any request/parse error.
    """
    if not _api_key or not _cloud_http_url:
        logger.warning(
            "Cannot fetch print payload: api_key=%s cloud_http_url=%s",
            bool(_api_key),
            bool(_cloud_http_url),
        )
        return None
    try:
        import httpx

        # Percent-encode both values so unusual ids cannot alter the URL structure.
        path = f"/api/v1/bridges/print-jobs/orders/{quote(order_id, safe='')}"
        if printer_id:
            path += "?" + urlencode({"printer_id": printer_id})
        async with httpx.AsyncClient(timeout=15.0) as client:
            r = await client.get(
                _cloud_http_url + path,
                headers={"X-Bridge-Token": _api_key},
            )
            if r.status_code != 200:
                logger.warning("Fetch print payload %s: %s", path, r.status_code)
                return None
            data = r.json()
            return data.get("receipt_bytes_base64")
    except Exception as e:
        logger.warning("Fetch print payload failed: %s", e)
        return None


async def _print_order_to_all_printers(payload: dict, ws: Any = None) -> None:
    """Send receipt bytes to target printer(s). Payload may have receipt_bytes_base64 or we fetch via API.

    If ``payload["printer_id"]` is set, only the printer with that id is used;
    otherwise all configured printers are. When both ``order_id`` and ``ws``
    are available, ``order_printed`` is reported if at least one printer
    accepted the data and ``order_print_failed`` otherwise.
    """
    order_id = payload.get("order_id")
    target_printer_id = payload.get("printer_id")
    b64 = payload.get("receipt_bytes_base64")
    if not b64 and order_id:
        b64 = await _fetch_order_print_payload(
            str(order_id),
            str(target_printer_id) if target_printer_id else None,
        )
    if not b64:
        logger.warning("print_order missing receipt_bytes_base64 and could not fetch")
        await _report_order_result(ws, order_id, False)
        return
    try:
        data = base64.b64decode(b64)
    except Exception as e:
        logger.warning("print_order invalid receipt_bytes_base64: %s", e)
        await _report_order_result(ws, order_id, False)
        return

    printers = list(_printers)
    if target_printer_id:
        printers = [p for p in printers if str(p.get("id")) == str(target_printer_id)]
    if not printers:
        if target_printer_id:
            logger.warning("Printer %s not found for order %s", target_printer_id, order_id or "?")
        else:
            logger.warning("No printers configured; order %s not printed", order_id or "?")

    at_least_one_ok = False
    for p in printers:
        address = p.get("address", "")
        port = int(p.get("port", 9100))
        if address and 1 <= port <= 65535:
            ok = await _send_bytes_to_printer(address, port, data)
            if ok:
                at_least_one_ok = True
                logger.info("Order %s sent to printer %s:%s", order_id or "?", address, port)
    await _report_order_result(ws, order_id, at_least_one_ok)


async def _check_printer_reachable(address: str, port: int) -> bool:
    """TCP connect to printer at address:port. Returns True if reachable."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(address, port),
            timeout=3.0,
        )
    except (OSError, asyncio.TimeoutError):
        return False
    except Exception as e:
        # e.g. an address that cannot be encoded as a host name
        logger.warning("Reachability check for %s:%s unexpected error: %s", address, port, e)
        return False
    # The connect succeeded, so the printer is reachable; closing is best effort
    # and bounded so a stalled close cannot block the caller.
    writer.close()
    try:
        await asyncio.wait_for(writer.wait_closed(), timeout=3.0)
    except (OSError, asyncio.TimeoutError):
        pass
    return True


async def _run_printer_healthcheck(ws: Any) -> None:
    """Periodically check printer reachability and send status to cloud.

    Runs until cancelled or until sending a status message on ``ws`` fails.
    """
    from bridge_core.app_config import get_printer_health_interval

    interval = get_printer_health_interval()
    while True:
        await asyncio.sleep(interval)
        # Snapshot: config messages may mutate _printers while we await below.
        printers = list(_printers)
        if not printers:
            continue
        statuses = []
        for p in printers:
            printer_id = p.get("id")
            address = p.get("address", "")
            port = int(p.get("port", 9100))
            if not printer_id or not address:
                continue
            reachable = False
            if 1 <= port <= 65535:
                reachable = await _check_printer_reachable(address, port)
            statuses.append({"printer_id": printer_id, "reachable": reachable})
        if statuses:
            try:
                await ws.send(json.dumps({"action": "printer_status", "statuses": statuses}))
            except Exception:
                break


def _get_public_key() -> str:
    """Persistent keypair: load or generate and save. Returns PEM public key.

    The key location comes from ``ORDERPY_KEY_PATH`` (default
    ``./data/bridge_key.pem``). If the public key file already exists it is
    returned as-is; otherwise a 2048-bit RSA keypair is generated and both
    files are written.
    """
    key_path = os.environ.get("ORDERPY_KEY_PATH", "./data/bridge_key.pem")
    data_dir = os.path.dirname(key_path)
    if data_dir and not os.path.isdir(data_dir):
        os.makedirs(data_dir, exist_ok=True)
    priv_path = key_path if key_path.endswith(".pem") else key_path + ".priv"
    # Swap ".pem" for ".pub" in the file name only, so a ".pem" inside a
    # directory name is not rewritten into a path that does not exist.
    key_name = os.path.basename(key_path)
    if ".pem" in key_name:
        prefix = key_path[: len(key_path) - len(key_name)]
        pub_path = prefix + key_name.replace(".pem", ".pub")
    else:
        pub_path = key_path + ".pub"
    if os.path.isfile(pub_path):
        with open(pub_path, "r") as f:
            return f.read()

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    priv_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    pub_pem = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    # Create the private key owner-readable only (the mode applies when the
    # file is created). O_BINARY exists on Windows and prevents newline
    # translation on the raw descriptor.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    with os.fdopen(os.open(priv_path, flags, 0o600), "wb") as f:
        f.write(priv_pem)
    with open(pub_path, "wb") as f:
        f.write(pub_pem)
    return pub_pem.decode()


async def run_cloud_client(cloud_ws_url: str = "", pairing_queue: asyncio.Queue | None = None) -> None:
    """Connect to the cloud WebSocket and process messages, reconnecting forever.

    Args:
        cloud_ws_url: Base WebSocket URL; when empty, ``get_cloud_url()`` is used.
        pairing_queue: Optional queue of ``(code, future)`` items. Each code is
            sent as ``submit_pairing_code`` and the future is resolved with
            ``{"ok": bool, "reason": ...}`` from the cloud's ``pairing_result``,
            or with a failure reason if the send fails or the connection is lost.

    Returns only if the keypair cannot be loaded or generated; otherwise it
    loops, waiting 5 seconds between connection attempts.
    """
    global _health_task, _api_key, _cloud_http_url
    from bridge_core.app_config import get_cloud_url

    url = cloud_ws_url or get_cloud_url()
    ws_url = f"{url}/api/v1/bridges/connect"
    try:
        pub_key = _get_public_key()
    except Exception as e:
        logger.error("Failed to load/generate keypair: %s — bridge cannot connect to cloud", e)
        return

    while True:
        pending_pairing_future: asyncio.Future | None = None
        try:
            import websockets

            async with websockets.connect(ws_url, close_timeout=5) as ws:
                set_ws_connected(True)
                try:
                    # Handshake: send our public key, receive bridge id and status.
                    await ws.send(json.dumps({"pubKey": pub_key, "version": "1.0"}))
                    msg = await ws.recv()
                    data = json.loads(msg)
                    bridge_id = data.get("bridgeId") or data.get("bridge_id")
                    status = data.get("status", "UNCLAIMED")
                    if status == "UNCLAIMED":
                        unpair()
                    elif status == "ACTIVE":
                        invalidate_claim_token()
                    if bridge_id:
                        set_bridge_id(bridge_id)
                        logger.info("Registered with Cloud: bridge_id=%s, status=%s", bridge_id, status)
                    _printers.clear()
                    _health_task = None

                    while True:
                        # Wait for a cloud message or a locally submitted
                        # pairing code, whichever arrives first.
                        recv_task = asyncio.create_task(ws.recv())
                        tasks = [recv_task]
                        get_task = None
                        if pairing_queue is not None:
                            get_task = asyncio.create_task(pairing_queue.get())
                            tasks.append(get_task)
                        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                        for t in pending:
                            t.cancel()
                            try:
                                await t
                            except asyncio.CancelledError:
                                pass

                        raw = None
                        pairing_item = None
                        recv_failed = False
                        if recv_task in done:
                            try:
                                raw = recv_task.result()
                            except Exception:
                                recv_failed = True
                        if get_task is not None and get_task in done:
                            try:
                                pairing_item = get_task.result()
                            except Exception:
                                pass

                        if pairing_item is not None:
                            code, future = pairing_item
                            if recv_failed:
                                _resolve_pairing(future, False, "connection_lost")
                            else:
                                try:
                                    await ws.send(json.dumps({"action": "submit_pairing_code", "code": code}))
                                    pending_pairing_future = future
                                except Exception as e:
                                    logger.warning("Failed to send submit_pairing_code: %s", e)
                                    _resolve_pairing(future, False, "send_failed")

                        if recv_failed:
                            logger.info("WebSocket recv failed, reconnecting…")
                            break
                        if raw is None:
                            continue

                        # A single malformed message should not drop the connection.
                        try:
                            msg_data = json.loads(raw)
                        except (TypeError, ValueError) as e:
                            logger.warning("Ignoring undecodable message from cloud: %s", e)
                            continue
                        if not isinstance(msg_data, dict):
                            logger.warning("Ignoring non-object message from cloud")
                            continue
                        action = msg_data.get("action")

                        if action == "pairing_result":
                            _resolve_pairing(
                                pending_pairing_future,
                                bool(msg_data.get("ok", False)),
                                msg_data.get("reason"),
                            )
                            pending_pairing_future = None
                            continue

                        if action == "verify_claim":
                            token = msg_data.get("claimToken", "")
                            ok = check_token(token)
                            if ok:
                                invalidate_claim_token()
                            else:
                                record_verify_failure()
                            await ws.send(json.dumps({"ok": ok}))
                        elif action == "config":
                            _api_key = msg_data.get("api_key") or None
                            # Derive the HTTP(S) base URL from the WebSocket URL.
                            if url.startswith("wss://"):
                                _cloud_http_url = "https://" + url[6:]
                            elif url.startswith("ws://"):
                                _cloud_http_url = "http://" + url[5:]
                            else:
                                _cloud_http_url = url
                            _cloud_http_url = _cloud_http_url.rstrip("/")
                            set_tenant_name(msg_data.get("tenant_name") or "")
                            invalidate_claim_token()
                            printers_list = msg_data.get("printers", [])
                            if isinstance(printers_list, list):
                                _printers[:] = _normalize_printers(printers_list)
                            _ensure_health_task(ws)
                        elif action == "print_order":
                            payload = {
                                "order_id": msg_data.get("order_id"),
                                "printer_id": msg_data.get("printer_id"),
                                "receipt_bytes_base64": msg_data.get("receipt_bytes_base64"),
                            }
                            _spawn_background(_print_order_to_all_printers(payload, ws))
                        elif action == "print_service_call":
                            payload = {
                                "receipt_bytes_base64": msg_data.get("receipt_bytes_base64"),
                            }
                            _spawn_background(_print_service_call_to_all_printers(payload))
                        elif action == "config_update":
                            printers_list = msg_data.get("printers", [])
                            if isinstance(printers_list, list):
                                updated = _normalize_printers(printers_list)
                                if not updated:
                                    continue
                                # Merge by id: replace known printers, append new ones.
                                by_id = {p["id"]: p for p in _printers}
                                for p in updated:
                                    by_id[p["id"]] = p
                                _printers[:] = list(by_id.values())
                                # Printers may first arrive here rather than via "config".
                                _ensure_health_task(ws)
                                # Report the updated printers immediately rather than
                                # waiting for the next periodic healthcheck.
                                statuses = []
                                for p in updated:
                                    address = p.get("address", "")
                                    port = int(p.get("port", 9100))
                                    reachable = False
                                    if 1 <= port <= 65535 and address:
                                        reachable = await _check_printer_reachable(address, port)
                                    statuses.append({"printer_id": p.get("id"), "reachable": reachable})
                                if statuses:
                                    try:
                                        await ws.send(json.dumps({"action": "printer_status", "statuses": statuses}))
                                    except Exception as e:
                                        logger.warning("Could not send printer_status to cloud: %s", e)
                        elif action == "unpaired":
                            _printers.clear()
                            await _stop_health_task()
                            unpair()
                            # Keep bridge_id so /setup/info stays 200 until we reconnect and get a new id
                            logger.info("Unpaired by Cloud; new claim token issued")
                            break  # leave inner loop so we don't send (e.g. submit_pairing_code) on closed connection
                        elif action == "print_test":
                            printer_id = msg_data.get("printer_id", "")
                            address = msg_data.get("address", "")
                            port = _coerce_port(msg_data.get("port"))
                            b64 = msg_data.get("receipt_bytes_base64")
                            ok = False
                            if address and port is not None and 1 <= port <= 65535 and b64:
                                try:
                                    test_bytes = base64.b64decode(b64)
                                    ok = await _send_bytes_to_printer(address, port, test_bytes)
                                except Exception as e:
                                    logger.warning("print_test decode/send failed: %s", e)
                            await ws.send(json.dumps({
                                "action": "print_test_result",
                                "printer_id": printer_id,
                                "ok": ok,
                            }))
                finally:
                    set_ws_connected(False)
                    # A pairing code that was sent but never answered must not
                    # leave its waiter hanging once this connection is gone.
                    _resolve_pairing(pending_pairing_future, False, "connection_lost")
                    pending_pairing_future = None
        except Exception as e:
            logger.warning("Cloud WebSocket error: %s", e)
        _printers.clear()
        await _stop_health_task()
        await asyncio.sleep(5)