Files
KX-Bridge-Release/kobrax_moonraker_bridge.py
2026-05-19 12:07:33 +02:00

2808 lines
119 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
kobrax_moonraker_bridge.py - Moonraker-kompatibler HTTP/WebSocket-Bridge für Anycubic Kobra X
Emuliert die Moonraker/Klipper-API damit OrcaSlicer den Kobra X direkt ansteuern kann.
Verwendung:
python kobrax_moonraker_bridge.py --printer-ip 192.168.178.94
OrcaSlicer-Konfiguration:
Drucker-Typ: Klipper | Host: 127.0.0.1 | Port: 7125
"""
import argparse
import html
import sqlite3
import uuid
try:
import config_loader as env_loader
except ImportError:
import env_loader
import asyncio
import hashlib
import json
import logging
import os
import pathlib
import re
import subprocess
import sys
import tempfile
import time
import threading
# In a PyInstaller binary everything lives next to sys.executable, otherwise next to __file__.
_BASE = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
# Give the base directory top priority on the module search path
# (presumably so sibling modules shipped next to this file resolve first).
sys.path.insert(0, _BASE)
from kobrax_client import KobraXClient
try:
    import imageio_ffmpeg
except ImportError:
    def _find_ffmpeg() -> str:
        """Return the ffmpeg executable to use when imageio_ffmpeg is absent.

        A binary located in the base directory wins; otherwise the bare
        command name "ffmpeg" is returned.
        """
        binary = "ffmpeg.exe" if sys.platform == "win32" else "ffmpeg"
        bundled = os.path.join(_BASE, binary)
        return bundled if os.path.isfile(bundled) else "ffmpeg"
else:
    def _find_ffmpeg() -> str:
        """Return the ffmpeg executable reported by imageio_ffmpeg."""
        return imageio_ffmpeg.get_ffmpeg_exe()
try:
from aiohttp import web
import aiohttp
except ImportError:
print("Fehler: aiohttp nicht installiert. Bitte: pip install aiohttp")
sys.exit(1)
try:
import base64 as _base64
from Crypto.Cipher import AES as _AES
from Crypto.Util.Padding import unpad as _unpad
_HAS_CRYPTO = True
except ImportError:
_HAS_CRYPTO = False
def _kx_generate_signature(token: str, ts: int, nonce: str) -> str:
first = hashlib.md5(token[:16].encode()).hexdigest()
return hashlib.md5((first + str(ts) + nonce).encode()).hexdigest()
def _kx_decrypt_info(encrypted_b64: str, key: str, iv: str) -> dict:
    """Decrypt a base64 AES-CBC payload and parse the plaintext as JSON.

    ``key`` and ``iv`` are encoded to bytes as-is; the padding is removed
    with the block size of the AES module before decoding.
    """
    aes = _AES.new(key.encode(), _AES.MODE_CBC, iv.encode())
    ciphertext = _base64.b64decode(encrypted_b64)
    padded = aes.decrypt(ciphertext)
    plaintext = _unpad(padded, _AES.block_size)
    return json.loads(plaintext.decode())
async def _kx_fetch_credentials(ip: str, port: int = 18910) -> dict:
    """Fetch and decrypt the printer credentials via HTTP ``/info`` + ``/ctrl``.

    ``/info`` yields a token; ``/ctrl`` is then called with a signed query
    (timestamp in milliseconds, nonce, signature) and returns an encrypted
    blob that is decrypted with key ``token[16:32]`` and the token from the
    ``/ctrl`` response as IV. Algorithm taken from tools/fetch_credentials.py.

    NOTE(review): the original note calls this AES-256-CBC, but a
    16-character key would select AES-128 - confirm against the tool.

    Returns:
        Dict with ``printer_ip``, ``username``, ``password``, ``device_id``,
        ``mode_id`` and ``model``.

    Raises:
        RuntimeError: pycryptodome is missing, or the decrypted payload
            contains an ``error`` entry.
        Exception: network, HTTP-status, JSON or decrypt errors propagate.
    """
    if not _HAS_CRYPTO:
        raise RuntimeError("pycryptodome nicht installiert")
    # The nonce feeds a request signature, so draw it from the CSPRNG
    # instead of the predictable ``random`` module.
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    nonce = "".join(secrets.choice(alphabet) for _ in range(6))
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession() as s:
        async with s.get(f"http://{ip}:{port}/info", timeout=timeout) as r:
            r.raise_for_status()
            info = await r.json()
        token = info["token"]
        ts = int(time.time() * 1000)
        sign = _kx_generate_signature(token, ts, nonce)
        params = {"ts": ts, "nonce": nonce, "sign": sign, "did": "random"}
        async with s.post(f"http://{ip}:{port}/ctrl", params=params, timeout=timeout) as r:
            r.raise_for_status()
            data = await r.json()
    result = _kx_decrypt_info(data["data"]["info"], token[16:32], data["data"]["token"])
    if "error" in result:
        raise RuntimeError(result.get("error", "decrypt failed"))
    return {
        "printer_ip": result.get("ip", ip),
        "username": result.get("username", ""),
        "password": result.get("password", ""),
        "device_id": result.get("deviceId", ""),
        "mode_id": str(result.get("modeId", "20030")),
        "model": result.get("modelName", "Anycubic Kobra"),
    }
# Root logger configuration: INFO level, lines formatted as "[HH:MM:SS] LEVEL name: message".
logging.basicConfig(level=logging.INFO,
                    format="[%(asctime)s] %(levelname)-5s %(name)s: %(message)s",
                    datefmt="%H:%M:%S")
log = logging.getLogger("bridge")
# Web UI: sub-directory under web/themes/<name>/index.html
_UI_THEME_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{0,63}$")
# Allowed static theme files under /kx/ui/<name> (file name -> content type)
_KX_UI_ASSETS: dict[str, str] = {
    "style.css": "text/css",
    "app.js": "application/javascript",
}
# Ring buffer for the browser log stream (keeps the most recent 500 entries, see maxlen)
import collections as _collections
_log_buffer: "_collections.deque[dict]" = _collections.deque(maxlen=500)
# Queues of the open SSE log streams
_log_sse_queues: "list[asyncio.Queue]" = []
class _BrowserLogHandler(logging.Handler):
"""Sendet Log-Records in den Ring-Buffer und alle offenen SSE-Queues."""
_fmt = logging.Formatter(datefmt="%H:%M:%S")
def emit(self, record: logging.LogRecord):
entry = {
"ts": self._fmt.formatTime(record, "%H:%M:%S"),
"lvl": record.levelname,
"name": record.name,
"msg": record.getMessage(),
}
_log_buffer.append(entry)
for q in list(_log_sse_queues):
try:
q.put_nowait(entry)
except Exception:
pass
# Attach the browser log handler to the root logger so every record reaches it.
_browser_handler = _BrowserLogHandler()
logging.getLogger().addHandler(_browser_handler)
# Maps the printer's own state strings to the Klipper-style states the bridge
# stores as "print_state". The "stoped" spelling is also what the callbacks
# compare against - presumably the firmware's spelling, so do not correct it.
KOBRA_TO_KLIPPER_STATE = {
    "free": "standby",
    "busy": "printing",
    "printing": "printing",
    "preheating": "printing",
    "auto_leveling": "printing",
    "checking": "printing",
    "updated": "printing",
    "init": "printing",
    "pausing": "paused",
    "paused": "paused",
    "resuming": "printing",
    "resumed": "printing",
    "stopping": "printing",
    "stoped": "standby",
    "finished": "complete",
    "failed": "error",
    "canceled": "standby",
}
# Version strings of the emulated services (where they are reported is not
# visible in this part of the file).
MOONRAKER_VERSION = "v0.9.3-1"
KLIPPER_VERSION = "v0.12.0-1"
def _parse_gcode_estimated_time(data: bytes) -> int:
"""Liest geschätzte Druckzeit aus GCode (OrcaSlicer + PrusaSlicer).
Gibt Sekunden zurück, 0 wenn nicht gefunden.
PrusaSlicer schreibt die Zeit ins Header (erste 16KB),
OrcaSlicer schreibt sie ans Ende der Datei (letzte 16KB)."""
import re
# Anfang + Ende der Datei durchsuchen (OrcaSlicer schreibt Zeit am Ende)
search_text = (data[:16384] + data[-65536:]).decode("utf-8", errors="ignore")
# OrcaSlicer: ; total estimated time: 9m 20s
# PrusaSlicer: ; estimated printing time (normal mode) = 1h 9m 20s
m = (re.search(r";\s*total estimated time:\s*(.*)", search_text) or
re.search(r";\s*estimated printing time \(normal mode\)\s*=\s*(.*)", search_text))
if not m:
return 0
parts = re.findall(r"(\d+)\s*([hms])", m.group(1))
secs = 0
for val, unit in parts:
if unit == "h": secs += int(val) * 3600
elif unit == "m": secs += int(val) * 60
elif unit == "s": secs += int(val)
if secs:
log.info(f"Slicer-Schätzzeit: {secs}s ({m.group(1).strip()})")
return secs
def _extract_thumbnail(data: bytes) -> str:
"""Extrahiert Base64-PNG-Thumbnail aus GCode (OrcaSlicer-Format)."""
try:
marker = b"; thumbnail begin"
end_marker = b"; thumbnail end"
start = data.find(marker)
if start == -1:
return ""
start = data.find(b"\n", start) + 1
end = data.find(end_marker, start)
if end == -1:
return ""
lines = data[start:end].split(b"\n")
b64 = b"".join(
line[2:].strip() if line.startswith(b"; ") else line.strip()
for line in lines
)
return b64.decode("ascii")
except Exception:
return ""
def _extract_filament_info(data: bytes) -> list[dict]:
"""Liest filament_colour + filament_type aus GCode-Header (OrcaSlicer/PrusaSlicer).
Gibt Liste von {color_hex, material} pro Slot zurück, leer wenn nicht gefunden.
Liest nur die ersten 8KB (Header-Bereich).
"""
try:
header = data[:8192].decode("utf-8", errors="ignore")
colors, materials = [], []
for line in header.splitlines():
if line.startswith("; filament_colour"):
val = line.split("=", 1)[-1].strip()
colors = [c.strip().lstrip("#") for c in val.split(";") if c.strip()]
elif line.startswith("; filament_type"):
val = line.split("=", 1)[-1].strip()
materials = [m.strip() for m in val.split(";") if m.strip()]
if not colors:
return []
result = []
for i, hex_color in enumerate(colors):
result.append({
"slot_index": i,
"color_hex": "#" + hex_color.upper() if hex_color else "#FFFFFF",
"material": materials[i] if i < len(materials) else "PLA",
})
return result
except Exception:
return []
class GCodeStore:
"""Persistenter GCode-Store pro Bridge-Instanz (SQLite)."""
def __init__(self, data_dir: str):
os.makedirs(data_dir, exist_ok=True)
self._gcode_dir = os.path.join(data_dir, "gcodes")
os.makedirs(self._gcode_dir, exist_ok=True)
db_path = os.path.join(data_dir, "kx-bridge.db")
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._lock = threading.Lock()
self._init_schema()
def _init_schema(self):
with self._lock:
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS gcode_files (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
path TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
uploaded_at TEXT NOT NULL,
thumbnail_b64 TEXT,
est_print_time_sec INTEGER,
filament_used_mm REAL,
layer_count INTEGER,
gcode_filaments TEXT,
objects_skip_parts TEXT,
svg_image TEXT
);
CREATE TABLE IF NOT EXISTS print_jobs (
id TEXT PRIMARY KEY,
gcode_file_id TEXT NOT NULL,
printer_id TEXT NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT,
status TEXT NOT NULL,
duration_sec INTEGER,
filament_assignments TEXT,
abort_reason TEXT
);
""")
# Migration: Spalte gcode_filaments nachrüsten falls DB älter
try:
self._conn.execute("ALTER TABLE gcode_files ADD COLUMN gcode_filaments TEXT")
self._conn.commit()
except Exception:
pass
# Migration: Spalten objects_skip_parts + svg_image (Part-Skip-Feature, v0.9.10)
for col, typ in (("objects_skip_parts", "TEXT"), ("svg_image", "TEXT")):
try:
self._conn.execute(f"ALTER TABLE gcode_files ADD COLUMN {col} {typ}")
self._conn.commit()
except Exception:
pass
def save_file(self, file_id: str, filename: str, data: bytes,
est_time_sec: int = 0, thumbnail_b64: str = "",
gcode_filaments: list | None = None) -> str:
"""Speichert GCode-Datei auf Disk und in DB. Gibt Pfad zurück."""
safe_name = os.path.basename(filename)
path = os.path.join(self._gcode_dir, safe_name)
with open(path, "wb") as f:
f.write(data)
now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
with self._lock:
filaments_json = json.dumps(gcode_filaments) if gcode_filaments else None
self._conn.execute(
"""INSERT OR REPLACE INTO gcode_files
(id, filename, path, size_bytes, uploaded_at, thumbnail_b64, est_print_time_sec, gcode_filaments)
VALUES (?,?,?,?,?,?,?,?)""",
(file_id, filename, path, len(data), now, thumbnail_b64 or None, est_time_sec or None, filaments_json)
)
self._conn.commit()
return path
def list_files(self) -> list:
with self._lock:
rows = self._conn.execute(
"SELECT * FROM gcode_files ORDER BY uploaded_at DESC"
).fetchall()
return [dict(r) for r in rows]
def get_file(self, file_id: str) -> dict | None:
with self._lock:
row = self._conn.execute(
"SELECT * FROM gcode_files WHERE id=?", (file_id,)
).fetchone()
return dict(row) if row else None
def get_file_by_name(self, filename: str) -> dict | None:
with self._lock:
row = self._conn.execute(
"SELECT * FROM gcode_files WHERE filename=? ORDER BY uploaded_at DESC LIMIT 1",
(filename,)
).fetchone()
return dict(row) if row else None
def update_file_objects(self, filename: str, objects: list, svg: str = "") -> None:
"""Speichert Objekt-Liste + optionales SVG zu einer Datei (matcht via filename)."""
if not filename:
return
with self._lock:
self._conn.execute(
"UPDATE gcode_files SET objects_skip_parts=?, svg_image=? "
"WHERE filename=?",
(json.dumps(objects), svg or "", filename),
)
self._conn.commit()
def delete_file(self, file_id: str) -> bool:
row = self.get_file(file_id)
if not row:
return False
try:
os.remove(row["path"])
except OSError:
pass
with self._lock:
self._conn.execute("DELETE FROM gcode_files WHERE id=?", (file_id,))
self._conn.commit()
return True
def start_job(self, gcode_file_id: str, printer_id: str,
filament_assignments: list | None = None) -> str:
job_id = str(uuid.uuid4())
now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
assignments_json = json.dumps(filament_assignments) if filament_assignments else None
with self._lock:
self._conn.execute(
"""INSERT INTO print_jobs
(id, gcode_file_id, printer_id, started_at, status, filament_assignments)
VALUES (?,?,?,?,'printing',?)""",
(job_id, gcode_file_id, printer_id, now, assignments_json)
)
self._conn.commit()
return job_id
def finish_job(self, job_id: str, status: str = "completed",
abort_reason: str = "") -> None:
now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
with self._lock:
row = self._conn.execute(
"SELECT started_at FROM print_jobs WHERE id=?", (job_id,)
).fetchone()
duration = None
if row:
try:
import calendar
start = time.strptime(row["started_at"], "%Y-%m-%dT%H:%M:%SZ")
duration = int(time.time() - calendar.timegm(start))
except Exception:
pass
self._conn.execute(
"""UPDATE print_jobs SET ended_at=?, status=?, duration_sec=?, abort_reason=?
WHERE id=?""",
(now, status, duration, abort_reason or None, job_id)
)
self._conn.commit()
def list_jobs(self, limit: int = 50, offset: int = 0) -> list:
with self._lock:
rows = self._conn.execute(
"""SELECT j.*, f.filename, f.thumbnail_b64
FROM print_jobs j
LEFT JOIN gcode_files f ON j.gcode_file_id = f.id
ORDER BY j.started_at DESC LIMIT ? OFFSET ?""",
(limit, offset)
).fetchall()
return [dict(r) for r in rows]
class KobraXBridge:
def __init__(self, client: KobraXClient, args=None, store=None, printer_id: str = "1", all_bridges=None):
    """Set up bridge state for one printer and register the MQTT callbacks.

    Args:
        client: Printer client; its ``callbacks`` mapping receives the
            report handlers of this bridge.
        args: Parsed options; ``data_dir`` is read when no ``store`` is
            given, ``ui_theme`` is read if present.
        store: Existing GCodeStore to use instead of creating one.
        printer_id: Identifier of this printer.
        all_bridges: Mapping of bridges; a new empty dict is used when None.
    """
    self.client = client
    self._args = args
    self._printer_id = printer_id
    self._all_bridges = {} if all_bridges is None else all_bridges
    self.ws_clients: set[web.WebSocketResponse] = set()
    self._last_state: dict = {}
    self._state = {
        # temperatures
        "nozzle_temp": 0.0,
        "nozzle_target": 0.0,
        "bed_temp": 0.0,
        "bed_target": 0.0,
        # print job
        "print_state": "standby",
        "kobra_state": "free",
        "filename": "",
        "slicer_time": 0,
        "progress": 0.0,
        "print_duration": 0,
        "remain_time": 0,
        "curr_layer": 0,
        "total_layers": 0,
        # printer identity and endpoints
        "printer_name": env_loader.get("BRIDGE_PRINTER_NAME", "Anycubic Kobra X"),
        "firmware_version": "unknown",
        "upload_url": "",
        "camera_url": "",
        # peripherals and misc
        "fan_speed": 0,
        "light_on": False,
        "light_brightness": 80,
        "taskid": "-1",
        "print_speed_mode": 2,
        "connection_error": "",
        "file_ready": "",
    }
    self._ams_slots: list[dict] = []
    self._ams_loaded_slot: int = -1
    self._last_uploaded_file: str = ""
    self._store = GCodeStore(args.data_dir) if store is None else store
    self._serve_dir_path: str = self._store._gcode_dir
    self._current_job_id: str = ""
    self._thumbnail_b64: str = ""
    # Part skip: skip list most recently reported by the printer (v0.9.10)
    self._skip_state: dict = {"objects": [], "skipped": [], "ts": 0}

    # Validate the theme name (no special characters or umlauts).
    theme = (getattr(args, "ui_theme", None) or "default").strip()
    if _UI_THEME_NAME_RE.match(theme) is None:
        log.warning("Ungültiger UI-Theme-Name %r nutze default", theme)
        theme = "default"
    self._ui_theme = theme
    self._index_tpl_cache: str | None = None
    self._index_tpl_cache_key: tuple[str, float] | None = None

    # Register the MQTT push callbacks, one handler per report topic.
    for topic, handler in (
        ("tempature/report", self._on_temp),
        ("print/report", self._on_print),
        ("info/report", self._on_info),
        ("file/report", self._on_file),
        ("multiColorBox/report", self._on_multicolor_box),
        ("light/report", self._on_light),
        ("skip/report", self._on_skip),
    ):
        client.callbacks[topic] = handler
