"""WebSocket client to OrderPy Cloud.

Sends pubKey and receives bridgeId; handles pairing_result, verify_claim, config,
config_update, print_order, print_service_call, print_test and unpaired, and
reports printer_status back to the cloud.
"""

import asyncio
import base64
import json
import logging
import os
from typing import Any
from urllib.parse import quote, urlencode

from bridge_core.state import (
    check_token,
    get_bridge_id,
    invalidate_claim_token,
    record_verify_failure,
    set_bridge_id,
    set_tenant_name,
    set_ws_connected,
    unpair,
)

logger = logging.getLogger(__name__)
|
|
|
|
# Printers for healthcheck: [{id, address, port}], updated on config
|
|
_printers: list[dict[str, Any]] = []
|
|
_health_task: asyncio.Task[None] | None = None
|
|
# API key from config; used to fetch print payload when backend sends minimal print_order
|
|
_api_key: str | None = None
|
|
_cloud_http_url: str = ""
|
|
|
|
|
|
async def _print_service_call_to_all_printers(payload: dict) -> None:
    """Send a service-call receipt to every configured printer.

    No acknowledgement is sent back to the cloud; failures are only logged
    (by ``_send_bytes_to_printer``).

    Args:
        payload: Message payload. Must carry the bytes to print,
            base64-encoded, under ``receipt_bytes_base64``.
    """
    b64 = payload.get("receipt_bytes_base64")
    if not b64:
        logger.warning("print_service_call missing receipt_bytes_base64")
        return
    try:
        data = base64.b64decode(b64)
    except Exception as e:
        logger.warning("print_service_call invalid receipt_bytes_base64: %s", e)
        return
    # Iterate over a snapshot: each send awaits, and the message loop may
    # clear or replace the shared _printers list in place meanwhile, which
    # would otherwise cut this loop short or skip entries.
    for p in list(_printers):
        address = p.get("address", "")
        port = int(p.get("port", 9100))
        if address and 1 <= port <= 65535:
            ok = await _send_bytes_to_printer(address, port, data)
            if ok:
                logger.info("Service call sent to printer %s:%s", address, port)


async def _send_bytes_to_printer(address: str, port: int, data: bytes) -> bool:
    """Open a TCP connection to the printer and push the raw bytes.

    Connecting is limited to 5 s, flushing the data to 15 s and closing to
    5 s (a close timeout is ignored).

    Returns:
        True when the bytes were written and the connection closed without
        error; False after logging a warning on any failure.
    """
    try:
        connecting = asyncio.open_connection(address, port)
        _, stream = await asyncio.wait_for(connecting, timeout=5.0)
        try:
            stream.write(data)
            await asyncio.wait_for(stream.drain(), timeout=15.0)
        finally:
            # Always release the socket, whether or not the write succeeded.
            stream.close()
            try:
                await asyncio.wait_for(stream.wait_closed(), timeout=5.0)
            except asyncio.TimeoutError:
                pass
    except Exception as exc:
        # Network errors and timeouts are expected; anything else is flagged
        # as unexpected in the log, but both count as a failed send.
        if isinstance(exc, (OSError, asyncio.TimeoutError)):
            logger.warning("Send to %s:%s failed: %s", address, port, exc)
        else:
            logger.warning("Send to %s:%s unexpected error: %s", address, port, exc)
        return False
    else:
        return True


