Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/console.py: 0%
45 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import gevent
4import time
6import pexpect
8from daiquiri.core.components import Component
11import logging
13logger = logging.getLogger(__name__)
16class SocketFileObject:
17 """Mock file object that writes to socket.io"""
19 def __init__(self, emit):
20 self.emit = emit
22 def open(self, *args, **kwargs):
23 pass
25 def flush(self, *args, **kwargs):
26 pass
28 def write(self, line):
29 self.emit("output", {"output": line})
32class Console(Component):
33 """Run a shell command and interact with it
35 Background:
36 https://stackoverflow.com/questions/48359324/controlling-less-with-popen
37 https://github.com/cs01/pyxterm.js
38 https://pexpect.readthedocs.io/en/stable/index.html
39 """
41 _namespace = "pty"
43 def setup(self):
44 self.on("input")(self.input)
45 self.on("resize")(self.set_winsize)
46 self.start_console()
48 def check_user(self):
49 self._metadata.get_user()
51 def has_control(self):
52 sessionid = self._session._session_from_sio("pty")
53 return self._session._storage.has_control(sessionid)
55 def set_winsize(self, payload):
56 self._console.setwinsize(payload["rows"], payload["cols"])
58 def start_console(self):
59 self._console = pexpect.spawn(
60 "/bin/bash", ["-c", self._config["command"]], encoding="utf-8"
61 )
62 self._console.logfile_read = SocketFileObject(self.emit)
63 gevent.spawn(self.read_console)
65 def input(self, payload):
66 if self.has_control():
67 self._console.send(payload["input"])
69 else:
70 sessionid = self._session._session_from_sio("pty")
71 logger.warning(
72 f"Session {sessionid} sending console input but does not have control"
73 )
75 return True
77 def read_console(self):
78 while True:
79 try:
80 self._console.read_nonblocking(size=1024, timeout=0.1)
81 except pexpect.exceptions.TIMEOUT:
82 pass
84 time.sleep(0.02)