import time
from typing import Any, Optional

import socketio
import socketio.exceptions

from . import storage


class DockgeConnection:
    """Thin wrapper around a ``socketio.Client`` for talking to a Dockge host.

    The host name and the credentials are read from the project's ``storage``
    module. After ``connect_and_login()`` succeeds, the stack-related methods
    send ``agent`` events to the server and return whatever it answers.
    """

    class LoginException(BaseException):
        """Raised by ``connect`` when the login times out or is rejected."""

        # NOTE(review): this derives from BaseException, so a plain
        # ``except Exception`` will not catch it. Left unchanged here to keep
        # the exception hierarchy callers may rely on; consider ``Exception``.
        pass

    _sio: socketio.Client
    _host: str
    _logged_in: bool
    _stacklist: Any

    # Retry policy for the "login" call issued by ``connect``.
    _LOGIN_ATTEMPTS = 5
    _LOGIN_TIMEOUT = 5

    # Seconds to sleep between checks while waiting for the stack list.
    _STACKLIST_POLL_INTERVAL = 0.5

    def __init__(self):
        """Create the Socket.IO client and register the event handlers.

        Raises:
            ValueError: If ``storage.get("host")`` returns ``None``.
        """
        self._logged_in = False
        self._host = storage.get("host")
        if self._host is None:
            raise ValueError("Host for Dockge is not defined!")
        self._sio = socketio.Client(logger=False, engineio_logger=False)
        self._stacklist = None
        self._init_events()

    def _init_events(self):
        """Register the Socket.IO event handlers on ``self._sio``."""

        # The names of the functions decorated with ``@self._sio.event`` are
        # the event names, so they must not be renamed.
        @self._sio.event
        def connect():
            # Log in as soon as the transport is up; ``self.connect`` is a
            # no-op when a login has already been recorded.
            self.connect()
            print("Connected!")

        @self._sio.event
        def disconnect():
            # A new connection needs a new login.
            self._logged_in = False
            print("Disconnected!")

        @self._sio.event
        def connect_error(data):
            print("The connection failed!")
            print(data)

        @self._sio.on('info')
        def info(data):
            # Only report the version when the payload has all expected keys.
            expected = ("version", "latestVersion", "isContainer", "primaryHostname")
            if all(k in data for k in expected):
                print(f"Dockge Version: {data['version']}")

        @self._sio.on('agent')
        def agent(*args):
            # Cache the payload of "stackList" messages for ``list_stacks``.
            # The length check avoids an IndexError on short/empty payloads.
            if len(args) >= 2 and args[0] == "stackList":
                self._stacklist = args[1]

    # Callbacks
    def _login_process(self, data):
        """Evaluate the reply of the ``login`` call.

        Args:
            data: The value returned by the server for the ``login`` event.

        Returns:
            ``True`` only if ``data`` is a dict containing both ``"ok"`` and
            ``"token"`` and ``data["ok"]`` is truthy; ``False`` otherwise.
        """
        if not isinstance(data, dict) or "ok" not in data or "token" not in data:
            print("Issue with login procedure")
            return False

        # The outcome is the value of "ok", not merely the presence of the key.
        success = bool(data["ok"])
        print(f"Login was {'successful' if success else 'unsuccessful'}!")
        return success

    # Functions
    def connect_and_login(self):
        """Open the websocket connection to the host and log in.

        Raises:
            DockgeConnection.LoginException: If the login fails (see
                ``connect``).
        """
        self._sio.connect(f"https://{self._host}/socket.io/", transports=['websocket'])
        self.connect()

    def connect(self):
        """Log in with the stored credentials unless already logged in.

        The ``login`` call is attempted up to five times, each with a
        five-second timeout; only timeouts trigger a retry.

        Raises:
            DockgeConnection.LoginException: If every attempt timed out or
                the server's reply does not indicate a successful login.
        """
        if self._logged_in:
            return

        data = None
        for _ in range(self._LOGIN_ATTEMPTS):
            try:
                data = self._sio.call(
                    "login",
                    {
                        "username": storage.get("username", encoded=True),
                        "password": storage.get("password", encoded=True),
                        "token": "",
                    },
                    timeout=self._LOGIN_TIMEOUT,
                )
            except socketio.exceptions.TimeoutError:
                continue
            break
        else:
            # The loop ran out of attempts without a single reply.
            raise self.LoginException(
                f"Login timed out after {self._LOGIN_ATTEMPTS} attempts"
            )

        if not self._login_process(data):
            raise self.LoginException("Login was not accepted by the server")
        self._logged_in = True

    def list_stacks(self, timeout: Optional[float] = None):
        """Request the stack list and wait until one has been received.

        The request is sent as an ``agent`` event; the answer arrives through
        the ``agent`` handler, which stores it in ``self._stacklist``. The
        cached value is cleared before returning.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` (the default)
                waits indefinitely.

        Returns:
            The payload stored by the ``agent`` handler.

        Raises:
            TimeoutError: If ``timeout`` is given and no stack list arrived
                in time.
        """
        self._sio.emit("agent", ("", "requestStackList"))

        deadline = None if timeout is None else time.monotonic() + timeout
        while self._stacklist is None:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError("Timed out waiting for the stack list")
            time.sleep(self._STACKLIST_POLL_INTERVAL)

        stacks = self._stacklist
        self._stacklist = None
        return stacks

    def _agent_call(self, command, name, timeout):
        """Send ``command`` for stack ``name`` as an ``agent`` call and return the reply."""
        # The leading "" is passed through unchanged from the original code;
        # presumably it selects the agent endpoint -- TODO confirm.
        return self._sio.call("agent", ("", command, name), timeout=timeout)

    def list_stack(self, name: str):
        """Return the server's reply to ``serviceStatusList`` for stack ``name``."""
        return self._agent_call("serviceStatusList", name, 5)

    def restart(self, name):
        """Send ``restartStack`` for stack ``name`` and return the reply."""
        return self._agent_call("restartStack", name, 10)

    def update(self, name):
        """Send ``updateStack`` for stack ``name`` and return the reply."""
        return self._agent_call("updateStack", name, 10)

    def stop(self, name):
        """Send ``stopStack`` for stack ``name`` and return the reply."""
        return self._agent_call("stopStack", name, 10)

    def start(self, name):
        """Send ``startStack`` for stack ``name`` and return the reply."""
        return self._agent_call("startStack", name, 10)

    def down(self, name):
        """Send ``downStack`` for stack ``name`` and return the reply."""
        return self._agent_call("downStack", name, 10)

    def disconnect(self):
        """Emit ``logout`` and close the Socket.IO connection."""
        self._sio.emit("logout")
        self._sio.disconnect()