async def _fetch_order_print_payload(order_id: str, printer_id: str | None) -> str | None:
    """GET the receipt payload for an order from the backend.

    Uses the module-level ``_api_key`` (sent as ``X-Bridge-Token``) and
    ``_cloud_http_url``, both set when the cloud sends "config".

    Args:
        order_id: Order whose receipt should be fetched.
        printer_id: Optional printer id, passed as the ``printer_id`` query
            parameter.

    Returns:
        The ``receipt_bytes_base64`` value of the JSON response, or None when
        credentials are missing, the response status is not 200, or the
        request fails for any reason (failures are logged, never raised).
    """
    if not _api_key or not _cloud_http_url:
        logger.warning("Cannot fetch print payload: api_key=%s cloud_http_url=%s", bool(_api_key), bool(_cloud_http_url))
        return None
    try:
        import httpx

        # Percent-encode both ids: they arrive in cloud messages and must not
        # be able to alter the path or inject extra query parameters.
        path = f"/api/v1/bridges/print-jobs/orders/{quote(order_id, safe='')}"
        if printer_id:
            path += "?" + urlencode({"printer_id": printer_id})
        async with httpx.AsyncClient(timeout=15.0) as client:
            r = await client.get(
                _cloud_http_url + path,
                headers={"X-Bridge-Token": _api_key},
            )
            if r.status_code != 200:
                logger.warning("Fetch print payload %s: %s", path, r.status_code)
                return None
            data = r.json()
            return data.get("receipt_bytes_base64")
    except Exception as e:
        logger.warning("Fetch print payload failed: %s", e)
        return None


async def _notify_print_result(ws: Any, order_id: Any, printed: bool) -> None:
    """Report order_printed / order_print_failed to the cloud; no-op without order_id or ws."""
    if not (order_id and ws):
        return
    action = "order_printed" if printed else "order_print_failed"
    try:
        await ws.send(json.dumps({"action": action, "order_id": str(order_id)}))
    except Exception as e:
        # Best effort: the print outcome stands even if the report is lost,
        # but the failure is logged instead of being silently dropped.
        logger.warning("Could not send print result to cloud: %s", e)


async def _print_order_to_all_printers(payload: dict, ws: Any = None) -> None:
    """Send an order receipt to the target printer(s) and report the outcome.

    Args:
        payload: May contain ``order_id``, ``printer_id`` and
            ``receipt_bytes_base64``. When the receipt bytes are missing and
            an ``order_id`` is present, they are fetched over HTTP. When
            ``printer_id`` is set, only the printer with that id is used;
            otherwise every configured printer receives the receipt.
        ws: Optional WebSocket. When given together with an ``order_id``,
            ``order_printed`` is sent if at least one printer accepted the
            bytes, ``order_print_failed`` otherwise.
    """
    order_id = payload.get("order_id")
    target_printer_id = payload.get("printer_id")
    b64 = payload.get("receipt_bytes_base64")
    if not b64 and order_id:
        b64 = await _fetch_order_print_payload(str(order_id), str(target_printer_id) if target_printer_id else None)
    if not b64:
        logger.warning("print_order missing receipt_bytes_base64 and could not fetch")
        await _notify_print_result(ws, order_id, False)
        return
    try:
        data = base64.b64decode(b64)
    except Exception as e:
        logger.warning("print_order invalid receipt_bytes_base64: %s", e)
        await _notify_print_result(ws, order_id, False)
        return
    # Snapshot so config changes arriving during the sends do not affect this job.
    printers = list(_printers)
    if target_printer_id:
        printers = [p for p in printers if str(p.get("id")) == str(target_printer_id)]
    if not printers:
        if target_printer_id:
            logger.warning("Printer %s not found for order %s", target_printer_id, order_id or "?")
        else:
            logger.warning("No printers configured; order %s not printed", order_id or "?")
    at_least_one_ok = False
    for p in printers:
        address = p.get("address", "")
        port = int(p.get("port", 9100))
        if address and 1 <= port <= 65535:
            ok = await _send_bytes_to_printer(address, port, data)
            if ok:
                at_least_one_ok = True
                logger.info("Order %s sent to printer %s:%s", order_id or "?", address, port)
    await _notify_print_result(ws, order_id, at_least_one_ok)


