Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/cli/client_sio.py: 0%

34 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4"""Example client of the REST server using socketio 

5""" 

6 

7import socketio 

8import logging 

9 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Configure root logging at DEBUG level when the module is loaded.
logging.basicConfig(level=logging.DEBUG)

12 

13 

class BaseNameSpace(socketio.ClientNamespace):
    """Client namespace that prints every ``change`` event it receives."""

    def __init__(self, namespace, *args, **kwargs):
        """Bind to *namespace* and keep the keyword arguments as attributes.

        Each keyword argument ``name=value`` is stored on the instance as
        ``_name``. Positional ``*args`` are accepted but not used.
        """
        super().__init__(namespace)
        private_attrs = {"_" + key: value for key, value in kwargs.items()}
        vars(self).update(private_attrs)

    def on_change(self, data):
        """Print the class name and the payload of a ``change`` event."""
        label = "{cls}.on_change".format(cls=type(self).__name__)
        print(label, data)

21 

22 

class SocketIOClient:
    """Small wrapper around ``socketio.Client`` used by the example script.

    The class attributes below act as defaults; ``__init__`` overrides them
    per instance when the matching argument is supplied.
    """

    # Default server address (scheme and host, without the port).
    base = "http://localhost"
    # Default server port.
    port = 8080
    # Token sent as the ``token`` query parameter; None means "no token".
    _token = None

    def __init__(self, token=None, base=None, port=None):
        """Create the underlying socketio client.

        Args:
            token: Value for the ``token`` query parameter. Falsy values are
                ignored and the class default is kept.
            base: Server address replacing the class default ``base``.
            port: Server port replacing the class default ``port``.
        """
        if base is not None:
            self.base = base

        if port is not None:
            self.port = port

        if token:
            self._token = token

        self._socketio = socketio.Client()

    def _build_url(self):
        """Return the connection URL, adding ``?token=...`` only if set."""
        from urllib.parse import urlencode

        url = "{ip}:{port}".format(ip=self.base, port=self.port)
        if self._token:
            # Without this guard the literal text "None" would be sent as
            # the token; urlencode also escapes characters such as "&".
            url += "?" + urlencode({"token": self._token})
        return url

    def connect(self):
        """Connect the underlying client to the configured server."""
        self._socketio.connect(self._build_url())

    def register_namespace(self, namespace):
        """Register *namespace* on the underlying socketio client."""
        self._socketio.register_namespace(namespace)

    def run(self):
        """Call ``wait()`` on the underlying socketio client."""
        self._socketio.wait()

52 

53 

if __name__ == "__main__":
    # Example run: listen on two namespaces of a server at the default
    # address, then block until the client stops waiting.
    client = SocketIOClient()
    for namespace_path in ("/hardware", "/scans"):
        client.register_namespace(BaseNameSpace(namespace_path))
    client.connect()
    client.run()