# -------------------------------------------------------------------------
# MQTT callbacks (called from reader thread)
# -------------------------------------------------------------------------
def _on_temp(self, payload: dict):
d = payload.get("data") or {}
self._state["nozzle_temp"] = float(d.get("curr_nozzle_temp", 0))
self._state["nozzle_target"] = float(d.get("target_nozzle_temp", 0))
self._state["bed_temp"] = float(d.get("curr_hotbed_temp", 0))
self._state["bed_target"] = float(d.get("target_hotbed_temp", 0))
self._push_status_update()
def _on_print(self, payload: dict):
d = payload.get("data") or {}
kobra_state = payload.get("state", "")
self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "printing")
if kobra_state:
self._state["kobra_state"] = kobra_state
# Job-History: Druckstart erkennen
if kobra_state == "printing" and not self._current_job_id:
filename = d.get("filename", self._state.get("filename", ""))
if filename:
gf = self._store.get_file_by_name(filename)
if gf:
self._current_job_id = self._store.start_job(
gcode_file_id=gf["id"],
printer_id=self._printer_id,
)
log.info(f"Job gestartet: {self._current_job_id} für {filename}")
# Job-History: Druckende erkennen
if kobra_state in ("finished",) and self._current_job_id:
self._store.finish_job(self._current_job_id, status="completed")
log.info(f"Job abgeschlossen: {self._current_job_id}")
self._current_job_id = ""
elif kobra_state in ("stoped", "canceled") and self._current_job_id:
self._store.finish_job(self._current_job_id, status="cancelled")
log.info(f"Job abgebrochen: {self._current_job_id}")
self._current_job_id = ""
if kobra_state in ("stoped", "canceled"):
self._state["progress"] = 0.0
self._state["filename"] = ""
self._state["file_ready"] = ""
self._state["print_duration"] = 0
self._state["remain_time"] = 0
self._state["slicer_time"] = 0
self._thumbnail_b64 = ""
self._state["filename"] = d.get("filename", self._state["filename"])
if "progress" in d:
self._state["progress"] = float(d["progress"]) / 100.0
if "print_time" in d:
self._state["print_duration"] = int(d["print_time"]) * 60
if "remain_time" in d:
self._state["remain_time"] = int(d["remain_time"]) * 60
if "curr_layer" in d:
self._state["curr_layer"] = d["curr_layer"]
if "total_layers" in d:
self._state["total_layers"] = d["total_layers"]
if "taskid" in d:
self._state["taskid"] = str(d["taskid"])
settings = d.get("settings") or {}
if "print_speed_mode" in settings:
self._state["print_speed_mode"] = int(settings["print_speed_mode"])
self._push_status_update()
def _on_info(self, payload: dict):
    """Apply an info report (name, firmware, state, temperatures, URLs, fan, speed)."""
    info = payload.get("data") or {}
    state = self._state
    # Take the name from MQTT only when no own name is configured
    # (environment or per-printer config).
    if not (env_loader.get("BRIDGE_PRINTER_NAME") or getattr(self, "_name_locked", False)):
        state["printer_name"] = info.get("printerName", state["printer_name"])
    state["firmware_version"] = info.get("version", state["firmware_version"])

    reported_state = info.get("state", "")
    if reported_state:
        state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(reported_state, "standby")
        state["kobra_state"] = reported_state

    temps = info.get("temp") or {}
    if temps:
        for state_key, report_key in (
            ("nozzle_temp", "curr_nozzle_temp"),
            ("nozzle_target", "target_nozzle_temp"),
            ("bed_temp", "curr_hotbed_temp"),
            ("bed_target", "target_hotbed_temp"),
        ):
            state[state_key] = float(temps.get(report_key, 0))

    urls = info.get("urls") or {}
    upload_url = urls.get("fileUploadurl")
    if upload_url:
        state["upload_url"] = upload_url
    camera_url = urls.get("rtspUrl")
    if camera_url:
        state["camera_url"] = camera_url

    fan_pct = info.get("fan_speed_pct")
    if fan_pct is not None:
        state["fan_speed"] = int(fan_pct)
    speed_mode = info.get("print_speed_mode")
    if speed_mode is not None:
        state["print_speed_mode"] = int(speed_mode)
    self._push_status_update()
def _on_skip(self, payload: dict):
"""skip/report-Callback (Part-Skip-Feature, v0.9.10).
Drucker meldet hier IMMER die Liste der bereits geskippten Objekte
zurück (objects_skip_parts), egal ob auf query_obj oder nach skip/start.
Die Gesamt-Objektliste kommt aus file/report.
"""
d = payload.get("data") or {}
skipped = d.get("objects_skip_parts") or d.get("skipped") or d.get("skipped_parts") or []
# Liste immer (auch leer) übernehmen sonst bleibt sie auf alten Stand
self._skip_state = {
"skipped": list(skipped),
"ts": int(time.time()),
}
if payload.get("state") == "done" or payload.get("code") == 200:
log.info(f"Skip-Antwort: state={payload.get('state')} code={payload.get('code')} skipped={skipped}")
def _on_file(self, payload: dict):
d = payload.get("data") or {}
details = d.get("file_details") or {}
thumb = details.get("thumbnail") or details.get("png_image") or ""
if thumb:
self._thumbnail_b64 = thumb
log.info(f"Vorschaubild empfangen: {len(thumb)} Zeichen base64")
# Part-Skip: Objekt-Liste + optionales SVG (v0.9.10)
objs = details.get("objects_skip_parts") or []
svg = details.get("svg_image") or ""
if objs:
filename = d.get("filename") or details.get("filename") or self._last_uploaded_file
if filename:
try:
self._store.update_file_objects(filename, objs, svg)
log.info(f"Skip-Objekte für {filename}: {len(objs)} ({'mit SVG' if svg else 'ohne SVG'})")
except Exception as e:
log.warning(f"update_file_objects fehlgeschlagen: {e}")
self._push_status_update()
def _on_multicolor_box(self, payload: dict):
boxes = (payload.get("data") or {}).get("multi_color_box") or []
if not boxes:
return
box = boxes[0]
slots = box.get("slots") or []
loaded = box.get("loaded_slot", -1)
if loaded != -1:
self._ams_loaded_slot = loaded
# Tip-Forming: nach Einziehen (status=10) oder Ausziehen (status=11)
# schickt der originale Slicer automatisch type=3 (Extruder-Rückzug)
fs = box.get("feed_status") or {}
current_status = fs.get("current_status")
slot_index = fs.get("slot_index", 0)
if current_status in (10, 11):
import threading
def _tip_form():
import time; time.sleep(2)
self.client.publish(
"multiColorBox", "feedFilament",
{"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": 3}}]},
timeout=0
)
log.info(f"Tip-Forming (type=3) nach status={current_status} slot={slot_index}")
threading.Thread(target=_tip_form, daemon=True).start()
if slots:
self._ams_slots = slots
log.info(f"AMS-Slots empfangen: {len(slots)}, loaded_slot={self._ams_loaded_slot}")
self._push_status_update()
def _on_light(self, payload: dict):
d = payload.get("data") or {}
self._state["light_on"] = bool(d.get("status", 0))
self._state["light_brightness"] = int(d.get("brightness", 80))
self._push_status_update()
# OrcaSlicer filament preset IDs (MoonrakerPrinterAgent.cpp mapping).
# Keys are upper-case material names; _build_lane_data looks slots up here
# and falls back to "OGFL99" for materials that are not listed.
_TRAY_INFO_IDX = {
    "PLA": "OGFL99", "PLA-CF": "OGFL98", "PLA SILK": "OGFL96",
    "PETG": "OGFG99", "PETG-CF": "OGFG98",
    "ABS": "OGFB99", "ASA": "OGFB98",
    "TPU": "OGFT99", "PA": "OGFP99", "PA-CF": "OGFP98",
    "PC": "OGFC99", "HIPS": "OGFH99", "PVA": "OGFV99",
}
def _build_lane_data(self) -> dict:
"""Baut BBL-AMS-JSON für OrcaSlicer DevFilaSystemParser::ParseV1_0."""
slots = self._ams_slots
total = len(slots)
if total == 0:
return {"ams": [], "ams_exist_bits": "0", "tray_exist_bits": "0"}
ams_count = (total + 3) // 4
ams_exist_bits = 0
tray_exist_bits = 0
ams_array = []
for ams_id in range(ams_count):
ams_exist_bits |= (1 << ams_id)
tray_array = []
max_slot = min(3, total - ams_id * 4 - 1)
for slot_id in range(max_slot + 1):
slot_index = ams_id * 4 + slot_id
slot = slots[slot_index] if slot_index < total else {}
occupied = slot.get("status") == 5
if occupied:
tray_exist_bits |= (1 << slot_index)
color_raw = slot.get("color", [255, 255, 255])
if isinstance(color_raw, list) and len(color_raw) >= 3:
color_hex = "{:02X}{:02X}{:02X}FF".format(
int(color_raw[0]), int(color_raw[1]), int(color_raw[2])
)
elif isinstance(color_raw, str) and len(color_raw) >= 6:
color_hex = color_raw[:6].upper() + "FF"
else:
color_hex = "FFFFFFFF"
material = slot.get("type", "PLA").upper()
tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99")
tray_array.append({
"id": str(slot_id),
"tag_uid": "0000000000000000",
"tray_info_idx": tray_info_idx,
"tray_type": material,
"tray_color": color_hex,
})
else:
tray_array.append({
"id": str(slot_id),
"tag_uid": "0000000000000000",
"tray_info_idx": "",
"tray_type": "",
"tray_color": "00000000",
"tray_slot_placeholder": "1",
})
ams_array.append({"id": str(ams_id), "info": "0002", "tray": tray_array})
return {
"ams": ams_array,
"ams_exist_bits": format(ams_exist_bits, "X"),
"tray_exist_bits": format(tray_exist_bits, "X"),
}
# -------------------------------------------------------------------------
# WebSocket push
# -------------------------------------------------------------------------
def _push_status_update(self):
if not self.ws_clients:
return
msg = {
"jsonrpc": "2.0",
"method": "notify_status_update",
"params": [self._build_printer_objects(), time.time()],
}
text = json.dumps(msg)
dead = set()
for ws in self.ws_clients:
try:
asyncio.run_coroutine_threadsafe(ws.send_str(text), ws._loop)
except Exception:
dead.add(ws)
self.ws_clients -= dead
def _build_mmu_object(self) -> dict:
slots = self._ams_slots
if not slots:
return {}
_TEMP = {"PLA": 210, "PETG": 230, "ABS": 240, "ASA": 250,
"TPU": 220, "PA": 260, "PC": 270, "HIPS": 220}
num_gates = len(slots)
gate_status, gate_material, gate_color, gate_temperature, gate_color_rgb = [], [], [], [], []
for slot in slots:
occupied = slot.get("status") == 5
gate_status.append(1 if occupied else 0)
material = slot.get("type", "PLA").upper() if occupied else ""
gate_material.append(material)
c = slot.get("color", [0, 0, 0])
if occupied:
gate_color.append("#{:02X}{:02X}{:02X}".format(*c[:3]))
gate_color_rgb.append([round(c[0]/255, 3), round(c[1]/255, 3), round(c[2]/255, 3)])
else:
gate_color.append("")
gate_color_rgb.append([0.0, 0.0, 0.0])
gate_temperature.append(_TEMP.get(material, 210) if occupied else 0)
return {
"num_gates": num_gates,
"enabled": True,
"gate_status": gate_status,
"gate_material": gate_material,
"gate_color": gate_color,
"gate_temperature": gate_temperature,
"gate_color_rgb": gate_color_rgb,
"gate_filament_name": [""] * num_gates,
"gate_spool_id": [-1] * num_gates,
"ttg_map": list(range(num_gates)),
"tool": max(self._ams_loaded_slot, 0),
"gate": max(self._ams_loaded_slot, 0),
}
def _build_printer_objects(self) -> dict:
s = self._state
return {
"extruder": {
"temperature": s["nozzle_temp"],
"target": s["nozzle_target"],
"power": 0.0,
},
"heater_bed": {
"temperature": s["bed_temp"],
"target": s["bed_target"],
"power": 0.0,
},
"print_stats": {
"state": s["print_state"],
"filename": s["filename"],
"print_duration": s["print_duration"],
"total_duration": s["print_duration"],
"remain_time": s["remain_time"],
"info": {
"current_layer": s["curr_layer"],
"total_layer": s["total_layers"],
},
},
"display_status": {
"progress": s["progress"],
"message": "",
},
"virtual_sdcard": {
"progress": s["progress"],
"is_active": s["print_state"] == "printing",
"file_path": s["filename"],
},
"toolhead": {
"position": [0, 0, 0, 0],
"homed_axes": "xyz",
"print_time": s["print_duration"],
"estimated_print_time": s["print_duration"],
},
"mmu": self._build_mmu_object(),
}
# -------------------------------------------------------------------------
# /kx/ API handlers (GCode Store, History, Filament)
# -------------------------------------------------------------------------
# CORS headers attached to the /kx/ responses built by _json_cors and
# handle_kx_options: any origin; GET, POST, DELETE and OPTIONS; Content-Type header.
_CORS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type",
}
def _json_cors(self, data, status=200):
    """Return ``data`` as a JSON response carrying the class-level CORS headers."""
    cors_headers = self._CORS
    return web.json_response(data, status=status, headers=cors_headers)