async def _check_printer_reachable(address: str, port: int) -> bool:
|
|
"""TCP connect to printer at address:port. Returns True if reachable."""
|
|
try:
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(address, port),
|
|
timeout=3.0,
|
|
)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return True
|
|
except (OSError, asyncio.TimeoutError):
|
|
return False
|
|
|
|
|
|
async def _run_printer_healthcheck(ws: Any) -> None:
    """Periodically check printer reachability and send status to cloud.

    The interval is read once from ``get_printer_health_interval()``. Every
    cycle probes each configured printer that has an id and an address and
    sends one ``printer_status`` message with the results. The loop ends when
    sending on ``ws`` fails; otherwise it runs until the task is cancelled.

    Args:
        ws: WebSocket connection used to report the statuses.
    """
    from bridge_core.app_config import get_printer_health_interval

    interval = get_printer_health_interval()
    while True:
        await asyncio.sleep(interval)
        # Snapshot: the probes below await, and the message loop may clear or
        # replace the shared _printers list in place while they run.
        printers = list(_printers)
        if not printers:
            continue
        statuses = []
        for p in printers:
            printer_id = p.get("id")
            address = p.get("address", "")
            port = int(p.get("port", 9100))
            if not printer_id or not address:
                continue
            # An out-of-range port is reported as unreachable without probing.
            reachable = False
            if 1 <= port <= 65535:
                reachable = await _check_printer_reachable(address, port)
            statuses.append({"printer_id": printer_id, "reachable": reachable})
        if statuses:
            try:
                await ws.send(json.dumps({"action": "printer_status", "statuses": statuses}))
            except Exception as e:
                logger.debug("Stopping printer healthcheck; status send failed: %s", e)
                break


