Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/cli/client_sio.py: 0%
34 statements
« prev ^ index » next · coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4"""Example client of the REST server using socketio
5"""
7import socketio
8import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Configure the root logger at DEBUG level as soon as this module is loaded.
logging.basicConfig(level=logging.DEBUG)
class BaseNameSpace(socketio.ClientNamespace):
    """Client namespace whose ``on_change`` handler prints the data it receives.

    Under python-socketio's class-based namespace convention, a method named
    ``on_<event>`` handles ``<event>``, so ``on_change`` handles ``change``.
    """

    def __init__(self, namespace, *args, **kwargs):
        """Create the namespace and keep the keyword arguments as attributes.

        Args:
            namespace: Namespace path (e.g. ``"/scans"``), forwarded to
                ``socketio.ClientNamespace``.
            *args: Accepted but not used.
            **kwargs: Each ``key=value`` pair is stored on the instance as
                the attribute ``_key``.
        """
        super().__init__(namespace)
        # Write straight into the instance dict, one underscore-prefixed
        # attribute per keyword argument.
        for key, value in kwargs.items():
            self.__dict__["_" + key] = value

    def on_change(self, data):
        """Print the concrete class name followed by the received ``data``."""
        label = "{cls}.on_change".format(cls=self.__class__.__name__)
        print(label, data)
class SocketIOClient:
    """Small wrapper around ``socketio.Client`` for talking to the server.

    The class attributes below are defaults; the constructor overrides them
    per instance when the matching argument is supplied.
    """

    # Default server address (scheme and host, without the port).
    base = "http://localhost"
    # Default server port.
    port = 8080
    # Token sent as the ``token`` query parameter; ``None`` means no token.
    _token = None

    def __init__(self, token=None, base=None, port=None):
        """Store connection settings and create the underlying client.

        Args:
            token: Token to send when connecting. Falsy values (``None``,
                ``""``) leave the class default in place.
            base: Server address such as ``"http://host"``; ``None`` keeps
                the class default.
            port: Server port; ``None`` keeps the class default.
        """
        if base is not None:
            self.base = base

        if port is not None:
            self.port = port

        if token:
            self._token = token

        self._socketio = socketio.Client()

    def _build_url(self):
        """Return the connection URL for the current settings.

        The ``token`` query parameter is only appended when a token is set,
        and its value is percent-encoded so characters such as ``&``, ``=``
        or spaces cannot corrupt the query string.
        """
        # Local import: keeps this class self-contained without touching the
        # module's import block.
        from urllib.parse import urlencode

        url = "{ip}:{port}".format(ip=self.base, port=self.port)
        if self._token:
            url += "?" + urlencode({"token": self._token})
        return url

    def connect(self):
        """Connect the underlying socket.io client to the configured server."""
        self._socketio.connect(self._build_url())

    def register_namespace(self, namespace):
        """Register a namespace handler object on the underlying client.

        Args:
            namespace: Namespace handler instance, passed unchanged to
                ``socketio.Client.register_namespace``.
        """
        self._socketio.register_namespace(namespace)

    def run(self):
        """Call ``wait()`` on the underlying client."""
        self._socketio.wait()
if __name__ == "__main__":
    # Example run: register a printing handler on each namespace, connect
    # with the default settings, then wait on the client.
    client = SocketIOClient()
    for path in ("/hardware", "/scans"):
        client.register_namespace(BaseNameSpace(path))
    client.connect()
    client.run()