async def handle_kx_options(self, request):
    """Answer an OPTIONS request with an empty 204 response plus CORS headers."""
    preflight = web.Response(status=204, headers=self._CORS)
    return preflight
async def handle_kx_files(self, request):
    """List stored files, each enriched with status, duration and start of its latest job.

    Only the 500 most recent jobs are considered; files without a job in
    that window get ``None`` for the three extra fields.
    """
    files = self._store.list_files()
    # Jobs arrive newest first, so the first job seen per file is its latest.
    latest_by_file: dict = {}
    for job in self._store.list_jobs(limit=500):
        latest_by_file.setdefault(job["gcode_file_id"], job)
    for entry in files:
        latest = latest_by_file.get(entry["id"])
        if latest:
            entry["last_print_status"] = latest["status"]
            entry["last_print_duration"] = latest["duration_sec"]
            entry["last_print_at"] = latest["started_at"]
        else:
            entry["last_print_status"] = None
            entry["last_print_duration"] = None
            entry["last_print_at"] = None
    return self._json_cors({"result": files})
async def handle_kx_file_delete(self, request):
    """Delete the file named by the ``file_id`` route parameter; 404 when unknown."""
    removed = self._store.delete_file(request.match_info["file_id"])
    if not removed:
        return self._json_cors({"error": "not found"}, status=404)
    return self._json_cors({"result": "ok"})
async def handle_kx_filament_slots(self, request):
    """Return the known filament slots as a JSON list.

    Each entry carries ``slot_index``, ``material``, ``color_hex``,
    ``status`` ("loaded" for slot status 5, otherwise "empty") and a
    constant ``nozzle_temp`` of 0. Colours may be a sequence of at least
    three numbers or a hex string (optionally prefixed with "#"), matching
    what ``_build_lane_data`` accepts; anything else is reported as
    "#000000" instead of failing the request.
    """
    def _hex(raw) -> str:
        """Format a slot colour as "#RRGGBB"; "#000000" if unusable."""
        try:
            if isinstance(raw, str):
                digits = raw.lstrip("#")
                if len(digits) >= 6:
                    rgb = [int(digits[i:i + 2], 16) for i in (0, 2, 4)]
                    return "#{:02X}{:02X}{:02X}".format(*rgb)
            elif isinstance(raw, (list, tuple)) and len(raw) >= 3:
                return "#{:02X}{:02X}{:02X}".format(*(int(v) for v in raw[:3]))
        except (TypeError, ValueError):
            # Malformed colour: fall through to the default below.
            pass
        return "#000000"

    slots = [
        {
            "slot_index": i,
            "material": s.get("type", ""),
            "color_hex": _hex(s.get("color", [0, 0, 0])),
            "status": "loaded" if s.get("status") == 5 else "empty",
            "nozzle_temp": 0,
        }
        for i, s in enumerate(self._ams_slots)
    ]
    return self._json_cors({"result": slots})
async def handle_kx_history(self, request):
    """Return the print-job history.

    Query parameters:
        limit: Maximum number of jobs (default 50).
        offset: Number of jobs to skip (default 0).

    Responds with 400 when either parameter is not an integer, instead of
    letting the ValueError surface as a server error.
    """
    query = request.rel_url.query
    try:
        limit = int(query.get("limit", 50))
        offset = int(query.get("offset", 0))
    except (TypeError, ValueError):
        return self._json_cors({"error": "limit and offset must be integers"}, status=400)
    jobs = self._store.list_jobs(limit=limit, offset=offset)
    return self._json_cors({"result": jobs})
async def handle_kx_file_objects(self, request):
    """Return the object list + optional SVG stored for a file.

    GET /kx/files/{id}/objects → {"result": {"names": [...], "svg_b64": "..."}}

    Only the database state is returned. If the file has no objects yet
    (older entry), querying file/fileDetails from the printer and waiting
    for the answer is left to the frontend (reload after upload).
    Responds with 404 when the file id is unknown.
    """
    file_id = request.match_info.get("id", "")
    record = self._store.get_file(file_id)
    if not record:
        return self._json_cors({"error": "file not found"}, status=404)
    raw_names = record.get("objects_skip_parts") or "[]"
    try:
        names = json.loads(raw_names)
    except Exception:
        # An unreadable stored list is reported as "no objects".
        names = []
    body = {"names": names, "svg_b64": record.get("svg_image") or ""}
    return self._json_cors({"result": body})
async def handle_kx_skip(self, request):
    """Trigger a mid-print skip.

    POST /kx/skip body={"names": ["..", ".."]}

    Responses:
        200: {"result": "ok", "names": [...]}
        400: body is not valid JSON, not a JSON object, or ``names`` is not
             a list of strings.
        502: the client call raised; the message is returned as ``error``.
    """
    try:
        body = await request.json()
    except Exception:
        return self._json_cors({"error": "invalid json"}, status=400)
    # Valid JSON may still be a list/number/string, which has no .get().
    if not isinstance(body, dict):
        return self._json_cors({"error": "body must be a json object"}, status=400)
    names = body.get("names") or []
    if not isinstance(names, list) or not all(isinstance(n, str) for n in names):
        return self._json_cors({"error": "names must be list[str]"}, status=400)
    try:
        loop = asyncio.get_event_loop()
        # The client call blocks, so run it in the default executor.
        await loop.run_in_executor(None, lambda: self.client.skip_objects(names))
    except Exception as e:
        return self._json_cors({"error": str(e)}, status=502)
    return self._json_cors({"result": "ok", "names": names})
async def handle_kx_skip_query(self, request):
    """Ask the printer for the print object list again.

    POST /kx/skip/query → triggers skip/query_obj and returns the last known
    state (skip/report arrives asynchronously; the frontend polls
    /kx/skip/state). Responds with 502 when the client call raises.
    """
    def _query():
        # Executed in the default executor because the client call blocks.
        return self.client.query_skip_objects()

    try:
        event_loop = asyncio.get_event_loop()
        await event_loop.run_in_executor(None, _query)
    except Exception as e:
        return self._json_cors({"error": str(e)}, status=502)
    return self._json_cors({"result": self._skip_state})
async def handle_kx_skip_state(self, request):
    """Return the current skip state.

    Combines:
    - Full object list: from the G-code store, matched via the filename of
      the current print (file/report filled the list at print start).
      skip/query_obj only returns the already skipped objects, not the
      full list.
    - Skipped: from self._skip_state (updated by skip/report).
    """
    filename = self._state.get("filename", "")
    all_objects: list[str] = []
    svg = ""
    if filename:
        try:
            record = self._store.get_file_by_name(filename)
            if record:
                all_objects = json.loads(record.get("objects_skip_parts") or "[]")
                svg = record.get("svg_image") or ""
        except Exception as e:
            log.warning(f"skip_state lookup failed: {e}")
    skip_state = self._skip_state
    return self._json_cors({
        "result": {
            "objects": all_objects,
            "skipped": list(skip_state.get("skipped", [])),
            "svg_b64": svg,
            "ts": skip_state.get("ts", 0),
            "filename": filename,
        }
    })
async def handle_kx_printers(self, request):
    """List the active printers (those with a printer IP configured).

    Each entry has ``id``, ``name``, ``bridge_url``, ``printer_ip`` and
    ``device_id``. ``bridge_url`` is only filled with more than one active
    printer and a usable request host; otherwise it is empty so the
    frontend uses relative paths (same origin as the UI).
    """
    from urllib.parse import urlsplit

    # Collect the active printers (with IP).
    active = [(pid, br) for pid, br in self._all_bridges.items()
              if (br._args.printer_ip or "").strip()]
    # Host for bridge_url: keep the browser's view, but never export a
    # loopback/wildcard host - fetches from the browser would fail when the
    # UI is opened via the LAN IP.
    # Parse host[:port] properly: splitting on ":" breaks IPv6 literals such
    # as "[::1]:7125", so "::1" was never recognised.
    try:
        host = urlsplit(f"//{request.host}").hostname or ""
    except ValueError:
        host = ""
    if host in ("localhost", "127.0.0.1", "::1", "0.0.0.0"):
        host = ""
    # IPv6 literals must be bracketed inside a URL.
    url_host = f"[{host}]" if ":" in host else host
    out = []
    for pid, br in active:
        port = getattr(br._args, "port", 7125)
        # Only set a concrete bridge_url for multi-printer setups
        # (cross-instance fetch).
        bridge_url = ""
        if len(active) > 1 and host:
            bridge_url = f"http://{url_host}:{port}"
        out.append({
            "id": pid,
            "name": br._state.get("printer_name") or f"Drucker {pid}",
            "bridge_url": bridge_url,
            "printer_ip": br._args.printer_ip,
            "device_id": br._args.device_id or "",
        })
    return self._json_cors({"result": out})
async def handle_kx_print(self, request):
    """Start a print from the G-code store, optionally with filament assignments.

    JSON body:
      - ``file_id`` (required): id of the stored G-code file.
      - ``filament_assignments`` (optional): list of objects, each with a
        ``slot_index`` and optionally ``paint_index``, ``paint_color``,
        ``ams_color`` and ``material``.
      - ``excluded_objects`` (optional): list of object names to skip before
        the print starts; any non-list value is treated as empty.

    Responses: 400 for a malformed body, 404 for an unknown file, 504 when
    the printer does not answer, otherwise ``{"result": "ok", "filename": ...}``.
    On success a job is opened in the store's history.
    """
    try:
        body = await request.json()
    except Exception:
        return self._json_cors({"error": "invalid json"}, status=400)
    if not isinstance(body, dict):
        # Valid JSON but not an object (e.g. a list): body.get() would raise
        return self._json_cors({"error": "invalid json"}, status=400)
    file_id = body.get("file_id")
    if not file_id:
        return self._json_cors({"error": "file_id required"}, status=400)
    gcode_file = self._store.get_file(file_id)
    if not gcode_file:
        return self._json_cors({"error": "file not found"}, status=404)
    # filament_assignments: [{slot_index, material, color_hex}, ...]
    assignments = body.get("filament_assignments")
    # excluded_objects: ["name1", "name2", ...] - pre-print skip (v0.9.10)
    excluded_objects = body.get("excluded_objects") or []
    if not isinstance(excluded_objects, list):
        excluded_objects = []
    if assignments:
        # Reject malformed entries up front instead of failing with a
        # KeyError/AttributeError (HTTP 500) while building the mapping.
        if not isinstance(assignments, list) or any(
            not isinstance(a, dict) or "slot_index" not in a for a in assignments
        ):
            return self._json_cors(
                {"error": "filament_assignments must be a list of objects with slot_index"},
                status=400,
            )
        ams_box_mapping = [
            {
                "paint_index": a.get("paint_index", i),
                "ams_index": a["slot_index"],
                "paint_color": a.get("paint_color", [255, 255, 255, 255]),
                "ams_color": a.get("ams_color", [255, 255, 255, 255]),
                "material_type": a.get("material", "PLA"),
            }
            for i, a in enumerate(assignments)
        ]
    else:
        # No dialog -> use every loaded slot, as a plain upload print does.
        # A slot counts as loaded when its cached status equals 5.
        default_slot = getattr(self._args, "default_ams_slot", "auto")
        all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5]
        if default_slot != "auto":
            try:
                slot_idx = int(default_slot)
                # Fall back to all loaded slots when the default slot is empty
                loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded
            except ValueError:
                loaded = all_loaded
        else:
            loaded = all_loaded
        ams_box_mapping = [
            {
                "paint_index": i,
                "ams_index": i,
                "paint_color": [255, 255, 255, 255],
                "ams_color": [255, 255, 255, 255],
                "material_type": s.get("type", "PLA"),
            }
            for i, s in loaded
        ]
    use_ams = len(ams_box_mapping) > 0
    auto_leveling = getattr(self._args, "auto_leveling", 1)
    filename = gcode_file["filename"]
    file_path = gcode_file["path"]
    # Expose the file through the internal serve endpoint.
    # NOTE(review): the URL points at "localhost" and md5 is sent empty, while
    # the upload handler uses request.host and a real md5 - confirm the
    # printer does not need either value here.
    url = f"http://localhost:{self._args.port}/serve/{os.path.basename(file_path)}"
    payload = {
        "taskid": "-1",
        "url": url,
        "filename": filename,
        "md5": "",
        "filepath": None,
        "filetype": 1,
        "project_type": 1,
        "filesize": gcode_file.get("size_bytes", 0),
        "ams_settings": {
            "use_ams": use_ams,
            "ams_box_mapping": ams_box_mapping,
        },
        "task_settings": {
            "auto_leveling": auto_leveling,
            "vibration_compensation": 0,
            "flow_calibration": 0,
            "dry_mode": 0,
            "ai_settings": {"status": 0, "count": 0, "type": 1},
            "timelapse": {"status": 0, "count": 0, "type": 64},
            "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0},
            "model_objects_skip_parts": excluded_objects,
        },
    }
    log.info(f"KX-Store Druckstart: {filename} ams={len(ams_box_mapping)} slots assignments={bool(assignments)} excluded={len(excluded_objects)}")
    # publish() blocks while waiting for the printer, so run it in the executor
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, lambda: self.client.publish("print", "start", payload, timeout=15.0)
    )
    if result is None:
        return self._json_cors({"error": "Keine Antwort vom Drucker"}, status=504)
    # Open a job in the history
    self._current_job_id = self._store.start_job(
        gcode_file_id=gcode_file["id"],
        printer_id=getattr(self._args, "device_id", "unknown"),
        filament_assignments=assignments,
    )
    return self._json_cors({"result": "ok", "filename": filename})
# -------------------------------------------------------------------------
# HTTP handlers
# -------------------------------------------------------------------------
async def handle_server_info(self, request):
    """Report a fixed 'ready' server description in Moonraker's result format.

    Only ``websocket_count`` is dynamic: it reflects the number of entries in
    ``self.ws_clients``.
    """
    info = {
        "klippy_connected": True,
        "klippy_state": "ready",
        "components": ["file_manager", "job_state", "virtual_sdcard"],
        "failed_components": [],
        "registered_directories": ["gcodes"],
        "warnings": [],
        "websocket_count": len(self.ws_clients),
        "moonraker_version": MOONRAKER_VERSION,
        "api_version": [1, 3, 0],
        "api_version_string": "1.3.0",
    }
    return web.json_response({"result": info})
async def handle_printer_info(self, request):
    """Report a fixed 'ready' printer description in Moonraker's result format.

    All paths are placeholders; the printer name from ``self._state`` is
    exposed through the ``cpu_info`` field.
    """
    details = {
        "state": "ready",
        "state_message": "Printer is ready",
        "hostname": "kobrax-bridge",
        "klipper_path": "/home/pi/klipper",
        "python_path": "/home/pi/klippy-env/bin/python",
        "log_file": "/tmp/klippy.log",
        "config_file": "/home/pi/printer.cfg",
        "software_version": KLIPPER_VERSION,
        "cpu_info": self._state["printer_name"],
    }
    return web.json_response({"result": details})