def _get_public_key() -> str:
|
|
"""Persistent keypair: load or generate and save. Returns PEM public key."""
|
|
key_path = os.environ.get("ORDERPY_KEY_PATH", "./data/bridge_key.pem")
|
|
data_dir = os.path.dirname(key_path)
|
|
if data_dir and not os.path.isdir(data_dir):
|
|
os.makedirs(data_dir, exist_ok=True)
|
|
priv_path = key_path if key_path.endswith(".pem") else key_path + ".priv"
|
|
pub_path = key_path.replace(".pem", ".pub") if ".pem" in key_path else key_path + ".pub"
|
|
if os.path.isfile(pub_path):
|
|
with open(pub_path, "r") as f:
|
|
return f.read()
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives import serialization
|
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
priv_pem = key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
)
|
|
pub_pem = key.public_key().public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
)
|
|
with open(priv_path, "wb") as f:
|
|
f.write(priv_pem)
|
|
with open(pub_path, "wb") as f:
|
|
f.write(pub_pem)
|
|
return pub_pem.decode()
|
|
|
|
|
|
async def run_cloud_client(cloud_ws_url: str = "", pairing_queue: asyncio.Queue | None = None) -> None:
    """Keep a WebSocket connection to the cloud open and dispatch its messages.

    Connects to ``<url>/api/v1/bridges/connect``, registers with the
    persistent public key, then handles the actions ``pairing_result``,
    ``verify_claim``, ``config``, ``print_order``, ``print_service_call``,
    ``config_update``, ``unpaired`` and ``print_test``. When the connection
    ends or any error occurs, printer state is reset and a new connection is
    attempted after 5 seconds. Runs until cancelled.

    Args:
        cloud_ws_url: Base WebSocket URL; when empty, ``get_cloud_url()`` is used.
        pairing_queue: Optional queue of ``(code, future)`` items. Each code is
            sent as ``submit_pairing_code``; the future is resolved with a dict
            holding ``ok`` and ``reason``.

    Returns:
        None. Returns immediately, without connecting, if the keypair cannot
        be loaded or generated.
    """
    global _health_task, _api_key, _cloud_http_url
    from bridge_core.app_config import get_cloud_url

    url = cloud_ws_url or get_cloud_url()
    ws_url = f"{url}/api/v1/bridges/connect"
    try:
        pub_key = _get_public_key()
    except Exception as e:
        logger.error("Failed to load/generate keypair: %s — bridge cannot connect to cloud", e)
        return

    # The event loop only keeps weak references to tasks, so fire-and-forget
    # print jobs are held here until they finish.
    background_tasks: set[asyncio.Task] = set()

    while True:
        try:
            import websockets

            async with websockets.connect(ws_url, close_timeout=5) as ws:
                # Future of the pairing request currently awaiting "pairing_result".
                # Defined before the try so the finally clause can always see it.
                pending_pairing_future: asyncio.Future | None = None
                set_ws_connected(True)
                try:
                    # Registration handshake: announce the key, read bridge id and status.
                    await ws.send(json.dumps({"pubKey": pub_key, "version": "1.0"}))
                    msg = await ws.recv()
                    data = json.loads(msg)
                    bridge_id = data.get("bridgeId") or data.get("bridge_id")
                    status = data.get("status", "UNCLAIMED")
                    if status == "UNCLAIMED":
                        unpair()
                    elif status == "ACTIVE":
                        invalidate_claim_token()
                    if bridge_id:
                        set_bridge_id(bridge_id)
                    logger.info("Registered with Cloud: bridge_id=%s, status=%s", bridge_id, status)
                    _printers.clear()
                    _health_task = None
                    while True:
                        # Wait for whichever comes first: a cloud message or a
                        # locally submitted pairing code.
                        recv_task = asyncio.create_task(ws.recv())
                        tasks = [recv_task]
                        get_task = None
                        if pairing_queue is not None:
                            get_task = asyncio.create_task(pairing_queue.get())
                            tasks.append(get_task)
                        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                        for t in pending:
                            t.cancel()
                            try:
                                await t
                            except asyncio.CancelledError:
                                pass
                        raw = None
                        pairing_item = None
                        recv_failed = False
                        if recv_task in done:
                            try:
                                raw = recv_task.result()
                            except Exception:
                                recv_failed = True
                        if get_task is not None and get_task in done:
                            try:
                                pairing_item = get_task.result()
                            except Exception:
                                pass
                        if pairing_item is not None:
                            code, future = pairing_item
                            if recv_failed:
                                if not future.done():
                                    future.set_result({"ok": False, "reason": "connection_lost"})
                            else:
                                try:
                                    await ws.send(json.dumps({"action": "submit_pairing_code", "code": code}))
                                    pending_pairing_future = future
                                except Exception as e:
                                    logger.warning("Failed to send submit_pairing_code: %s", e)
                                    if not future.done():
                                        future.set_result({"ok": False, "reason": "send_failed"})
                        if recv_failed:
                            logger.info("WebSocket recv failed, reconnecting…")
                            break
                        if raw is None:
                            continue
                        msg_data = json.loads(raw)
                        action = msg_data.get("action")
                        if action == "pairing_result":
                            if pending_pairing_future is not None and not pending_pairing_future.done():
                                pending_pairing_future.set_result({
                                    "ok": bool(msg_data.get("ok", False)),
                                    "reason": msg_data.get("reason"),
                                })
                            pending_pairing_future = None
                            continue
                        if action == "verify_claim":
                            token = msg_data.get("claimToken", "")
                            ok = check_token(token)
                            if ok:
                                invalidate_claim_token()
                            else:
                                record_verify_failure()
                            await ws.send(json.dumps({"ok": ok}))
                        elif action == "config":
                            _api_key = msg_data.get("api_key") or None
                            # Derive the HTTP(S) base URL from the WebSocket URL.
                            if url.startswith("wss://"):
                                _cloud_http_url = "https://" + url[6:]
                            elif url.startswith("ws://"):
                                _cloud_http_url = "http://" + url[5:]
                            else:
                                _cloud_http_url = url
                            _cloud_http_url = _cloud_http_url.rstrip("/")
                            set_tenant_name(msg_data.get("tenant_name") or "")
                            invalidate_claim_token()
                            printers_list = msg_data.get("printers", [])
                            if isinstance(printers_list, list):
                                # "config" replaces the whole printer list.
                                _printers.clear()
                                _printers.extend(
                                    {"id": p.get("id"), "address": p.get("address", ""), "port": int(p.get("port", 9100))}
                                    for p in printers_list
                                    if p.get("id") and p.get("address")
                                )
                            # Also restart a healthcheck that already ended (for
                            # example after a failed status send); otherwise it
                            # would stay off for the rest of this connection.
                            if (_health_task is None or _health_task.done()) and _printers:
                                _health_task = asyncio.create_task(_run_printer_healthcheck(ws))
                        elif action == "print_order":
                            payload = {
                                "order_id": msg_data.get("order_id"),
                                "printer_id": msg_data.get("printer_id"),
                                "receipt_bytes_base64": msg_data.get("receipt_bytes_base64"),
                            }
                            # Print in the background so the message loop stays responsive.
                            task = asyncio.create_task(_print_order_to_all_printers(payload, ws))
                            background_tasks.add(task)
                            task.add_done_callback(background_tasks.discard)
                        elif action == "print_service_call":
                            payload = {
                                "receipt_bytes_base64": msg_data.get("receipt_bytes_base64"),
                            }
                            task = asyncio.create_task(_print_service_call_to_all_printers(payload))
                            background_tasks.add(task)
                            task.add_done_callback(background_tasks.discard)
                        elif action == "config_update":
                            printers_list = msg_data.get("printers", [])
                            if isinstance(printers_list, list):
                                updated = [
                                    {"id": p.get("id"), "address": p.get("address", ""), "port": int(p.get("port", 9100))}
                                    for p in printers_list
                                    if p.get("id") and p.get("address")
                                ]
                                if not updated:
                                    continue
                                # "config_update" merges by id: listed printers are
                                # added or replaced, all others are kept.
                                by_id = {p["id"]: p for p in _printers}
                                for p in updated:
                                    by_id[p["id"]] = p
                                _printers[:] = list(by_id.values())
                                if _printers and updated:
                                    # Report the status of the changed printers right away.
                                    statuses = []
                                    for p in updated:
                                        printer_id = p.get("id")
                                        address = p.get("address", "")
                                        port = int(p.get("port", 9100))
                                        reachable = False
                                        if 1 <= port <= 65535 and address:
                                            reachable = await _check_printer_reachable(address, port)
                                        statuses.append({"printer_id": printer_id, "reachable": reachable})
                                    if statuses:
                                        try:
                                            await ws.send(json.dumps({"action": "printer_status", "statuses": statuses}))
                                        except Exception:
                                            pass
                        elif action == "unpaired":
                            _printers.clear()
                            if _health_task and not _health_task.done():
                                _health_task.cancel()
                                try:
                                    await _health_task
                                except asyncio.CancelledError:
                                    pass
                            _health_task = None
                            unpair()
                            # Keep bridge_id so /setup/info stays 200 until we reconnect and get a new id
                            logger.info("Unpaired by Cloud; new claim token issued")
                            break  # leave inner loop so we don't send (e.g. submit_pairing_code) on closed connection
                        elif action == "print_test":
                            printer_id = msg_data.get("printer_id", "")
                            address = msg_data.get("address", "")
                            port = int(msg_data.get("port", 9100))
                            b64 = msg_data.get("receipt_bytes_base64")
                            ok = False
                            if address and 1 <= port <= 65535 and b64:
                                try:
                                    data = base64.b64decode(b64)
                                    ok = await _send_bytes_to_printer(address, port, data)
                                except Exception as e:
                                    logger.warning("print_test decode/send failed: %s", e)
                            await ws.send(json.dumps({
                                "action": "print_test_result",
                                "printer_id": printer_id,
                                "ok": ok,
                            }))
                finally:
                    set_ws_connected(False)
                    # A pairing code may have been sent without its result
                    # arriving before the connection ended; resolve the future
                    # so the submitter is not left waiting forever.
                    if pending_pairing_future is not None and not pending_pairing_future.done():
                        pending_pairing_future.set_result({"ok": False, "reason": "connection_lost"})
        except Exception as e:
            logger.warning("Cloud WebSocket error: %s", e)
        # Connection is gone: drop printer state and stop the healthcheck
        # before waiting to reconnect.
        _printers.clear()
        if _health_task and not _health_task.done():
            _health_task.cancel()
            try:
                await _health_task
            except asyncio.CancelledError:
                pass
        _health_task = None
        await asyncio.sleep(5)