Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/console.py: 0%

45 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import gevent 

4import time 

5 

6import pexpect 

7 

8from daiquiri.core.components import Component 

9 

10 

11import logging 

12 

13logger = logging.getLogger(__name__) 

14 

15 

class SocketFileObject:
    """Mock file object whose writes are relayed through socket.io.

    Only ``write`` does any work; ``open`` and ``flush`` accept any
    arguments and do nothing, so the object can stand in where a
    file-like interface is expected.
    """

    def __init__(self, emit):
        # ``emit`` is invoked as ``emit(event_name, payload)`` on each write.
        self.emit = emit

    def write(self, line):
        """Send ``line`` out as the payload of an ``"output"`` event."""
        message = {"output": line}
        self.emit("output", message)

    def flush(self, *args, **kwargs):
        """Do nothing: ``write`` emits straight away, nothing is buffered."""
        return None

    def open(self, *args, **kwargs):
        """Do nothing: there is no underlying resource to open."""
        return None

30 

31 

class Console(Component):
    """Run a shell command and interact with it.

    The configured command is started with ``pexpect.spawn``. Whatever is
    read from it is emitted to clients as ``"output"`` events, while
    ``"input"`` and ``"resize"`` events are forwarded to the process.

    Background:
        https://stackoverflow.com/questions/48359324/controlling-less-with-popen
        https://github.com/cs01/pyxterm.js
        https://pexpect.readthedocs.io/en/stable/index.html
    """

    _namespace = "pty"

    def setup(self):
        """Register the socket event handlers and start the console."""
        self.on("input")(self.input)
        self.on("resize")(self.set_winsize)
        self.start_console()

    def check_user(self):
        """Query the current user from the metadata handler.

        The returned value is discarded here.
        NOTE(review): presumably called so that ``get_user`` can raise when
        no user is available -- confirm against its implementation.
        """
        self._metadata.get_user()

    def has_control(self):
        """Return what the session storage reports for the ``pty`` session.

        The session id is resolved from the ``"pty"`` socket.io namespace
        and passed to the storage's ``has_control``.
        """
        sessionid = self._session._session_from_sio("pty")
        return self._session._storage.has_control(sessionid)

    def set_winsize(self, payload):
        """Resize the console's terminal window.

        Args:
            payload: Mapping with ``"rows"`` and ``"cols"`` entries.
        """
        self._console.setwinsize(payload["rows"], payload["cols"])

    def start_console(self):
        """Spawn the configured command and start relaying its output.

        The command string from ``self._config["command"]`` is run through
        ``/bin/bash -c``. Output reaches clients via ``logfile_read``; a
        greenlet keeps reading from the process so that hook fires.
        """
        self._console = pexpect.spawn(
            "/bin/bash", ["-c", self._config["command"]], encoding="utf-8"
        )
        self._console.logfile_read = SocketFileObject(self.emit)
        gevent.spawn(self.read_console)

    def input(self, payload):
        """Forward client keystrokes to the console process.

        Input is only sent when ``has_control()`` is true; otherwise a
        warning is logged and the input is dropped.

        Args:
            payload: Mapping with an ``"input"`` entry holding the text.

        Returns:
            Always ``True``.
        """
        if self.has_control():
            self._console.send(payload["input"])

        else:
            sessionid = self._session._session_from_sio("pty")
            # Lazy %-style arguments: formatting only happens if the
            # record is actually emitted.
            logger.warning(
                "Session %s sending console input but does not have control",
                sessionid,
            )

        return True

    def read_console(self):
        """Pump output from the console process until it reaches EOF.

        The data returned by ``read_nonblocking`` is deliberately ignored:
        the read itself feeds ``logfile_read``, which emits the output.
        """
        while True:
            try:
                self._console.read_nonblocking(size=1024, timeout=0.1)
            except pexpect.exceptions.TIMEOUT:
                # Nothing arrived within the timeout; poll again.
                pass
            except pexpect.exceptions.EOF:
                # The child closed its side (typically because it exited).
                # Stop polling instead of letting the greenlet die with an
                # unhandled exception.
                logger.info("Console process reached EOF, stopping reader")
                break

            # Cooperative sleep: yields to other greenlets even when the
            # ``time`` module has not been monkey-patched by gevent.
            gevent.sleep(0.02)