async def handle_machine_system_info(self, request):
    """Report a mostly static system description in Moonraker's result format.

    Only the ``python`` entry is taken from the running interpreter; all
    hardware and distribution values are fixed placeholders.
    """
    cpu = {
        "cpu_count": 4,
        "bits": "64bit",
        "processor": "armv7l",
        "cpu_desc": "Anycubic Kobra X Bridge",
        "serial_number": "",
        "hardware_desc": "",
        "model": "Kobra X Bridge",
        "total_memory": 524288,
        "memory_units": "kB",
    }
    distro = {
        "name": "Linux",
        "id": "linux",
        "version": "1.0",
        "version_parts": {},
        "like": "",
        "codename": "",
    }
    interpreter = {
        "version": list(sys.version_info[:3]),
        "version_string": sys.version,
    }
    system_info = {
        "cpu_info": cpu,
        "sd_info": {},
        "distribution": distro,
        "available_services": [],
        "service_state": {},
        "python": interpreter,
        "network": {},
        "canbus": {},
    }
    return web.json_response({"result": {"system_info": system_info}})
async def handle_objects_query(self, request):
    """Return printer objects, limited to those named in the query string.

    Every query parameter name is treated as a requested object; names that
    do not exist are ignored. Without query parameters all objects are
    returned.
    """
    available = self._build_printer_objects()
    wanted = dict(request.rel_url.query)
    if wanted:
        status = {}
        for name in wanted:
            if name in available:
                status[name] = available[name]
    else:
        status = available
    return web.json_response({"result": {"status": status, "eventtime": time.time()}})
async def handle_objects_list(self, request):
    """Return the names of all printer objects the bridge can report."""
    names = list(self._build_printer_objects().keys())
    return web.json_response({"result": {"objects": names}})
async def handle_objects_subscribe(self, request):
    """Answer a subscribe request with a full snapshot of all printer objects."""
    snapshot = {
        "status": self._build_printer_objects(),
        "eventtime": time.time(),
    }
    return web.json_response({"result": snapshot})
async def handle_files_list(self, request):
    """Return a file list holding at most the file named in ``self._state``.

    Size is reported as 0 and the modification time as "now".
    """
    current = self._state.get("filename", "")
    listing = []
    if current:
        entry = {
            "path": current,
            "modified": time.time(),
            "size": 0,
            "permissions": "rw",
        }
        listing.append(entry)
    return web.json_response({"result": listing})
async def handle_file_upload(self, request):
    """Receive a multipart G-code upload, store it, push it to the printer and optionally print.

    Recognised multipart fields:
      - ``file`` / ``gcode`` / ``upload_file``: the file content (and name);
      - ``path``: when non-empty, replaces the remote filename;
      - ``print``: ``"true"`` requests an immediate print start.
    A ``print=true`` query parameter is honoured as well.

    Responses: 400 when the request is not multipart or contains no file
    data, 500 when the upload to the printer raises, otherwise 201 with a
    combined OctoPrint/Moonraker style body.
    """
    log.info(f"Upload-Request: {request.method} {request.path_qs} CT={request.headers.get('Content-Type','')[:60]}")
    ct = request.headers.get("Content-Type", "")
    if "multipart" not in ct:
        return web.json_response({"error": "expected multipart"}, status=400)
    auto_print = False
    reader = await request.multipart()
    file_data = None
    # Fall back to the previous upload's name when the part carries none
    remote_filename = self._last_uploaded_file or "upload.gcode"
    async for part in reader:
        if part.name in ("file", "gcode", "upload_file"):
            remote_filename = part.filename or remote_filename
            # The whole file is read into memory here
            file_data = await part.read()
            log.info(f"Multipart-Feld '{part.name}': {remote_filename} ({len(file_data)} bytes)")
        elif part.name == "path":
            val = (await part.read()).decode("utf-8", errors="replace").strip()
            if val:
                remote_filename = val
        elif part.name == "print":
            val = (await part.read()).decode("utf-8", errors="replace").strip().lower()
            auto_print = val == "true"
        else:
            log.debug(f"Unbekanntes Multipart-Feld: {part.name}")
    # Also rejects a zero-byte file, since empty bytes are falsy
    if not file_data:
        return web.json_response({"error": "no file received"}, status=400)
    # The md5 doubles as the file id in the G-code store
    file_md5 = hashlib.md5(file_data).hexdigest()
    file_size = len(file_data)
    # Read the slicer time estimate + thumbnail from the G-code
    est_time = _parse_gcode_estimated_time(file_data)
    self._state["slicer_time"] = est_time
    thumbnail_b64 = _extract_thumbnail(file_data)
    gcode_filaments = _extract_filament_info(file_data)
    # Persist the file in the G-code store
    self._store.save_file(
        file_id=file_md5,
        filename=remote_filename,
        data=file_data,
        est_time_sec=est_time,
        thumbnail_b64=thumbnail_b64,
        gcode_filaments=gcode_filaments or None,
    )
    # NOTE(review): presumably save_file() writes the file below
    # self._serve_dir_path under its basename - confirm against the store.
    serve_path = os.path.join(self._serve_dir_path, os.path.basename(remote_filename))
    del file_data  # free RAM
    self._last_uploaded_file = remote_filename
    log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} → Store + Drucker")
    # Upload the file to the printer via HTTP (serve_path is already on disk)
    upload_url = self._state.get("upload_url") or None
    loop = asyncio.get_event_loop()
    try:
        result = await loop.run_in_executor(
            None, self.client.upload_gcode, serve_path, remote_filename, upload_url
        )
    except Exception as e:
        log.error(f"Upload fehlgeschlagen: {e}")
        return web.json_response({"error": str(e)}, status=500)
    log.info(f"Upload erfolgreich: {result}")
    # Start the print with a complete payload (incl. serve URL + md5 + size).
    # NOTE(review): serve_path uses the basename while this URL embeds the
    # full, unquoted remote_filename - names with a directory part or
    # special characters may not resolve; confirm.
    serve_url = f"http://{request.host}/serve/{remote_filename}"
    # print=true in the multipart form (Moonraker) or query string -> start print
    # print=false or missing -> upload only
    if not auto_print:
        auto_print = request.rel_url.query.get("print", "false").lower() == "true"
    # Always request the thumbnail (printer answers asynchronously with file/report)
    self._thumbnail_b64 = ""
    self.client.publish("file", "fileDetails", {"root": "local", "filename": remote_filename}, timeout=0)
    # Remember the upload so a later print-start request can reuse it
    self._state["last_upload_url"] = serve_url
    self._state["last_upload_md5"] = file_md5
    self._state["last_upload_size"] = file_size
    if auto_print:
        log.info(f"Upload+Print (print=true): {remote_filename}")
        self._state["file_ready"] = ""
        loop = asyncio.get_event_loop()
        # Fire and forget: the returned future is not awaited, so the
        # response goes out without waiting for the printer's reply.
        loop.run_in_executor(None, lambda: self._start_print(remote_filename, serve_url, file_md5, file_size))
    else:
        log.info(f"Nur hochgeladen (print=false): {remote_filename}")
        self._state["file_ready"] = remote_filename
    # OctoPrint-compatible response (OrcaSlicer evaluates refs)
    return web.json_response({
        "done": True,
        "files": {
            "local": {
                "name": remote_filename,
                "origin": "local",
                "path": remote_filename,
                "refs": {
                    "download": f"http://{request.host}/api/files/local/{remote_filename}",
                    "resource": f"http://{request.host}/api/files/local/{remote_filename}",
                }
            }
        },
        "result": {
            "item": {"path": remote_filename, "root": "gcodes"},
            "action": "create_file",
        }
    }, status=201)
def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0):
    """Send a ``print/start`` command for an uploaded file (blocking).

    The AMS mapping is derived from the cached slots: only slots with status
    5 are used. When a numeric default slot is configured and that slot is
    loaded, it is used alone; otherwise all loaded slots are mapped. The
    outcome is only logged, nothing is returned.
    """
    self._state["file_ready"] = ""
    default_slot = getattr(self._args, "default_ams_slot", "auto")
    all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5]
    loaded = all_loaded
    if default_slot != "auto":
        try:
            slot_idx = int(default_slot)
        except ValueError:
            # Non-numeric setting: behave like "auto"
            slot_idx = None
        if slot_idx is not None:
            preferred = [pair for pair in all_loaded if pair[0] == slot_idx]
            if preferred:
                loaded = preferred
            else:
                log.warning(f"Standard-Slot {slot_idx} ist leer fallback auf Auto")
    use_ams = len(loaded) > 0
    ams_box_mapping = []
    for i, s in loaded:
        ams_box_mapping.append({
            "paint_index": i,
            "ams_index": i,
            "paint_color": [255, 255, 255, 255],
            "ams_color": [255, 255, 255, 255],
            "material_type": s.get("type", "PLA"),
        })
    log.info(f"AMS-Slots: {len(loaded)}/{len(self._ams_slots)} belegt → {[i for i,_ in loaded]}")
    ams_settings = {
        "use_ams": use_ams,
        "ams_box_mapping": ams_box_mapping,
    }
    task_settings = {
        "auto_leveling": getattr(self._args, "auto_leveling", 1),
        "vibration_compensation": 0,
        "flow_calibration": 0,
        "dry_mode": 0,
        "ai_settings": {"status": 0, "count": 0, "type": 1},
        "timelapse": {"status": 0, "count": 0, "type": 64},
        "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0},
        "model_objects_skip_parts": [],
    }
    payload = {
        "taskid": "-1",
        "url": url,
        "filename": filename,
        "md5": md5,
        "filepath": None,
        "filetype": 1,
        "project_type": 1,
        "filesize": filesize,
        "ams_settings": ams_settings,
        "task_settings": task_settings,
    }
    log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots")
    result = self.client.publish("print", "start", payload, timeout=15.0)
    if not result:
        log.warning("Druckstart: keine Antwort vom Drucker")
        return
    log.info(f"Druckstart bestätigt: state={result.get('state')}")
async def handle_print_start(self, request):
    """Start printing an already uploaded file.

    The filename is taken from the ``filename`` query parameter, then from
    the JSON body, then from the last uploaded file. URL, md5 and size come
    from the state recorded by the last upload.

    Optional JSON body fields:
      - ``filament_assignments``: list of objects, each with a ``slot_index``
        and optionally ``paint_index``, ``paint_color``, ``ams_color`` and
        ``material``. An explicit empty list means "print without AMS";
        when the field is absent the loaded cached slots are mapped.
      - ``excluded_objects``: list of object names to skip before printing.

    Responses: 400 when no filename is known or the assignments are
    malformed, 504 when the printer does not answer, otherwise ``ok``.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object carries no usable options;
        # treat it like a missing body instead of failing on body.get().
        body = {}
    filename = (request.rel_url.query.get("filename")
                or body.get("filename")
                or self._last_uploaded_file)
    if not filename:
        return web.json_response({"error": "no filename"}, status=400)
    log.info(f"Druck starten: {filename}")
    # Optional slot selection from the filament dialog
    filament_assignments = body.get("filament_assignments")
    # Pre-print skip (v0.9.10)
    excluded_objects = body.get("excluded_objects") or []
    if not isinstance(excluded_objects, list):
        excluded_objects = []
    if filament_assignments is not None:
        # Reject malformed entries up front instead of failing with a
        # KeyError/AttributeError (HTTP 500) while building the mapping.
        if not isinstance(filament_assignments, list) or any(
            not isinstance(a, dict) or "slot_index" not in a for a in filament_assignments
        ):
            return web.json_response(
                {"error": "filament_assignments must be a list of objects with slot_index"},
                status=400,
            )
        ams_box_mapping = [
            {
                "paint_index": a.get("paint_index", i),
                "ams_index": a["slot_index"],
                "paint_color": a.get("paint_color", [255, 255, 255, 255]),
                "ams_color": a.get("ams_color", [255, 255, 255, 255]),
                "material_type": a.get("material", "PLA"),
            }
            for i, a in enumerate(filament_assignments)
        ]
    else:
        # AMS mapping from the cached state - skip empty slots (status != 5)
        default_slot = getattr(self._args, "default_ams_slot", "auto")
        all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5]
        if default_slot != "auto":
            try:
                slot_idx = int(default_slot)
                # Fall back to all loaded slots when the default slot is empty
                loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded
            except ValueError:
                loaded = all_loaded
        else:
            loaded = all_loaded
        ams_box_mapping = [
            {
                "paint_index": i,
                "ams_index": i,
                "paint_color": [255, 255, 255, 255],
                "ams_color": [255, 255, 255, 255],
                "material_type": s.get("type", "PLA"),
            }
            for i, s in loaded
        ]
    use_ams = len(ams_box_mapping) > 0
    auto_leveling = getattr(self._args, "auto_leveling", 1)
    url = self._state.get("last_upload_url", "")
    filesize = self._state.get("last_upload_size", 0)
    md5 = self._state.get("last_upload_md5", "")
    payload = {
        "taskid": "-1",
        "url": url,
        "filename": filename,
        "md5": md5,
        "filepath": None,
        "filetype": 1,
        "project_type": 1,
        "filesize": filesize,
        "ams_settings": {
            "use_ams": use_ams,
            "ams_box_mapping": ams_box_mapping,
        },
        "task_settings": {
            "auto_leveling": auto_leveling,
            "vibration_compensation": 0,
            "flow_calibration": 0,
            "dry_mode": 0,
            # NOTE(review): the other print-start payloads in this file send
            # type 1 / 64 for these two entries - confirm 0 is intended here.
            "ai_settings": {"status": 0, "count": 0, "type": 0},
            "timelapse": {"status": 0, "count": 0, "type": 0},
            "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0},
            "model_objects_skip_parts": excluded_objects,
        },
    }
    # publish() blocks while waiting for the printer, so run it in the executor
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, lambda: self.client.publish("print", "start", payload, timeout=15.0)
    )
    if result is None:
        return web.json_response({"error": "Keine Antwort vom Drucker"}, status=504)
    return web.json_response({"result": "ok"})
async def handle_print_pause(self, request):
    """Pause the task whose id is cached in ``self._state`` ("-1" if unknown)."""
    task_id = self._state.get("taskid", "-1")
    # The client call blocks, so it is handed to the default executor
    await asyncio.get_event_loop().run_in_executor(None, self.client.pause_print, task_id)
    return web.json_response({"result": "ok"})
async def handle_print_resume(self, request):
    """Resume the task whose id is cached in ``self._state`` ("-1" if unknown)."""
    task_id = self._state.get("taskid", "-1")
    # The client call blocks, so it is handed to the default executor
    await asyncio.get_event_loop().run_in_executor(None, self.client.resume_print, task_id)
    return web.json_response({"result": "ok"})
async def handle_print_cancel(self, request):
    """Stop the task whose id is cached in ``self._state`` ("-1" if unknown)."""
    task_id = self._state.get("taskid", "-1")
    # The client call blocks, so it is handed to the default executor
    await asyncio.get_event_loop().run_in_executor(None, self.client.stop_print, task_id)
    return web.json_response({"result": "ok"})
async def handle_api_file_ready_clear(self, request):
    """Clear the 'file ready' marker and cached thumbnail, then push a status update."""
    self._thumbnail_b64 = ""
    self._state["file_ready"] = ""
    self._push_status_update()
    reply = {"result": "ok"}
    return web.json_response(reply)
async def handle_octoprint_version(self, request):
    """Return a fixed OctoPrint-style version description."""
    version_info = {
        "api": "0.1",
        "server": "1.9.0",
        "text": "OctoPrint (Kobra X Bridge)",
    }
    return web.json_response(version_info)
def _theme_index_path(self) -> str:
    """Return the path of ``index.html`` inside the active UI theme directory."""
    segments = (_BASE, "web", "themes", self._ui_theme, "index.html")
    return os.path.join(*segments)
def _load_index_template_cached(self) -> str:
path = self._theme_index_path()
mtime = os.path.getmtime(path)
key = (path, mtime)
if self._index_tpl_cache is not None and self._index_tpl_cache_key == key:
return self._index_tpl_cache
with open(path, "r", encoding="utf-8") as f:
self._index_tpl_cache = f.read()
self._index_tpl_cache_key = key
return self._index_tpl_cache
def _ui_asset_cache_buster(self) -> str:
    """Return a cache-busting token derived from the theme's asset mtimes.

    The token is the newest modification time (whole seconds) among
    ``index.html``, ``style.css`` and ``app.js``; unreadable files are
    skipped and ``"0"`` is returned when none could be read.
    """
    theme_dir = os.path.join(_BASE, "web", "themes", self._ui_theme)
    newest = 0.0
    for asset in ("index.html", "style.css", "app.js"):
        try:
            stamp = os.path.getmtime(os.path.join(theme_dir, asset))
        except OSError:
            continue
        newest = max(newest, stamp)
    if not newest:
        return "0"
    return str(int(newest))
async def handle_kx_ui_asset(self, request):
    """Serve one whitelisted text asset of the active UI theme.

    Only names listed in ``_KX_UI_ASSETS`` are served (the mapping supplies
    the content type); anything else, or an unreadable file, yields 404.
    In ``app.js`` the ``'__VERSION__'`` placeholder is replaced with the
    bridge version.
    """
    asset = request.match_info.get("name", "")
    content_type = _KX_UI_ASSETS.get(asset)
    if content_type is None:
        raise web.HTTPNotFound()
    asset_file = pathlib.Path(os.path.join(_BASE, "web", "themes", self._ui_theme, asset))
    try:
        text = asset_file.read_text(encoding="utf-8")
    except OSError:
        raise web.HTTPNotFound()
    if asset == "app.js":
        version_literal = f"'{self._read_version()}'"
        text = text.replace("'__VERSION__'", version_literal)
    cache_headers = {"Cache-Control": "public, max-age=86400"}
    return web.Response(text=text, content_type=content_type, headers=cache_headers)
async def handle_index(self, request):
    """Serve the web UI's index page from the active theme.

    The ``__UI_ASSETS_VER__`` placeholder in the template is replaced with
    the asset cache-buster token. When the template cannot be read, a 500
    page naming the expected path is returned.
    """
    try:
        tpl = self._load_index_template_cached()
    except OSError:
        p = self._theme_index_path()
        log.error("Web-UI Theme-Datei fehlt oder nicht lesbar: %s (Theme: %s)", p, self._ui_theme)
        return web.Response(
            text="<pre>KX-Bridge: index.html nicht gefunden.\nErwartet:\n"
            + html.escape(p, quote=True)
            + "</pre>",
            status=500,
            content_type="text/html; charset=utf-8",
        )
    # Must not be named "html": assigning to that name anywhere in this
    # function would make it a local and break html.escape() above with an
    # UnboundLocalError.
    page = tpl.replace("__UI_ASSETS_VER__", self._ui_asset_cache_buster())
    return web.Response(text=page, content_type="text/html",
                        headers={"Cache-Control": "no-store, no-cache, must-revalidate"})
async def handle_api_light(self, request):
    """Switch the printer light and set its brightness.

    JSON body (both optional): ``on`` (default ``True``) and ``brightness``
    (default: the cached brightness). An unparsable body counts as empty.
    The cached light state is updated after the command was sent.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    turn_on = bool(options.get("on", True))
    level = int(options.get("brightness", self._state["light_brightness"]))
    command = {"type": 3, "status": 1 if turn_on else 0, "brightness": level}

    def _send():
        return self.client.publish("light", "control", command, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    self._state["light_on"] = turn_on
    self._state["light_brightness"] = level
    return web.json_response({"result": "ok"})
async def handle_api_fan(self, request):
    """Set the fan speed from the body's ``speed`` field (default 0).

    An unparsable body counts as empty. The cached fan speed is updated
    after the command was sent.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    speed = int(options.get("speed", 0))

    def _send():
        return self.client.publish("fan", "setSpeed", {"fan_speed_pct": speed}, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    self._state["fan_speed"] = speed
    return web.json_response({"result": "ok"})
async def handle_api_connect(self, request):
    """Connect the client to the printer on request.

    On success the cached state is reset to standby/free; any exception is
    reported as a 500 response carrying the error text.
    """
    event_loop = asyncio.get_event_loop()
    try:
        await event_loop.run_in_executor(None, self.client.connect)
        for key, value in (("print_state", "standby"), ("kobra_state", "free")):
            self._state[key] = value
        log.info("Manuell verbunden")
        return web.json_response({"result": "connected"})
    except Exception as e:
        failure = {"error": str(e)}
        return web.json_response(failure, status=500)
async def handle_api_disconnect(self, request):
    """Disconnect the client from the printer on request.

    Disconnecting is best effort: a failure is logged but does not change
    the outcome. The cached state is marked error/offline either way and
    the response is always ``disconnected``.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, self.client.disconnect)
    except Exception as e:
        # Keep going - the bridge is treated as offline regardless - but
        # leave a trace instead of swallowing the error silently.
        log.warning(f"Trennen fehlgeschlagen (ignoriert): {e}")
    self._state["print_state"] = "error"
    self._state["kobra_state"] = "offline"
    log.info("Manuell getrennt")
    return web.json_response({"result": "disconnected"})
async def handle_api_speed(self, request):
    """Change the print speed mode of the current task.

    JSON body: ``mode`` (default 2). An unparsable body counts as empty.
    The cached speed mode is updated after the command was sent.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    mode = int(options.get("mode", 2))
    update = {
        "taskid": self._state.get("taskid", "-1"),
        "settings": {"print_speed_mode": mode},
    }
    await asyncio.get_event_loop().run_in_executor(
        None, self.client.publish_web, "print", "update", update
    )
    self._state["print_speed_mode"] = mode
    return web.json_response({"result": "ok"})
async def handle_api_ams_set_slot(self, request):
    """Set material type and colour of one AMS slot.

    JSON body: ``index`` (default 0), ``type`` (default "PLA", upper-cased)
    and ``color`` (default white), which must be a three-element list or
    the request is rejected with 400. When the printer answers with code
    200 the matching cached slot is updated right away. The response is
    ``ok`` regardless of the printer's answer.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    index = int(options.get("index", 0))
    mat = str(options.get("type", "PLA")).upper()
    color = options.get("color", [255, 255, 255])
    color_ok = isinstance(color, list) and len(color) == 3
    if not color_ok:
        return web.json_response({"error": "color must be [r,g,b]"}, status=400)

    def _send():
        slot_info = {"index": index, "type": mat, "color": color}
        resp = self.client.publish(
            "multiColorBox", "setInfo",
            {"multi_color_box": [{"id": -1, "slots": [slot_info]}]},
            timeout=5
        )
        log.info(f"setInfo slot={index} type={mat} color={color}{resp}")
        return resp

    answer = await asyncio.get_event_loop().run_in_executor(None, _send)
    if answer and answer.get("code") == 200:
        # Update the first cached slot with this index immediately
        cached = next((s for s in self._ams_slots if s.get("index") == index), None)
        if cached is not None:
            cached["type"] = mat
            cached["color"] = color
    return web.json_response({"result": "ok"})
async def handle_api_ams_feed(self, request):
    """Send a filament feed command to the AMS.

    JSON body: ``slot_index`` (default 0) and ``type`` (default 1). For
    type 2 (retract, per the original comment) the requested slot is
    replaced by the last loaded slot whenever one is known
    (``self._ams_loaded_slot >= 0``). The printer's answer is only logged.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    slot_index = int(options.get("slot_index", 0))
    feed_type = int(options.get("type", 1))
    retracting = feed_type == 2
    if retracting and self._ams_loaded_slot >= 0:
        slot_index = self._ams_loaded_slot

    def _send():
        feed_status = {"slot_index": slot_index, "type": feed_type}
        resp = self.client.publish(
            "multiColorBox", "feedFilament",
            {"multi_color_box": [{"id": -1, "feed_status": feed_status}]},
            timeout=5
        )
        log.info(f"feedFilament type={feed_type} slot={slot_index} loaded_slot={self._ams_loaded_slot}{resp}")

    await asyncio.get_event_loop().run_in_executor(None, _send)
    return web.json_response({"result": "ok"})
async def handle_api_axis(self, request):
    """Move an axis or switch the motors off.

    JSON body: ``action`` ("turnOff" or anything else for a move, default
    "move"). A move also reads ``axis`` (default 4), ``move_type``
    (default 2) and ``distance`` (default 0). An unparsable body counts as
    empty.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    action = options.get("action", "move")
    event_loop = asyncio.get_event_loop()
    if action == "turnOff":
        def _turn_off():
            return self.client.publish("axis", "turnOff", None, timeout=0)

        await event_loop.run_in_executor(None, _turn_off)
        return web.json_response({"result": "ok"})

    move = {
        "axis": int(options.get("axis", 4)),
        "move_type": int(options.get("move_type", 2)),
        "distance": float(options.get("distance", 0)),
    }

    def _move():
        return self.client.publish("axis", "move", move, timeout=0)

    await event_loop.run_in_executor(None, _move)
    return web.json_response({"result": "ok"})
async def handle_api_temperature(self, request):
    """Set nozzle and/or bed target temperature.

    JSON body: optional ``nozzle`` and ``bed`` values. While the cached
    print state is "printing", each given value is sent as its own runtime
    ``print/update`` (nozzle first). Otherwise a single ``tempature/set``
    carries both values, with the cached target filling in for a missing
    one.
    """
    try:
        options = await request.json()
    except Exception:
        options = {}
    nozzle = options.get("nozzle")
    bed = options.get("bed")
    event_loop = asyncio.get_event_loop()

    if self._state.get("print_state") != "printing":
        # Idle: standard tempature/set with both values
        n = int(float(nozzle)) if nozzle is not None else int(self._state["nozzle_target"])
        b = int(float(bed)) if bed is not None else int(self._state["bed_target"])
        targets = {"target_nozzle_temp": n, "target_hotbed_temp": b}

        def _set_idle():
            return self.client.publish("tempature", "set", targets, timeout=0)

        await event_loop.run_in_executor(None, _set_idle)
        return web.json_response({"result": "ok"})

    # During a print: runtime update, one setting per message
    taskid = self._state.get("taskid", "-1")
    for setting, raw in (("target_nozzle_temp", nozzle), ("target_hotbed_temp", bed)):
        if raw is None:
            continue
        update = {"taskid": taskid, "settings": {setting: int(float(raw))}}
        await event_loop.run_in_executor(
            None, self.client.publish_web, "print", "update", update
        )
    return web.json_response({"result": "ok"})
async def handle_api_camera(self, request):
    """Return the camera URL currently held in ``self._state``."""
    camera_url = self._state["camera_url"]
    return web.json_response({"url": camera_url})
async def handle_api_camera_start(self, request):
    """Ask the printer to start the video capture and report the resulting state.

    Waits up to 8 seconds for the printer's answer; ``state`` is an empty
    string when no answer (or no state field) was received.
    """
    def _start_capture():
        return self.client.publish("video", "startCapture", None, timeout=8.0)

    # Wait for the printer's confirmation before returning
    answer = await asyncio.get_event_loop().run_in_executor(None, _start_capture)
    state = (answer or {}).get("state", "")
    log.info(f"Kamera startCapture: state={state}")
    return web.json_response({"result": "ok", "state": state})
async def handle_api_camera_stop(self, request):
    """Ask the printer to stop the video capture without waiting for an answer."""
    def _stop_capture():
        return self.client.publish("video", "stopCapture", None, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _stop_capture)
    return web.json_response({"result": "ok"})
async def handle_api_camera_snapshot(self, request):
    """Return a single JPEG frame from the camera stream (for Obico and other snapshot clients).

    ffmpeg reads one video frame from the cached camera URL and writes it
    as MJPEG to stdout. Responses: 503 when no URL is known, ffmpeg cannot
    be started or produces no data; 504 when no frame arrives within 20
    seconds; otherwise the JPEG with ``Cache-Control: no-cache``.
    """
    url = self._state.get("camera_url", "")
    if not url:
        return web.Response(status=503, text="Keine Kamera-URL bekannt")
    is_rtsp = url.lower().startswith("rtsp://")
    input_args = ["-fflags", "nobuffer", "-flags", "low_delay"]
    if is_rtsp:
        input_args += ["-probesize", "32", "-analyzeduration", "0", "-rtsp_transport", "tcp"]
    else:
        input_args += ["-probesize", "1000000", "-analyzeduration", "1000000"]
    try:
        proc = await asyncio.create_subprocess_exec(
            _find_ffmpeg(), "-loglevel", "quiet",
            *input_args, "-i", url,
            "-frames:v", "1", "-f", "mjpeg", "-q:v", "3",
            "pipe:1",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except Exception as e:
        return web.Response(status=503, text=str(e))
    try:
        jpeg, _ = await asyncio.wait_for(proc.communicate(), timeout=20)
    except asyncio.TimeoutError:
        return web.Response(status=504, text="Snapshot-Timeout")
    except Exception as e:
        return web.Response(status=503, text=str(e))
    finally:
        # wait_for() only cancels communicate(); the child keeps running.
        # Kill and reap it so a hanging stream does not leak ffmpeg processes.
        if proc.returncode is None:
            try:
                proc.kill()
            except OSError:
                pass  # already gone
            await proc.wait()
    if not jpeg:
        return web.Response(status=503, text="Kein Frame empfangen")
    return web.Response(body=jpeg, content_type="image/jpeg",
                        headers={"Cache-Control": "no-cache"})
async def handle_camera_stream(self, request):
    """MJPEG proxy: transcode the camera stream via ffmpeg and serve it as multipart/x-mixed-replace.

    ffmpeg converts the cached camera URL to a 15 fps, 640 px wide MJPEG
    byte stream on stdout. This handler cuts that stream into individual
    JPEG frames (SOI ``FFD8`` .. EOI ``FFD9``) and writes each one as a
    multipart part. Responds with 503 when no camera URL is known or ffmpeg
    cannot be started. The ffmpeg process is killed and reaped when the
    stream ends for any reason.
    """
    url = self._state.get("camera_url", "")
    if not url:
        return web.Response(status=503, text="Keine Kamera-URL bekannt")
    is_rtsp = url.lower().startswith("rtsp://")
    ffmpeg_input_args = [
        "-fflags", "nobuffer",
        "-flags", "low_delay",
    ]
    if is_rtsp:
        ffmpeg_input_args += ["-probesize", "32", "-analyzeduration", "0", "-rtsp_transport", "tcp"]
    else:
        ffmpeg_input_args += ["-probesize", "1000000", "-analyzeduration", "1000000"]
    # Start ffmpeg BEFORE opening the StreamResponse, so a start-up failure
    # can still be answered with a regular HTTP response.
    try:
        proc = await asyncio.create_subprocess_exec(
            _find_ffmpeg(), "-loglevel", "quiet",
            *ffmpeg_input_args,
            "-i", url,
            "-vf", "fps=15,scale=640:-1",
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", "3",
            "-flush_packets", "1",
            "pipe:1",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except OSError:
        # Covers FileNotFoundError (a subclass of OSError)
        log.warning("Kamera: ffmpeg nicht gefunden Kamerastream nicht verfügbar")
        return web.Response(status=503, text="ffmpeg not found")
    except Exception as e:
        log.warning(f"Kamera: ffmpeg konnte nicht gestartet werden: {e}")
        return web.Response(status=503, text=str(e))
    boundary = "kobraxframe"
    resp = web.StreamResponse(headers={
        "Content-Type": f"multipart/x-mixed-replace;boundary={boundary}",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    })
    await resp.prepare(request)
    # bytearray: appending and trimming happen in place instead of
    # re-copying the whole buffer on every chunk
    buf = bytearray()
    try:
        while True:
            chunk = await proc.stdout.read(65536)
            if not chunk:
                break
            buf += chunk
            # Extract complete JPEG frames (SOI=FFD8, EOI=FFD9)
            while True:
                start = buf.find(b"\xff\xd8")
                if start == -1:
                    # No frame start in the buffer. Keep a trailing 0xFF: it
                    # may be the first half of an SOI marker that was split
                    # across two reads; dropping it would lose that frame.
                    if buf.endswith(b"\xff"):
                        del buf[:-1]
                    else:
                        buf.clear()
                    break
                end = buf.find(b"\xff\xd9", start + 2)
                if end == -1:
                    # Frame still incomplete: drop leading junk, wait for more
                    del buf[:start]
                    break
                frame = bytes(buf[start:end + 2])
                del buf[:end + 2]
                header = (
                    f"--{boundary}\r\n"
                    f"Content-Type: image/jpeg\r\n"
                    f"Content-Length: {len(frame)}\r\n\r\n"
                ).encode()
                try:
                    await resp.write(header + frame + b"\r\n")
                except Exception:
                    # A failed write ends the stream (typically the client
                    # went away); the finally block cleans up ffmpeg
                    return resp
    except Exception as e:
        log.warning(f"Kamera-Stream unterbrochen: {e}")
    finally:
        if proc.returncode is None:
            try:
                proc.kill()
            except OSError:
                pass  # already gone
        # Reap the child so it does not linger as a zombie
        try:
            await proc.wait()
        except Exception:
            pass
    return resp
async def handle_serve_file(self, request):
    """Serve an uploaded G-code file from the serve directory (for the printer's download).

    Only the basename of the requested name is used, so the lookup cannot
    leave the serve directory. Returns 404 when no such file exists.
    """
    requested = request.match_info.get("filename", "")
    filename = os.path.basename(requested)
    serve_path = os.path.join(self._serve_dir_path, filename)
    if not os.path.isfile(serve_path):
        return web.Response(status=404, text="not found")
    size = os.path.getsize(serve_path)
    log.info(f"Drucker lädt Datei ab: {filename} ({size} bytes)")
    disposition = f'attachment; filename="{filename}"'
    return web.FileResponse(serve_path, headers={"Content-Disposition": disposition})
async def handle_api_state(self, request):
    """Return the bridge's full cached state as one flat JSON object.

    Most fields are copied verbatim from ``self._state``; AMS slots, the
    loaded slot, the thumbnail and the bridge version come from their own
    attributes.
    """
    state = self._state
    # Keys copied one-to-one from the cached state, in response order
    mirrored = (
        "printer_name", "firmware_version", "print_state", "kobra_state",
        "nozzle_temp", "nozzle_target", "bed_temp", "bed_target",
        "progress", "print_duration", "remain_time",
        "curr_layer", "total_layers", "filename", "slicer_time",
        "camera_url", "fan_speed", "print_speed_mode",
        "light_on", "light_brightness",
    )
    payload = {key: state[key] for key in mirrored}
    payload["ams_slots"] = self._ams_slots
    payload["ams_loaded_slot"] = self._ams_loaded_slot
    payload["thumbnail"] = self._thumbnail_b64
    payload["connection_error"] = state["connection_error"]
    payload["file_ready"] = state["file_ready"]
    payload["version"] = self._read_version()
    return web.json_response(payload)
async def handle_moonraker_database(self, request):
    """Answer a Moonraker database item query (OrcaSlicer filament sync, AFC format).

    - ``namespace=lane_data``: refresh the AMS slots from the printer and
      return the lane data built from them.
    - ``AFC`` / ``afc-install`` / ``happy_hare``: answer with a ``None`` value.
    - anything else: 404 with a Moonraker-style error object.
    """
    query = request.rel_url.query
    namespace = query.get("namespace", "")
    key = query.get("key", "")
    if namespace == "lane_data":
        # Blocking printer round trip -> executor
        await asyncio.get_event_loop().run_in_executor(None, self._get_ams_slots_fresh)
        lanes = self._build_lane_data()
        log.info(f"AMS-Sync: {len(lanes)} Lanes an OrcaSlicer")
        result = {
            "namespace": "lane_data",
            "key": key or "lanes",
            "value": lanes,
        }
    elif namespace in ("AFC", "afc-install", "happy_hare"):
        result = {"namespace": namespace, "key": key, "value": None}
    else:
        return web.json_response(
            {"error": {"code": 404, "message": f"Namespace '{namespace}' not found"}},
            status=404
        )
    return web.json_response({"result": result})
async def handle_database_list(self, request):
    """Report the available database namespaces.

    Per the original note, OrcaSlicer checks this list to detect the MMU
    type; only ``lane_data`` is advertised.
    """
    namespaces = ["lane_data"]
    return web.json_response({"result": {"namespaces": namespaces}})
def _get_ams_slots_fresh(self):
"""Frische Slot-Daten per getInfo holen, Fallback auf gecachte."""
resp = self.client.publish("multiColorBox", "getInfo", None, timeout=5)
if resp and resp.get("data"):
boxes = resp["data"].get("multi_color_box") or []
if boxes:
slots = boxes[0].get("slots") or []
if slots:
self._ams_slots = slots
return self._ams_slots
# ─── Settings ────────────────────────────────────────────────────────────
def _find_config_path(self) -> pathlib.Path:
    """Return the path of ``config.ini``.

    Delegates to the loader module's ``find_config_path`` when it has one.
    Otherwise ``config/config.ini`` is looked up next to the program and
    one directory above; when neither exists, the path next to the program
    is returned.
    """
    if hasattr(env_loader, "find_config_path"):
        return env_loader.find_config_path()
    # Fallback for the old env_loader
    root = pathlib.Path(_BASE)
    candidates = [folder / "config" / "config.ini" for folder in (root, root.parent)]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return candidates[0]
async def handle_api_settings_get(self, request):
    """Return the current connection and print settings, including the password."""
    args = self._args
    settings = {
        "printer_name": self._state.get("printer_name", ""),
        "printer_ip": args.printer_ip,
        "mqtt_port": args.mqtt_port,
        "username": args.username,
        "password": args.password,
        "mode_id": args.mode_id,
        "device_id": args.device_id,
        "default_ams_slot": getattr(args, "default_ams_slot", "auto"),
        "auto_leveling": getattr(args, "auto_leveling", 1),
    }
    return web.json_response(settings)
async def handle_api_settings_post(self, request):
    """Persist settings from the JSON body to ``config.ini`` and schedule a restart.

    Fields missing from the body keep their current value. An empty or
    missing ``printer_name`` removes the stored name. Responds with 400
    when the body is not a JSON object, otherwise with
    ``{"status": "restarting"}``; the bridge restart is scheduled 0.3 s
    later so the response can still be delivered.
    """
    import configparser
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "invalid json"}, status=400)
    if not isinstance(data, dict):
        # Valid JSON but not an object: data.get() below would raise
        return web.json_response({"error": "invalid json"}, status=400)
    config_path = self._find_config_path()
    config_path.parent.mkdir(parents=True, exist_ok=True)
    # Read the existing config.ini (comments are lost, values are kept).
    # NOTE(review): with ConfigParser's default interpolation, set() raises
    # ValueError for values containing a bare '%' (e.g. in a password) -
    # confirm how the config loader reads the file before changing this.
    cfg = configparser.ConfigParser()
    if config_path.is_file():
        cfg.read(config_path, encoding="utf-8")
    # Make sure the sections exist
    for section in ("connection", "print", "bridge"):
        if not cfg.has_section(section):
            cfg.add_section(section)
    # Strip a ":port" suffix from the IP
    printer_ip = str(data.get("printer_ip", self._args.printer_ip or "")).split(":")[0]
    cfg.set("connection", "printer_ip", printer_ip)
    cfg.set("connection", "mqtt_port", str(data.get("mqtt_port", self._args.mqtt_port or 9883)))
    cfg.set("connection", "username", str(data.get("username", self._args.username or "")))
    cfg.set("connection", "password", str(data.get("password", self._args.password or "")))
    cfg.set("connection", "mode_id", str(data.get("mode_id", self._args.mode_id or "")))
    cfg.set("connection", "device_id", str(data.get("device_id", self._args.device_id or "")))
    cfg.set("print", "default_ams_slot", str(data.get("default_ams_slot", getattr(self._args, "default_ams_slot", "auto"))))
    cfg.set("print", "auto_leveling", str(data.get("auto_leveling", getattr(self._args, "auto_leveling", 1))))
    if not cfg.has_option("bridge", "poll_interval"):
        cfg.set("bridge", "poll_interval", "3")
    printer_name = str(data.get("printer_name", "")).strip()
    if printer_name:
        cfg.set("bridge", "printer_name", printer_name)
    elif cfg.has_option("bridge", "printer_name"):
        cfg.remove_option("bridge", "printer_name")
    with open(config_path, "w", encoding="utf-8") as f:
        f.write("# KX-Bridge Konfigurationsdatei\n\n")
        cfg.write(f)
    log.info(f"Settings gespeichert in {config_path}")
    # Send the response first, then restart
    response = web.json_response({"status": "restarting"})
    asyncio.get_running_loop().call_later(0.3, self._restart_bridge)
    return response
async def handle_kx_printer_add(self, request):
"""Add a printer: fetch its credentials via IP, write [printer_N], restart.

Expects a JSON body with "printer_ip" (required; anything after a ":" is
dropped) and "name" (optional). Responds with 400 for invalid JSON or a
missing IP and with 502 when fetching the credentials fails. On success
config.ini is rewritten, the response names the new section and its HTTP
port, and a restart is scheduled 0.5 s later.
"""
try:
body = await request.json()
except Exception:
return self._json_cors({"error": "invalid json"}, status=400)
ip = str(body.get("printer_ip", "")).strip().split(":")[0]
name = str(body.get("name", "")).strip()
if not ip:
return self._json_cors({"error": "printer_ip required"}, status=400)
try:
creds = await _kx_fetch_credentials(ip)
except Exception as e:
return self._json_cors({"error": f"Drucker nicht erreichbar oder Fehler: {e}"}, status=502)
import configparser
config_path = self._find_config_path()
cfg = configparser.ConfigParser()
if config_path.is_file():
cfg.read(config_path, encoding="utf-8")
# Find the existing [printer_N] sections and the http_ports they occupy.
# The scan stops at the first missing number, so n ends up as the next free index.
n = 1
existing_ports: set[int] = set()
while cfg.has_section(f"printer_{n}"):
p = cfg[f"printer_{n}"]
if p.get("http_port"):
try:
existing_ports.add(int(p["http_port"]))
except ValueError:
# A non-numeric http_port cannot collide with a generated port; ignore it.
pass
n += 1
# No [printer_N] yet, but a populated [connection]? → migrate it to printer_1
# (an empty [connection] means there is no existing printer → do not migrate, the new one becomes printer_1)
if n == 1 and cfg.has_section("connection") and (cfg["connection"].get("printer_ip") or "").strip():
c = cfg["connection"]
cfg.add_section("printer_1")
cfg.set("printer_1", "name", self._state.get("printer_name") or "Kobra X")
for k in ("printer_ip", "mqtt_port", "username", "password", "mode_id", "device_id"):
if c.get(k):
cfg.set("printer_1", k, c.get(k))
cfg.set("printer_1", "http_port", "7125")
existing_ports.add(7125)
n = 2
# Create the new printer as [printer_n] and pick a free port for it.
new_port = 7125 + (n - 1)
while new_port in existing_ports:
new_port += 1
sec = f"printer_{n}"
cfg.add_section(sec)
cfg.set(sec, "name", name or creds["model"])
cfg.set(sec, "printer_ip", creds["printer_ip"])
cfg.set(sec, "mqtt_port", "9883")
cfg.set(sec, "username", creds["username"])
cfg.set(sec, "password", creds["password"])
cfg.set(sec, "mode_id", creds["mode_id"])
cfg.set(sec, "device_id", creds["device_id"])
cfg.set(sec, "http_port", str(new_port))
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w", encoding="utf-8") as f:
f.write("# KX-Bridge Konfigurationsdatei\n\n")
cfg.write(f)
log.info(f"Drucker '{name or creds['model']}' als {sec} hinzugefügt (Port {new_port})")
# Build the response first; the restart is delayed so the response can still be sent.
response = self._json_cors({"status": "restarting", "section": sec, "http_port": new_port})
asyncio.get_event_loop().call_later(0.5, self._restart_bridge)
return response
async def handle_kx_printer_remove(self, request):
"""Remove a printer from config.ini, then restart.
- Multi mode: [printer_N] is deleted and the remaining sections are renumbered
(printer_3 → printer_2); every kept section gets http_port 7125 + (index - 1),
so printer_1 always ends up on 7125.
- Single mode (no [printer_N], only [connection]): pid "1" blanks the [connection] block
→ the bridge starts in offline mode on 7125, the UI stays reachable.
- If the last [printer_N] is removed: all gone → the same "empty" state.

Responds with 400 when the printer id is missing and with 404 when no such
printer exists.
"""
pid = str(request.match_info.get("pid", "")).strip()
if not pid:
return self._json_cors({"error": "printer id required"}, status=400)
import configparser
config_path = self._find_config_path()
cfg = configparser.ConfigParser()
if config_path.is_file():
cfg.read(config_path, encoding="utf-8")
# The presence of [printer_1] decides between multi mode and single mode.
has_printer_sections = cfg.has_section("printer_1")
target = f"printer_{pid}"
if has_printer_sections:
if not cfg.has_section(target):
return self._json_cors({"error": f"{target} nicht gefunden"}, status=404)
# Collect all [printer_N] sections (except the one to delete) and renumber them.
# The scan walks printer_1, printer_2, … and stops at the first missing number.
kept = []
n = 1
while cfg.has_section(f"printer_{n}"):
if str(n) != pid:
kept.append(dict(cfg[f"printer_{n}"]))
cfg.remove_section(f"printer_{n}")
n += 1
for i, sec_data in enumerate(kept, start=1):
sec = f"printer_{i}"
cfg.add_section(sec)
for k, v in sec_data.items():
cfg.set(sec, k, v)
# The port always follows the new position, overriding the stored value.
cfg.set(sec, "http_port", str(7125 + i - 1))
remaining = len(kept)
# Was that the last printer? Then blank [connection] too → really "no printer".
if remaining == 0 and cfg.has_section("connection"):
for k in ("printer_ip", "username", "password", "device_id"):
cfg.set("connection", k, "")
else:
# Single mode: only pid "1" is valid (pseudo entry from handle_kx_printers).
if pid != "1":
return self._json_cors({"error": "kein Drucker mit dieser ID"}, status=404)
# Blank the [connection] values → the bridge starts without a printer.
if cfg.has_section("connection"):
for k in ("printer_ip", "username", "password", "device_id"):
cfg.set("connection", k, "")
remaining = 0
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w", encoding="utf-8") as f:
f.write("# KX-Bridge Konfigurationsdatei\n\n")
cfg.write(f)
log.info(f"Drucker {target} entfernt ({remaining} verbleibend)")
# Build the response first; the restart is delayed so the response can still be sent.
response = self._json_cors({"status": "restarting", "removed": target, "remaining": remaining})
asyncio.get_event_loop().call_later(0.5, self._restart_bridge)
return response
def _restart_bridge(self):
"""Restart the bridge process so that a changed config.ini or an updated script takes effect.

Inside a container the process just exits and the restart is left to the
supervisor. On other non-Windows platforms the process image is replaced
via os.execv. On Windows a detached child process is spawned instead.
"""
log.info("Bridge wird neu gestartet …")
# config_loader caches config.ini values in os.environ ("only if not set").
# On a restart the environment has to be cleaned up, otherwise the new process
# reads the old values instead of the changed config.ini.
for _k in ("PRINTER_IP", "MQTT_PORT", "MQTT_USERNAME", "MQTT_PASSWORD",
"MODE_ID", "DEVICE_ID", "DEFAULT_AMS_SLOT", "AUTO_LEVELING",
"BRIDGE_PRINTER_NAME"):
os.environ.pop(_k, None)
in_docker = os.path.exists("/.dockerenv") or os.environ.get("KX_IN_DOCKER")
if in_docker:
# Docker/systemd: exiting the process is enough, the supervisor restarts it (fresh environ).
log.info("Container-Umgebung erkannt beende Prozess für Supervisor-Restart")
os._exit(0)
frozen = getattr(sys, "frozen", False)
# Linux: os.execv replaces the process image directly, which is clean even for a PyInstaller onefile build
# (subprocess+exit would fail there because of the deleted _MEIxxxx temp directory).
if sys.platform != "win32":
exe = sys.executable
try:
# A frozen binary already is the program, so only the arguments after argv[0] are passed on;
# a script run needs the script path (argv[0]) as the first interpreter argument.
if frozen:
os.execv(exe, [exe] + sys.argv[1:])
else:
os.execv(exe, [exe] + sys.argv)
except Exception as e:
log.error(f"Restart (execv) fehlgeschlagen: {e} bitte Bridge manuell neu starten")
os._exit(1)
# Windows: os.execv is broken there (new PID, the old process returns) → use subprocess.
cmd = ([sys.executable] + sys.argv[1:]) if frozen else ([sys.executable] + sys.argv)
try:
subprocess.Popen(cmd, cwd=os.getcwd(),
creationflags=(subprocess.DETACHED_PROCESS
| subprocess.CREATE_NEW_PROCESS_GROUP))
except Exception as e:
log.error(f"Restart fehlgeschlagen: {e} bitte Bridge manuell neu starten")
os._exit(0)
# ─── Update ──────────────────────────────────────────────────────────────
# Release list queried by handle_api_update_check for a stable version (limited to one entry).
STABLE_RELEASE_API = "https://gitea.it-drui.de/api/v1/repos/viewit/KX-Bridge-Release/releases?limit=1"
# Release list queried when the running version contains "-dev+" (up to 10 entries, pre-releases requested).
DEV_RELEASE_API = "https://gitea.it-drui.de/api/v1/repos/viewit/KX-Bridge-Release/releases?limit=10&pre-release=true"
# Base for raw file downloads; handle_api_update_check appends "/<tag>/kobrax_moonraker_bridge.py".
GITEA_RAW_BASE = "https://gitea.it-drui.de/viewit/KX-Bridge-Release/raw/tag"
def _read_version(self) -> str:
    """Return the stripped content of the VERSION file, or "unknown".

    The file is looked up next to the program (``_BASE``) first and in the
    parent directory second; the first existing file wins.
    """
    install_dir = pathlib.Path(_BASE)
    for version_file in (install_dir / "VERSION", install_dir.parent / "VERSION"):
        if not version_file.is_file():
            continue
        return version_file.read_text(encoding="utf-8").strip()
    return "unknown"
def _write_version(self, version: str):
    """Store *version* (plus a trailing newline) in the VERSION file.

    An existing VERSION file next to the program (``_BASE``) or in its parent
    directory is overwritten, checked in that order. If neither exists, a new
    file is created next to the program.
    """
    install_dir = pathlib.Path(_BASE)
    candidates = (install_dir / "VERSION", install_dir.parent / "VERSION")
    existing = next((path for path in candidates if path.is_file()), None)
    target = existing if existing is not None else install_dir / "VERSION"
    target.write_text(version + "\n", encoding="utf-8")
@staticmethod
def _parse_version(v: str) -> "tuple[int, ...]":
"""'v0.9.1-beta1' → (0, 9, 1) nur numerische Teile vor dem ersten '-'"""
v = v.lstrip("v").split("-")[0]
parts = re.split(r"[.\s]+", v)
result = []
for p in parts:
try:
result.append(int(p))
except ValueError:
break
return tuple(result) or (0,)
async def handle_api_log_stream(self, request):
    """SSE endpoint: stream log entries live to the browser.

    The entries currently held in ``_log_buffer`` are sent first. Afterwards
    a queue is registered in ``_log_sse_queues`` and every entry arriving on
    it is forwarded as a ``data:`` event. When no entry arrives for 25 s an
    SSE comment line is sent as keepalive and the stream continues.

    The stream ends when writing to the client fails; the queue is always
    unregistered again, also when the handler is cancelled.
    """
    resp = web.StreamResponse(headers={
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    })
    await resp.prepare(request)
    # Replay the ring buffer first (copied, it may change while we write).
    for entry in list(_log_buffer):
        data = json.dumps(entry, ensure_ascii=False)
        await resp.write(f"data: {data}\n\n".encode())
    # Then stream live entries.
    q: asyncio.Queue = asyncio.Queue()
    _log_sse_queues.append(q)
    try:
        while True:
            # The timeout is handled inside the loop: a keepalive must keep
            # the stream open, not terminate it after the first idle period.
            try:
                entry = await asyncio.wait_for(q.get(), timeout=25)
            except asyncio.TimeoutError:
                await resp.write(b": keepalive\n\n")
                continue
            data = json.dumps(entry, ensure_ascii=False)
            await resp.write(f"data: {data}\n\n".encode())
    except Exception:
        # Best effort: a failed write (typically a disconnected client) just
        # ends the stream. This also covers the keepalive write.
        pass
    finally:
        if q in _log_sse_queues:
            _log_sse_queues.remove(q)
    return resp
async def handle_api_log_download(self, request):
    """Return all buffered log entries as a plain-text download (kx-bridge.log)."""
    rendered = []
    for entry in _log_buffer:
        # One line per entry: "[timestamp] LEVEL logger: message".
        rendered.append(
            f"[{entry['ts']}] {entry['lvl']:<5} {entry['name']}: {entry['msg']}"
        )
    payload = "\n".join(rendered).encode("utf-8")
    download_headers = {"Content-Disposition": 'attachment; filename="kx-bridge.log"'}
    return web.Response(
        body=payload,
        content_type="text/plain",
        headers=download_headers,
    )
async def handle_api_update_check(self, request):
    """Ask the Gitea release API whether a different or newer version exists.

    A running version containing "-dev+" is compared against the first listed
    release whose tag contains "-dev+" (any different tag counts as an
    update). Otherwise the first listed release is used and its numeric
    version must be greater than the running one.

    Responds with the current and latest version, the tag, the download URL
    of the bridge script and the release text. Errors are reported as 502
    (HTTP or other failure) or 404 (no matching release).
    """
    current = self._read_version()
    is_dev = "-dev+" in current
    api_url = self.STABLE_RELEASE_API if not is_dev else self.DEV_RELEASE_API
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status != 200:
                    return web.json_response({"error": f"Gitea HTTP {resp.status}"}, status=502)
                # content_type=None: accept the body whatever Content-Type is sent.
                releases = await resp.json(content_type=None)
        if not releases:
            return web.json_response({"error": "Keine Releases gefunden"}, status=404)

        if not is_dev:
            release = releases[0]
        else:
            # Dev: pick the first listed release with "-dev+" in its tag.
            dev_releases = [r for r in releases if "-dev+" in r.get("tag_name", "")]
            if not dev_releases:
                return web.json_response({"error": "Keine Dev-Releases gefunden"}, status=404)
            release = dev_releases[0]

        tag = release.get("tag_name", "")
        latest = tag.lstrip("v")
        if is_dev:
            update_available = tag != f"v{current}"
        else:
            update_available = self._parse_version(tag) > self._parse_version(current)
        payload = {
            "current": current,
            "latest": latest,
            "update_available": update_available,
            "tag": tag,
            "download_url": f"{self.GITEA_RAW_BASE}/{tag}/kobrax_moonraker_bridge.py",
            "changelog": release.get("body", ""),
        }
        return web.json_response(payload)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=502)
async def handle_api_update_apply(self, request):
    """Download a new bridge script, replace the running one and restart.

    Expects a JSON object with ``download_url`` (required) and ``tag``
    (optional; written to the VERSION file without leading "v").

    Security: the downloaded file replaces the code of this process, so the
    URL is only accepted when it lies below ``GITEA_RAW_BASE`` - the location
    ``handle_api_update_check`` builds its ``download_url`` from. Without
    this check every client able to reach the API could make the bridge
    download and run arbitrary code.

    Returns:
        ``{"status": "updating"}`` on success (restart scheduled 0.3 s
        later), HTTP 400 for a malformed request or a URL outside the
        release location, HTTP 502 when the download or the file
        replacement fails.
    """
    from urllib.parse import unquote, urlsplit

    def _bad_request(message: str):
        return web.json_response({"error": message}, status=400)

    try:
        data = await request.json()
    except Exception:
        return _bad_request("invalid json")
    if not isinstance(data, dict):
        return _bad_request("invalid json")

    download_url = str(data.get("download_url") or "")
    new_tag = str(data.get("tag") or "")
    if not download_url:
        return _bad_request("download_url fehlt")
    # Only the project's own raw-file location is a trusted update source.
    if not download_url.startswith(self.GITEA_RAW_BASE + "/"):
        return _bad_request("download_url nicht erlaubt")
    # A ".." segment (plain or percent-encoded) could climb out of the
    # trusted prefix once the URL is normalised, so refuse it outright.
    if ".." in unquote(urlsplit(download_url).path).split("/"):
        return _bad_request("download_url nicht erlaubt")

    # NOTE(review): in a frozen build this is the executable itself, which
    # would be overwritten with the downloaded script - confirm that
    # self-update is intended for frozen builds.
    script_path = pathlib.Path(sys.executable if getattr(sys, "frozen", False) else __file__).resolve()
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(download_url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                if resp.status != 200:
                    return web.json_response({"error": f"Download HTTP {resp.status}"}, status=502)
                content = await resp.read()
        # Atomic replace: write next to the target, then rename over it.
        tmp = script_path.with_suffix(".py.new")
        tmp.write_bytes(content)
        os.replace(tmp, script_path)
        if new_tag:
            self._write_version(new_tag.lstrip("v"))
        log.info(f"Update auf {new_tag} installiert, starte neu …")
    except Exception as e:
        return web.json_response({"error": str(e)}, status=502)
    # Send the response first, restart afterwards.
    response = web.json_response({"status": "updating"})
    asyncio.get_event_loop().call_later(0.3, self._restart_bridge)
    return response
async def handle_catchall(self, request):
    """Answer any request without a dedicated route with an empty result.

    The request is logged (method, path with query and the first 200 body
    bytes) so unknown API calls become visible instead of ending as 404.
    """
    payload = await request.read()
    preview = payload[:200]
    log.warning(f"UNBEKANNT {request.method} {request.path_qs} body={preview}")
    empty_result = {"result": {}}
    return web.json_response(empty_result, status=200)
async def handle_favicon(self, request):
    """Serve a tiny built-in icon so the browser does not log a 404."""
    # Minimal 1x1 ICO, 70 hard-coded bytes, split into chunks for readability.
    leading = (0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 24, 0, 40, 0, 0, 0, 22, 0, 0, 0)
    middle = (40, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 24, 0, 0, 0, 0, 0, 4, 0, 0, 0)
    trailing = (0,) * 16 + (255, 102, 0, 0, 0, 0, 0, 0)
    icon = bytes(leading + middle + trailing)
    return web.Response(body=icon, content_type="image/x-icon")
# -------------------------------------------------------------------------
# WebSocket handler
# -------------------------------------------------------------------------
async def handle_websocket(self, request):
    """Serve one WebSocket client: register it, greet it, dispatch its RPC calls.

    After the handshake the client is added to ``self.ws_clients`` and
    receives ``notify_klippy_ready`` followed by an initial
    ``notify_status_update``. Every text frame is passed to
    ``_handle_ws_rpc``; an ERROR or CLOSE frame ends the session.

    The client is removed from ``self.ws_clients`` in a ``finally`` block so
    that a failing send or RPC handler cannot leave a dead socket registered.
    """
    ws = web.WebSocketResponse(heartbeat=30)
    await ws.prepare(request)
    # NOTE(review): stores the loop on a private attribute of the response -
    # presumably read by code outside this method; confirm before removing.
    ws._loop = asyncio.get_event_loop()
    self.ws_clients.add(ws)
    log.info(f"WS client verbunden ({len(self.ws_clients)} gesamt)")
    try:
        # Send klippy_ready notification
        await ws.send_str(json.dumps({
            "jsonrpc": "2.0",
            "method": "notify_klippy_ready",
            "params": [],
        }))
        # Send initial status
        await ws.send_str(json.dumps({
            "jsonrpc": "2.0",
            "method": "notify_status_update",
            "params": [self._build_printer_objects(), time.time()],
        }))
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                await self._handle_ws_rpc(ws, msg.data)
            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE):
                break
    finally:
        # Runs on normal close, on errors and on cancellation alike.
        self.ws_clients.discard(ws)
        log.info(f"WS client getrennt ({len(self.ws_clients)} verbleibend)")
    return ws
async def _handle_ws_rpc(self, ws: web.WebSocketResponse, raw: str):
    """Handle one JSON-RPC request received over the WebSocket.

    Args:
        ws: The socket the request arrived on; the response is sent on it.
        raw: The text frame, expected to contain one JSON-RPC request object.

    Frames that are not valid JSON, or whose JSON value is not an object,
    are ignored. ``params`` may be an object or a list; of a list only the
    first element is used. Unknown methods get an empty result. An exception
    raised while handling a method is reported as JSON-RPC error -32603.
    A response is only sent when the request carries an ``id``.
    """
    try:
        req = json.loads(raw)
    except Exception:
        return
    # Valid JSON that is not an object (e.g. a list or a number) has no
    # "method"/"id" to look up; treat it like an unparsable frame instead of
    # letting an AttributeError tear down the connection.
    if not isinstance(req, dict):
        return
    rpc_id = req.get("id")
    method = req.get("method", "")
    log.info(f"WS RPC: {method} params={str(req.get('params',''))[:120]}")
    params = req.get("params") or {}
    if isinstance(params, list):
        params = params[0] if params else {}
    result = None
    error = None
    try:
        if method in ("printer.info", "printer_info"):
            result = {
                "state": "ready",
                "state_message": "Printer is ready",
                "hostname": "kobrax-bridge",
                "software_version": KLIPPER_VERSION,
                "cpu_info": self._state["printer_name"],
                "klipper_path": "/home/pi/klipper",
                "python_path": "/home/pi/klippy-env/bin/python",
            }
        elif method in ("server.info", "server_info"):
            result = {
                "klippy_connected": True,
                "klippy_state": "ready",
                "moonraker_version": MOONRAKER_VERSION,
                "components": [],
                "failed_components": [],
                "registered_directories": ["gcodes"],
                "warnings": [],
            }
        elif method in ("printer.objects.list",):
            result = {"objects": list(self._build_printer_objects().keys())}
        elif method in ("printer.objects.query", "printer.objects.get",
                        "printer.objects.subscribe"):
            # Query and subscribe answer identically here: the requested
            # objects (or all of them when none are named) with a timestamp.
            objects = params.get("objects", {})
            all_objs = self._build_printer_objects()
            if objects:
                filtered = {k: all_objs.get(k, {}) for k in objects}
            else:
                filtered = all_objs
            result = {"status": filtered, "eventtime": time.time()}
        elif method == "printer.print.start":
            filename = params.get("filename", self._last_uploaded_file)
            loop = asyncio.get_event_loop()
            # The client call blocks, so it runs in the default executor.
            resp = await loop.run_in_executor(
                None, lambda: self.client.publish("print", "start",
                                                  {"filename": filename, "use_ams": False}, timeout=15.0)
            )
            result = "ok" if resp else "timeout"
        elif method == "printer.print.pause":
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self.client.pause_print)
            result = "ok"
        elif method == "printer.print.resume":
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self.client.resume_print)
            result = "ok"
        elif method == "printer.print.cancel":
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self.client.stop_print)
            result = "ok"
        elif method == "machine.system_info":
            result = {"system_info": {"cpu_info": {"cpu_desc": "Kobra X Bridge"}}}
        elif method == "server.files.list":
            result = []
        else:
            log.debug(f"Unbekannte RPC-Methode: {method}")
            result = {}
    except Exception as e:
        log.error(f"RPC-Fehler für {method}: {e}")
        error = {"code": -32603, "message": str(e)}
    # Requests without an id are notifications and get no response.
    if rpc_id is not None:
        response = {"jsonrpc": "2.0", "id": rpc_id}
        if error:
            response["error"] = error
        else:
            response["result"] = result
        await ws.send_str(json.dumps(response))
# -------------------------------------------------------------------------
# Poll loop (sync, runs in executor)
# -------------------------------------------------------------------------
def _printer_reachable(self) -> bool:
    """Probe the printer's MQTT port via TCP - no ICMP needed, no root required.

    Returns True when a connection to ``printer_ip:mqtt_port`` can be opened
    within two seconds, False when the attempt fails with an OSError.
    """
    import socket as _socket

    try:
        # The connection is only opened and closed again; nothing is sent.
        with _socket.create_connection(
            (self._args.printer_ip, self._args.mqtt_port), timeout=2.0
        ):
            pass
    except OSError:
        return False
    return True
def _poll_loop(self, stop_event: threading.Event):
"""Blocking poll loop for one printer; runs until stop_event is set.

While offline, the printer is probed via TCP every 10 s and the MQTT
connection is re-established once it answers. While online, the printer
info is queried every 3 s; a poll error together with a failed TCP probe
switches back to offline mode.
"""
# Start offline if the initial connection attempt already failed.
_offline = self._state["kobra_state"] == "offline"
_probe_interval = 10.0 # seconds between TCP probes in offline mode
while not stop_event.is_set():
# ── Offline mode: wait until the printer is reachable again ──────────
if _offline:
if self._printer_reachable():
log.info("Drucker erreichbar stelle MQTT-Verbindung her …")
try:
self.client.connect()
_offline = False
self._state["print_state"] = "standby"
self._state["kobra_state"] = "free"
self._state["connection_error"] = ""
log.info("MQTT-Verbindung wiederhergestellt")
except Exception as e:
# Keep the reason visible in the state and retry after the probe interval.
err = _mqtt_error_msg(e)
self._state["connection_error"] = err
log.warning(f"Verbindungsaufbau fehlgeschlagen: {err}")
stop_event.wait(_probe_interval)
continue
else:
stop_event.wait(_probe_interval)
continue
# ── Online mode: regular poll ──────────────────────────────────
try:
info = self.client.query_info()
if info:
self._on_info(info)
# During a print: query print/report directly
if self._state["print_state"] in ("printing", "preheating",
"auto_leveling", "checking", "init"):
print_r = self.client.publish("print", "query", timeout=3.0)
if print_r:
self._on_print(print_r)
box = self.client.query_multicolor_box()
if box:
# Only the slots of the first reported box are used.
boxes = (box.get("data") or {}).get("multi_color_box") or []
slots = boxes[0].get("slots") or [] if boxes else []
if slots:
self._ams_slots = slots
except Exception as e:
log.warning(f"Poll-Fehler: {e}")
# Check whether the printer is really gone
if not self._printer_reachable():
log.info("Drucker nicht erreichbar wechsle in Offline-Modus")
self._state["print_state"] = "error"
self._state["kobra_state"] = "offline"
self._state["connection_error"] = f"Printer unreachable ({self._args.printer_ip})"
try:
self.client.disconnect()
except Exception:
# Best effort: the connection is considered lost anyway.
pass
_offline = True
stop_event.wait(3.0)
# ---------------------------------------------------------------------------
# App factory + main
# ---------------------------------------------------------------------------
def _mqtt_error_msg(exc: Exception) -> str:
msg = str(exc)
if "20020005" in msg:
return "Wrong MQTT credentials (username, password or device ID incorrect)"
return msg
@web.middleware
async def cors_middleware(request, handler):
    """Allow cross-origin access to every endpoint.

    OPTIONS requests are answered directly with 204 and the permitted
    methods and headers. Every other request is passed to its handler and
    the response gets ``Access-Control-Allow-Origin: *`` added.
    """
    if request.method != "OPTIONS":
        response = await handler(request)
        response.headers["Access-Control-Allow-Origin"] = "*"
        return response
    preflight_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
    }
    return web.Response(status=204, headers=preflight_headers)
def build_app(bridge: KobraXBridge) -> web.Application:
    """Create the aiohttp application for one bridge and register all routes.

    Request bodies up to 256 MiB are accepted and every request passes
    through ``cors_middleware``. Routes are matched in registration order,
    so the catch-all at the end only sees requests no other route took.
    """
    app = web.Application(
        client_max_size=256 * 1024 * 1024,
        middlewares=[cors_middleware],
    )
    r = app.router
    # Moonraker API
    r.add_get("/server/info", bridge.handle_server_info)
    r.add_get("/printer/info", bridge.handle_printer_info)
    r.add_get("/machine/system_info", bridge.handle_machine_system_info)
    r.add_get("/printer/objects/list", bridge.handle_objects_list)
    r.add_get("/printer/objects/query", bridge.handle_objects_query)
    r.add_get("/printer/objects/subscribe", bridge.handle_objects_subscribe)
    r.add_post("/printer/objects/subscribe", bridge.handle_objects_subscribe)
    r.add_get("/server/files/list", bridge.handle_files_list)
    r.add_post("/server/files/upload", bridge.handle_file_upload)
    r.add_post("/printer/print/start", bridge.handle_print_start)
    r.add_post("/printer/print/pause", bridge.handle_print_pause)
    r.add_post("/printer/print/resume", bridge.handle_print_resume)
    r.add_post("/printer/print/cancel", bridge.handle_print_cancel)
    # OctoPrint compatibility (OrcaSlicer probes this + uploads here)
    r.add_get("/api/version", bridge.handle_octoprint_version)
    r.add_post("/api/files/local", bridge.handle_file_upload)
    r.add_post("/api/files/{path:.*}", bridge.handle_file_upload)
    # Moonraker database (OrcaSlicer AMS-Sync)
    r.add_get("/server/database/item", bridge.handle_moonraker_database)
    r.add_get("/server/database/list", bridge.handle_database_list)
    # New API endpoints
    r.add_post("/api/light", bridge.handle_api_light)
    r.add_post("/api/fan", bridge.handle_api_fan)
    r.add_post("/api/connect", bridge.handle_api_connect)
    r.add_post("/api/disconnect", bridge.handle_api_disconnect)
    r.add_post("/api/speed", bridge.handle_api_speed)
    r.add_post("/api/ams/feed", bridge.handle_api_ams_feed)
    r.add_post("/api/ams/set_slot", bridge.handle_api_ams_set_slot)
    r.add_post("/api/axis", bridge.handle_api_axis)
    r.add_post("/api/temperature", bridge.handle_api_temperature)
    r.add_get("/api/camera", bridge.handle_api_camera)
    r.add_get("/api/camera/stream", bridge.handle_camera_stream)
    r.add_get("/api/camera/snapshot", bridge.handle_api_camera_snapshot)
    r.add_post("/api/camera/start", bridge.handle_api_camera_start)
    r.add_post("/api/camera/stop", bridge.handle_api_camera_stop)
    r.add_get("/api/state", bridge.handle_api_state)
    r.add_get("/api/settings", bridge.handle_api_settings_get)
    r.add_post("/api/settings", bridge.handle_api_settings_post)
    r.add_get("/api/update/check", bridge.handle_api_update_check)
    r.add_post("/api/update/apply", bridge.handle_api_update_apply)
    r.add_post("/api/file_ready/clear", bridge.handle_api_file_ready_clear)
    r.add_get("/api/log/stream", bridge.handle_api_log_stream)
    r.add_get("/api/log/download", bridge.handle_api_log_download)
    r.add_get("/serve/{filename}", bridge.handle_serve_file)
    # /kx/ GCode Store + History + Filament
    r.add_get("/kx/printers", bridge.handle_kx_printers)
    r.add_post("/kx/printers/add", bridge.handle_kx_printer_add)
    r.add_delete("/kx/printers/{pid}", bridge.handle_kx_printer_remove)
    r.add_post("/kx/print", bridge.handle_kx_print)
    r.add_get("/kx/files", bridge.handle_kx_files)
    r.add_delete("/kx/files/{file_id}", bridge.handle_kx_file_delete)
    r.add_get("/kx/filament/slots", bridge.handle_kx_filament_slots)
    r.add_get("/kx/history", bridge.handle_kx_history)
    r.add_get("/kx/ui/{name}", bridge.handle_kx_ui_asset)
    r.add_get("/kx/files/{id}/objects", bridge.handle_kx_file_objects)
    r.add_post("/kx/skip", bridge.handle_kx_skip)
    r.add_post("/kx/skip/query", bridge.handle_kx_skip_query)
    r.add_get("/kx/skip/state", bridge.handle_kx_skip_state)
    r.add_route("OPTIONS", "/kx/{path:.*}", bridge.handle_kx_options)
    # Root + printer routes (single page, the JS reads the pathname)
    r.add_get("/", bridge.handle_index)
    # Raw string: "\d" is not a valid escape sequence in a normal string
    # literal and triggers a warning on current Python versions.
    r.add_get(r"/printer{num:\d+}", bridge.handle_index)
    r.add_get("/favicon.ico", bridge.handle_favicon)
    # WebSocket
    r.add_get("/websocket", bridge.handle_websocket)
    # Catch-all: log all unknown requests instead of answering 404
    r.add_route("*", "/{path:.*}", bridge.handle_catchall)
    return app
def _build_per_printer_args(base_args, p: dict):
"""Kopiere CLI-Args, überschreibe mit Druckereintrag aus config.ini."""
import copy
a = copy.copy(base_args)
a.printer_ip = p.get("printer_ip") or base_args.printer_ip
a.mqtt_port = int(p.get("mqtt_port") or base_args.mqtt_port)
a.username = p.get("username") or base_args.username
a.password = p.get("password") or base_args.password
a.mode_id = p.get("mode_id") or base_args.mode_id
a.device_id = p.get("device_id") or base_args.device_id
a.port = int(p.get("http_port") or base_args.port)
return a
async def run_bridge(args):
"""Start one bridge per configured printer and keep running until cancelled.

Printers come from env_loader.list_printers(); if that yields nothing, a
single pseudo entry is built from the command-line arguments. For every
printer an MQTT client, a KobraXBridge, a polling thread and an HTTP server
on its own port are created. On shutdown the polling threads are signalled,
the HTTP servers are cleaned up and the MQTT clients are disconnected.
"""
printers = env_loader.list_printers()
multi_mode = bool(printers)
if not printers:
# Single mode: describe the one printer given on the command line.
printers = [{
"id": "1",
"name": getattr(args, "printer_name", None) or "Anycubic Kobra X",
"printer_ip": args.printer_ip,
"mqtt_port": args.mqtt_port,
"username": args.username,
"password": args.password,
"mode_id": args.mode_id,
"device_id": args.device_id,
"http_port": args.port,
}]
# One store, one bridge registry and one stop event are shared by all printers.
store = GCodeStore(args.data_dir)
all_bridges: dict = {}
runners = []
stop_event = threading.Event()
loop = asyncio.get_event_loop()
for idx, p in enumerate(printers):
pid = str(p.get("id") or (idx + 1))
per_args = _build_per_printer_args(args, p)
# Default port convention: 7125 + (id-1) if no http_port is set
if not p.get("http_port") and multi_mode:
try:
per_args.port = 7125 + (int(pid) - 1)
except ValueError:
# Non-numeric id: fall back to the position in the list.
per_args.port = 7125 + idx
client = KobraXClient(
host=per_args.printer_ip,
port=per_args.mqtt_port,
username=per_args.username,
password=per_args.password,
mode_id=per_args.mode_id,
device_id=per_args.device_id,
client_id=f"kobrax_bridge_{pid}",
)
bridge = KobraXBridge(
client, args=per_args, store=store,
printer_id=pid, all_bridges=all_bridges,
)
# Take over printer_name from config.ini if it is set
if p.get("name"):
bridge._state["printer_name"] = p["name"]
bridge._name_locked = True
all_bridges[pid] = bridge
log.info(f"[Drucker {pid}] Verbinde mit {per_args.printer_ip}:{per_args.mqtt_port}")
try:
# connect() is a blocking call, so it runs in the default executor.
await loop.run_in_executor(None, client.connect)
log.info(f"[Drucker {pid}] MQTT verbunden")
except Exception as e:
# A failed connection does not abort the start: the bridge begins in offline mode.
err = _mqtt_error_msg(e)
log.warning(f"[Drucker {pid}] Verbindung fehlgeschlagen: {err} Offline-Modus")
bridge._state["print_state"] = "error"
bridge._state["kobra_state"] = "offline"
bridge._state["connection_error"] = err
threading.Thread(
target=bridge._poll_loop, args=(stop_event,),
daemon=True, name=f"poll-{pid}",
).start()
app = build_app(bridge)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, args.host, per_args.port)
await site.start()
runners.append((runner, client, pid))
log.info(f"[Drucker {pid}] Bridge läuft auf http://{args.host}:{per_args.port}")
import socket as _socket
try:
# Connecting a UDP socket only selects the outgoing interface, whose address is then read back.
with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s:
_s.connect(("8.8.8.8", 80))
_local_ip = _s.getsockname()[0]
except Exception:
_local_ip = args.host
log.info(f"OrcaSlicer → Klipper → Host: {_local_ip} Ports: " +
", ".join(str(getattr(b._args, 'port', 0)) for b in all_bridges.values()))
log.info("Ctrl-C zum Beenden")
try:
# Nothing to do here; the servers and polling threads do the work.
while True:
await asyncio.sleep(3600)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
stop_event.set()
for runner, client, pid in runners:
# Best-effort shutdown: one failing printer must not block the others.
try:
await runner.cleanup()
except Exception:
pass
try:
client.disconnect()
except Exception:
pass
log.info("Bridge beendet")
def _default_data_dir() -> str:
"""Persistenz-Verzeichnis: Docker setzt KX_DATA_DIR, Binary nutzt <exe-dir>/data,
Dev-Script nutzt <repo>/data (oder /app/data falls vorhanden)."""
if os.environ.get("KX_DATA_DIR"):
return os.environ["KX_DATA_DIR"]
if getattr(sys, "frozen", False):
return os.path.join(os.path.dirname(sys.executable), "data")
if os.path.isdir("/app"):
return "/app/data"
return os.path.normpath(os.path.join(_BASE, "..", "data"))
def main():
    """Parse the command line and run the bridge until it is stopped.

    Most defaults come from ``env_loader``. A printer address given as
    "host:port" is reduced to its host part before the bridge starts.
    """
    cli = argparse.ArgumentParser(description="Moonraker-Bridge für Anycubic Kobra X")
    cli.add_argument(
        "--printer-ip",
        default=env_loader.PRINTER_IP,
        help="IP-Adresse des Druckers",
    )
    cli.add_argument("--mqtt-port", type=int, default=env_loader.MQTT_PORT)
    cli.add_argument("--username", default=env_loader.USERNAME)
    cli.add_argument("--password", default=env_loader.PASSWORD)
    cli.add_argument("--mode-id", default=env_loader.MODE_ID)
    cli.add_argument("--device-id", default=env_loader.DEVICE_ID)
    cli.add_argument("--default-ams-slot", default=env_loader.DEFAULT_AMS_SLOT)
    cli.add_argument("--auto-leveling", type=int, default=env_loader.AUTO_LEVELING)
    cli.add_argument(
        "--host",
        default="0.0.0.0",
        help="Bind-Adresse für den Bridge-Server",
    )
    cli.add_argument(
        "--port",
        type=int,
        default=7125,
        help="HTTP/WS-Port (Moonraker-Standard: 7125)",
    )
    cli.add_argument(
        "--data-dir",
        default=_default_data_dir(),
        help="Persistenz-Verzeichnis für GCode-Store und DB",
    )
    cli.add_argument(
        "--ui-theme",
        default=os.environ.get("KX_UI_THEME", "default"),
        metavar="NAME",
        help="Web-UI-Theme (Ordner web/themes/NAME/, Standard: default). "
             "Alternativ: Umgebungsvariable KX_UI_THEME.",
    )
    args = cli.parse_args()

    # Keep only the host part of a "host:port" address.
    address = args.printer_ip
    if address and ":" in address:
        args.printer_ip = address.split(":")[0]

    # Windows needs the ProactorEventLoop for asyncio.create_subprocess_exec.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    asyncio.run(run_bridge(args))
# Run the bridge only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
main()