Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/__init__.py: 75%
534 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4from functools import partial
5import gevent
6import time
7import uuid
8import json
9import sys
10from contextlib import contextmanager
11import traceback
13from abc import ABC, abstractmethod
14from marshmallow import Schema, fields, ValidationError
16from daiquiri.core.exceptions import InvalidYAML
17from daiquiri.core.logging import log
18from daiquiri.core import (
19 CoreBase,
20 CoreResource,
21 marshal,
22 require_valid_session,
23 require_staff,
24)
25from daiquiri.core.utils import loader
26from daiquiri.resources.utils import get_resource_provider, YamlDict
27from daiquiri.core.exceptions import SyntaxErrorYAML
28from daiquiri.core.schema import MessageSchema, ErrorSchema
29from daiquiri.core.schema.components import ComponentInfoSchema, DebugStateSchema
30from daiquiri.core.schema.metadata import paginated
31from daiquiri.core.components.params import ParamSchema
33import logging
35logger = logging.getLogger(__name__)
def actor(name, **kwargs):
    """Mark a resource method as an actor endpoint.

    The returned decorator does not wrap the function; it only tags it with an
    `__actor_resource__` attribute holding the actor name and the extra keyword
    options, and hands the very same function back.

    Args:
        name (str): The actor name
        **kwargs: Extra options stored alongside the name

    Returns:
        A decorator that tags and returns the decorated function
    """

    def decorator(fn):
        # A new dict per decorated function, holding a reference to `kwargs`
        setattr(fn, "__actor_resource__", dict(name=name, kwargs=kwargs))
        return fn

    return decorator
def actor_wrapper(name, **actkw):
    """Build a decorator that replaces a resource method with an actor launcher.

    The wrapped method is never called; only its `__module__` and its
    `_require_control` / `_require_staff` attributes are used. The generated
    wrapper launches the actor `name` through `self._parent.actor`.

    Args:
        name (str): The actor name
        **actkw: Wrapper options. The presence of the key `preprocess` makes the
            wrapper call `self.preprocess` on the arguments first; the presence
            of the key `synchronous` makes it spawn the actor, wait for it and
            include its response. Every other key is forwarded to
            `self._parent.actor`.

    Returns:
        A decorator producing a wrapper that returns `(payload, status_code)`
    """

    def decorator(fn):
        def wrapper(self, *args, **kwargs):
            kwargs["actor"] = name
            kwargs["component"] = fn.__module__

            # `save` stores the arguments as a data collection plan rather than
            # launching anything
            if kwargs.get("save"):
                kwargs.pop("save")
                kwargs.pop("enqueue", None)
                datacollectionplan = self._parent._metadata.add_datacollectionplan(
                    sampleid=kwargs.pop("sampleid"),
                    scanparameters={"actor": name, **kwargs},
                )
                log.get("user").info(
                    f"New data collection plan saved for actor '{name}' with id '{datacollectionplan['datacollectionplanid']}'",
                    type="actor",
                )
                return (
                    {
                        "datacollectionplanid": datacollectionplan[
                            "datacollectionplanid"
                        ]
                    },
                    200,
                )

            if "preprocess" in actkw:
                # Allow preprocessors to throw errors
                try:
                    preprocess = getattr(self, "preprocess")
                    kwargs = preprocess(*args, **kwargs)
                except Exception as e:
                    logger.exception("Could not create actor")
                    log.get("user").error(
                        f"Could not create actor: `{str(e)}`", type="queue"
                    )
                    return {"error": f"Could not create actor: {str(e)}"}, 400

            actkw2 = {
                i: actkw[i] for i in actkw if i not in ["preprocess", "synchronous"]
            }

            # Forward the parent's lifecycle callbacks when it defines them
            if hasattr(self._parent, "actor_success"):
                actkw2["success"] = self._parent.actor_success

            if hasattr(self._parent, "actor_started"):
                actkw2["start"] = self._parent.actor_started

            if hasattr(self._parent, "actor_error"):
                actkw2["error"] = self._parent.actor_error

            if hasattr(self._parent, "actor_remove"):
                actkw2["remove"] = self._parent.actor_remove

            launched_actor = None
            greenlet = None
            if "synchronous" in actkw:
                launched = self._parent.actor(
                    name, actargs=kwargs, return_actor=True, spawn=True, **actkw2
                )
                # `Component.actor` only returns (actor, greenlet) when the actor
                # was really spawned; it returns None when the actor could not be
                # loaded and the bare uid when it was enqueued instead. Blindly
                # unpacking used to raise a TypeError in those cases.
                if isinstance(launched, tuple):
                    launched_actor, greenlet = launched
                    actor_uid = launched_actor.uid
                else:
                    actor_uid = launched
            else:
                actor_uid = self._parent.actor(name, actargs=kwargs, **actkw2)

            if not actor_uid:
                return {"error": "Could not load actor"}, 400

            ret = {}
            if not ("preprocess" in actkw):
                ret = kwargs

            if greenlet is not None:
                # Block until the actor finished, then expose its response
                greenlet.join()
                ret["result"] = launched_actor.resp()

            log.get("user").info(f"New actor created '{name}'", type="actor")

            return dict({"uuid": actor_uid}, **ret), 200

        # TODO: hmm, need to dig into this, @wraps clobbers kwargs?
        if hasattr(fn, "_require_control"):
            wrapper._require_control = fn._require_control
        if hasattr(fn, "_require_staff"):
            wrapper._require_staff = fn._require_staff

        return wrapper

    return decorator
class AvailableComponentsResource(CoreResource):
    # Read-only endpoint exposing the parent's component listing

    @marshal(out=[[200, paginated(ComponentInfoSchema), "List of components"]])
    def get(self):
        """Get a list of loaded components"""
        listing = self._parent.get_components()
        return listing, 200
class ConfigExportResource(CoreResource):
    # Endpoint returning whatever the parent's `get_export_config` produces

    def get(self):
        """Returns any config that is exported from each of the loaded components"""
        exported = self._parent.get_export_config()
        return exported
class ComponentReloadResource(CoreResource):
    # Staff-only endpoint that asks the parent to reload every component

    @require_valid_session
    @require_staff
    @marshal(
        out=[
            [200, MessageSchema(), "Components reloaded"],
            [400, ErrorSchema(), "Could not reload components"],
        ]
    )
    def post(self):
        """Reload all components with new config"""
        # Guard clause: a falsy result from the parent is reported as a 400
        if not self._parent.reload():
            return {"error": "Could not reload components"}, 400
        return {"message": "Components reloaded"}
class ComponentDebugResource(CoreResource):
    # Endpoint reading (GET) and changing (POST, staff only) the app `debug` flag

    @marshal(out=[[200, DebugStateSchema(), "Debug state"]])
    def get(self, **kwargs):
        """Get the app debug state"""
        app = self._parent._app
        return {"state": app.debug}

    @require_staff
    @marshal(
        inp={
            "state": fields.Bool(
                required=True,
                metadata={"description": "The requested application debug state"},
            )
        },
        out=[[200, DebugStateSchema(), "Debug state changed"]],
    )
    def post(self, **kwargs):
        """Change the app debug state"""
        app = self._parent._app
        app.debug = kwargs["state"]
        # Read the flag back rather than echoing the request value
        return {"state": app.debug}
class Components(CoreBase):
    """Component Loader

    The core component class that dynamically loads components from components.yml
    Will try to load components from the core
    """

    # Class-level fallback only; `setup` binds a fresh list on the instance so
    # that loaded components are never shared between `Components` instances
    _components = []

    def setup(self):
        """Discover, instantiate and register all components.

        Every `*.yml` resource in `config` that contains a `component` key is
        loaded as a component of that type. The `version`, `logging` and `chat`
        components are always loaded, without a config.
        """
        self._config = []
        self._components = []

        provider = get_resource_provider()
        names = provider.list_resource_names("config", "*.yml")
        for name in names:
            config = YamlDict("config", name)
            if "component" in config:
                self._config.append({"type": config["component"], "config": name})

        for c in self._config + [
            {"type": "version"},
            {"type": "logging"},
            {"type": "chat"},
        ]:
            instance = loader(
                "daiquiri.core.components",
                "",
                c["type"],
                c.get("config"),
                **self.initkwargs,
            )

            if instance:
                instance.component_type = c["type"]
                instance.get_component = self.get_component
                self._components.append(instance)

        # Only once everything is loaded can components look each other up
        for c in self._components:
            c.after_all_setup(self)

        self.register_route(AvailableComponentsResource, "")
        self.register_route(ConfigExportResource, "/config")
        self.register_route(ComponentReloadResource, "/reload")
        self.register_route(ComponentDebugResource, "/debug")

    def close(self):
        """Clean up the services at in the end.

        After this call, services should not be accessed anymore
        """
        for c in self._components:
            c.close()

    def get_export_config(self):
        """Get exported config values from components"""
        export = {
            "beamline": self._base_config["meta_beamline"],
            "header_color": self._base_config.get("header_color"),
            "header_title": self._base_config.get("header_title"),
        }
        for c in self._components:
            export[c._base_url] = c.get_export_config()
        return export

    def reload(self):
        """Reload components

        Reload the root hardware object first, then reload listed components as
        these may depend on the hardware object

        Components must decide what to reload by implementing their own
        `reload` method

        Returns:
            bool: False if the hardware config or any component config was
                invalid, True otherwise
        """
        log.get("user").info("Reloading components", type="app")
        start = time.time()

        success = True
        try:
            self._hardware.reload()
        except (SyntaxErrorYAML, InvalidYAML) as ex:
            log.get("user").exception("Error in hardware config", type="hardware")
            print(ex.pretty())
            success = False
        else:
            for c in self._components:
                logger.info(f"Reloading config for {c.__class__.__name__}")
                try:
                    # NOTE(review): nesting was reconstructed from a flattened
                    # listing; validation is assumed to apply to YamlDict
                    # configs only - confirm against the original file
                    if isinstance(c._config, YamlDict):
                        c._config.reload()
                        c.validate_config()

                except (SyntaxErrorYAML, InvalidYAML) as ex:
                    log.get("user").exception("Error in component config", type="app")
                    print(ex.pretty())
                    success = False
                else:
                    logger.info(f"Reloading component {c.__class__.__name__}")
                    c.reload()

        log.get("user").info(
            f"Components reloaded, took {(time.time() - start):.1f}s", type="app"
        )

        self.emit("reloader", success, namespace="/app")

        # Report the real outcome: this used to be a hard-coded True, which made
        # the "Could not reload components" response unreachable
        return success

    def get_component(self, type: str):
        """Returns a component by it's type, else None"""
        for c in self._components:
            if c.component_type == type:
                return c
        return None

    def get_components(self):
        """Return the `info()` of every loaded component as a paginated dict"""
        components = [f.info() for f in self._components]
        return {"total": len(components), "rows": components}
class ComponentResource(CoreResource):
    """ComponentResource that all component resources inherit from"""
class Component(CoreBase):
    """The abstract class that all components inherit from

    The component loads a config with the same name as the class.lower() and logs a
    message if one cannot be found. The class also registeres a before_request handler
    to essentially enable middleware on the request, by default requiring a valid session
    and then checking require_control as needed

    The base component class also provide handling for execution and queuing of `Actors`
    """

    _config_schema = None
    _config_export = []

    _require_session = True
    _require_blsession = True

    _actors = []

    def __init__(self, *args, **kwargs):
        """Load the component config, then initialise the base class.

        Args:
            *args: The first positional argument is the config: a dict is used
                as is, any other truthy value is passed to `YamlDict` as the
                resource name, and a falsy value gives an empty config.
            **kwargs: Forwarded to the base class
        """
        # Maps actor uid -> [actor, start, success, error, remove]
        self._running_actors = {}

        if args[0]:
            if isinstance(args[0], dict):
                self._config = args[0]
            else:
                self._config = YamlDict("config", args[0])
                # NOTE(review): nesting was reconstructed from a flattened
                # listing; validation is assumed to apply to YamlDict configs
                # only - confirm against the original file
                self.validate_config()
        else:
            self._config = {}
        super().__init__(*args, **kwargs)

        logger.debug("Loading Component: {f}".format(f=self._bp))

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """
        pass

    def get_export_config(self) -> dict:
        """Get exported config values from this component.

        The default implementation export values from the yaml config. Only the
        keys part of white list `_config_export` will be exposed.
        """
        return {k: self._config[k] for k in self._config_export}

    def after_setup(self):
        """Default the namespace to the base url and register it with the session"""
        if self._namespace is None:
            logger.debug(
                f"namespace is empty, defaulting to base_url: {self._base_url}"
            )
            self._namespace = self._base_url
        self._session.register_namespace(self._namespace)

    def validate_config(self):
        """Validate the config against `_config_schema`, if one is defined.

        Raises:
            InvalidYAML: If the schema rejects the config
        """
        if not self._config_schema:
            return

        try:
            self._config_schema.load(self._config)
        except ValidationError as err:
            raise InvalidYAML(
                {
                    "message": f"{self.__class__.__name__} config is invalid",
                    # A plain dict config has no `resource` attribute; do not
                    # let an AttributeError mask the validation error
                    "file": getattr(self._config, "resource", None),
                    "obj": self._config,
                    "errors": err.messages,
                }
            ) from None

    def register_actor_route(self, route_class, route):
        """Register a route whose actor-tagged methods launch actors.

        Every HTTP method of `route_class` carrying `__actor_resource__` (set by
        the `actor` decorator) is replaced by an `actor_wrapper` and, if the
        actor provides a schema, marshalled with that schema as input.

        Args:
            route_class: The resource class to register
            route (str): The route to register it under
        """
        for k in ["post", "get", "put", "patch", "delete"]:
            fn = getattr(route_class, k, None)
            if fn:
                if hasattr(fn, "__actor_resource__"):
                    # Named `actor_def` so the module-level `actor` is not shadowed
                    actor_def = fn.__actor_resource__

                    fn = actor_wrapper(actor_def["name"], **actor_def["kwargs"])(fn)
                    sch = self.actor_schema(actor_def["name"])
                    if sch:
                        # Lets the schema rebuild itself from the actor on demand
                        sch.reloader = partial(self.actor_schema, actor_def["name"])
                        if hasattr(fn, "_require_staff"):
                            sch._require_staff = fn._require_staff
                        fn = marshal(inp=sch)(fn)

                    setattr(route_class, k, fn)

        self.register_route(route_class, route)

    def info(self):
        """Return a dict of basic info about this component"""
        return {
            "name": self.__class__.__name__,
            "baseurl": "/" + self._bp.lower(),
            # 'config': self._config
        }

    def actor_schema(self, name):
        """Build the schema class for the actor `name`.

        Args:
            name (str): The actor name

        Returns:
            A new schema class derived from the actor's `schema`; an empty
            `ComponentActorSchema` subclass carrying `exception` and
            `traceback` if the actor failed to load; or None if the actor has
            no schema.
        """
        actor = self._create_actor(name, return_exception=True)
        schemaName = name[0].upper() + name[1:] + "Schema"
        # Either file doesnt exist, or there is a syntax error
        if isinstance(actor, Exception):
            schema = type(
                schemaName,
                (ComponentActorSchema,),
                {
                    "Meta": type("Meta", (object,), {}),
                },
            )
            schema.exception = str(actor)
            schema.traceback = "".join(traceback.format_tb(actor.__traceback__))
            return schema

        if not hasattr(actor, "schema"):
            logger.warning(f"Actor {name} does not have a schema")
            return None
        schema = type(schemaName, (actor.schema,), {})
        schema._actor = actor
        return schema

    def _create_actor(self, name: str, basekw=None, return_exception: bool = False):
        """Instantiate the actor `name` from the component config.

        Args:
            name: The actor name, looked up in the `actors` config mapping
            basekw: Keyword arguments passed to the actor constructor
            return_exception: Return the exception instead of None if loading fails

        Returns:
            The actor instance; None if the actor is not configured or could
            not be loaded; or the exception if `return_exception` is set.
        """
        # base must be an importable python module
        base = "{path}.{cls}".format(
            path=self._base_config["implementors"].replace("/", "."),
            cls=self.__class__.__name__.lower(),
        )
        # A component without an `actors` section simply has no actors
        actor_key = (self._config.get("actors") or {}).get(name)
        if actor_key is None:
            # `.error`, not `.exception`: no exception is being handled here
            log.get("user").error(
                f"Actor {name} is not part of the available actors", type="actor"
            )
            return None

        actor_definition = self._config.get("actors_config", {}).get(name, {})
        implementor = actor_definition.get("implementor")
        if implementor:
            package, module = implementor.rsplit(".", 1)
        else:
            package = base
            module = actor_key
        actor_config = actor_definition.get("config", {})

        try:
            if basekw is None:
                basekw = {}
            actor = loader(
                package, "Actor", module, **basekw, static_config=actor_config
            )
        except Exception as e:
            log.get("user").exception(
                f"Couldn't load actor {name} from {base}", type="actor"
            )
            if return_exception:
                return e
            return None
        return actor

    def actor(
        self,
        name,
        start=None,
        success=None,
        error=None,
        remove=None,
        enqueue=False,
        spawn=False,
        return_actor=False,
        actargs=None,
    ):
        """Launch / enqueue an actor

        Dynamically load an actor from config {implementors}/{class_name}/{file}
        File is determined from the component specific config file which maps name -> file

        Example:
            >>> config.yaml
            >>> implementors: implementors/examples

            >>> testcomponent.yml
            >>> actors:
            >>>     click: actor1
            >>>     scan: actor2

            >>> self.actor('click') will execute implementors/examples/testcomponent/actor1.py
            >>> self.actor('scan') will execute implementors/examples/testcomponent/actor2.py

        Args:
            name (str): The name of the actor to start, which is resolved from the config
            start (fn): Function to call when the actor starts
            success (fn): Function to call when the actor completes successfully
            error (fn): Function to call if the actor fails
            remove (fn): Function to call if the actor is removed from the queue
            enqueue (boolean): Enqueue the actor rather than executing immediately
            spawn (boolean): Spawn the actor immediately in a new greenlet
            return_actor (boolean): Return the actor and greenlet (only for spawn=True)
            actargs (dict): Dictionary of arguments to pass to the actor

        Returns:
            The actor uuid
            or with return_actor=True (actor, greenlet)
            or None if the actor is unknown or could not be created

        """
        # Avoid a shared mutable default argument
        if actargs is None:
            actargs = {}

        if name not in self._actors:
            logger.error(f"No such actor `{name}` on class `{self._bp}`")
            return None

        actid = str(uuid.uuid4())
        basekw = {
            "uid": actid,
            "name": name,
            "metadata": self._metadata,
            "socketio": self._socketio,
            "stomp": self._stomp,
            "celery": self.get_component("celery"),
            "started": self._actor_started,
            "finished": self._actor_finished,
            "error": self._actor_error,
            "_remove": self._actor_remove,
        }

        actor = self._create_actor(name, basekw=basekw)
        if not actor:
            return None

        self._running_actors[actid] = [actor, start, success, error, remove]

        actor.prepare(**actargs)
        if enqueue or actargs.get("enqueue"):
            logger.debug(f"Enqueuing actor {name} with uid {actid}")
            self._queue.push(actor)

        elif spawn:
            logger.debug(f"Spawning actor {name} with uid {actid}")
            greenlet = gevent.spawn(actor.execute)
            if return_actor:
                return (actor, greenlet)

        else:
            logger.debug(f"Running actor {name} with uid {actid}")
            self._queue.run_now(actor)

        return actid

    def _actor_started(self, actid):
        """Actor started callback

        Checks if the actorid is registered in the running actors, and then calls the started
        callback if registered

        Args:
            actid (uuid): The actor uuid that started
        """
        if not (actid in self._running_actors):
            logger.warning("Unknown actor started {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor started {name} {actid}".format(name=actor.name, actid=actid)
        )

        if start:
            start(actid, actor)

    def _actor_finished(self, actid):
        """Actor finished callback

        Checks if the actorid is registered in the running actors, and then calls the finished
        callback if registered

        Args:
            actid (uuid): The actor uuid that finished
        """
        if not (actid in self._running_actors):
            logger.warning("Unknown actor completed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor finished {name} {actid} took {s}".format(
                name=actor.name, actid=actid, s=actor.took()
            )
        )

        # The entry is deliberately removed only after the callback returned: if
        # `success` raises, `ComponentActor.execute` reports it through the
        # error callback, which still needs to find the actor here
        if success:
            success(actid, actor.resp(), actor)

        del self._running_actors[actid]

    def _actor_error(self, actid, exception):
        """Actor error callback

        Checks if the actorid is registered in the running actors, and then calls the error
        callback if registered

        Args:
            actid (uuid): The actor uuid that failed
            exception (BaseException): What the actor raised
        """
        if not (actid in self._running_actors):
            logger.warning("Unknown actor error {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]

        # A killed actor is neither logged as a failure nor uploaded
        if not isinstance(exception, ComponentActorKilled):
            log.get("user").exception(
                f"Actor failed {actor.name}: {exception}", type="actor"
            )

            # Best effort: run the `upload_error` actor and wait for it
            try:
                eactid = str(uuid.uuid4())
                eactor = loader(
                    self._base_config["implementors"].replace("/", "."),
                    "Actor",
                    "upload_error",
                    **{
                        "uid": eactid,
                        "metadata": self._metadata,
                        "socketio": self._socketio,
                        "started": lambda *args, **kwargs: None,
                        "finished": lambda *args, **kwargs: None,
                        "error": lambda *args, **kwargs: None,
                    },
                )
                eactor.prepare(
                    **{"actid": actid, "exception": exception, "actor": actor}
                )
                greenlet = gevent.spawn(eactor.execute)
                greenlet.join()

                if eactor._exception:
                    # Re-raise only so `logger.exception` has a traceback to log
                    try:
                        raise eactor._exception
                    except Exception:
                        logger.exception("Could not send actor error to uploader")

            except Exception:
                logger.exception("Could not send actor error to uploader")

        if error:
            error(actid, exception, actor)

        del self._running_actors[actid]

    def _actor_remove(self, actid):
        """Actor remove callback

        Called if an actor is removed from the queue

        Args:
            actid (uuid): The actor uuid that has been removed

        """
        if not (actid in self._running_actors):
            logger.warning("Unknown actor removed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor removed {name} {actid}".format(name=actor.name, actid=actid)
        )

        if remove:
            remove(actid, actor)

        # NOTE(review): the entry stays in `_running_actors` after removal -
        # confirm whether a removed actor can still start, otherwise this leaks
class ActorStdout:
    """Mock file object that writes to socket.io and original stdout"""

    def __init__(self, original, emit, uuid):
        """
        Args:
            original: The stream being wrapped; every call is forwarded to it
            emit (function): Called as `emit(event, payload, namespace=...)`
            uuid: Identifier sent along with every emitted payload
        """
        self.emit = emit
        self.original = original
        self.uuid = uuid
        self._buffer = ""

    def open(self, *args, **kwargs):
        """No-op, present only to look like a file object"""
        pass

    def isatty(self):
        """Report whether the wrapped stream is a tty"""
        return self.original.isatty()

    def flush(self, *args, **kwargs):
        r"""Horrible mimic of terminal \r behaviour

        Flushes the wrapped stream. If the last buffered line ends with `\r`,
        the text written just before that final `\r` replaces the preceding
        line (or the line itself when it is the only one). The whole buffer is
        then emitted.
        """
        self.original.flush(*args, **kwargs)

        lines = self._buffer.split("\n")
        if lines[-1].endswith("\r"):
            # endswith("\r") guarantees at least two segments
            latest = lines[-1].split("\r")[-2] + "\n"
            if len(lines) > 1:
                lines[-2] = latest
                lines.pop()
            else:
                # No preceding line to overwrite: `lines[-2]` used to raise an
                # IndexError here
                lines[-1] = latest

        self._buffer = "\n".join(lines)

        self.emit(
            "actor_output",
            {"output": self._buffer, "uuid": self.uuid},
            namespace="/queue",
        )

    def write(self, line, *args, **kwargs):
        """Write to the wrapped stream and buffer; emit once a line is complete"""
        self.original.write(line, *args, **kwargs)
        self._buffer += line

        if self._buffer.endswith("\n"):
            self.emit(
                "actor_output",
                {"output": self._buffer, "uuid": self.uuid},
                namespace="/queue",
            )

    @property
    def buffer(self):
        """Everything captured so far"""
        return self._buffer
class ComponentActor(ABC):
    """Actor

    The abstract actor from which all actors inherit

    Attribute:
        static_config: Dictionary with configuration passed to the actor by the
                       yaml configuration.
    """

    name = None
    desc = None
    metatype = "experiment"
    saving_args = None

    # These keys will be prefixed with metadata_ when passed into the actor
    additional_metadata = {}

    def __init__(self, *args, static_config=None, **kwargs):
        """
        Args:
            *args: Ignored
            static_config (dict): Configuration copied into the actor
            **kwargs: Every keyword becomes an instance attribute. `execute`
                relies on `uid`, `socketio`, `started`, `finished` and `error`
                having been provided this way.
        """
        self._running = False
        # Configuration which can be passed to the actor at the creation from
        # the yaml file
        self.__config = {}
        if static_config is not None:
            self.__config.update(static_config)
        self.__dict__.update(kwargs)

    def get_config(self):
        """Expose the static configuration used to create the actor."""
        return self.__config

    def time_estimate(self):
        """Return the schema's time estimate for the arguments, or 0 without one"""
        if hasattr(self, "schema"):
            if hasattr(self.schema, "time_estimate"):
                return self.schema().time_estimate(self.initkwargs)
        return 0

    def prepare(self, **kwargs):
        """Store the actor arguments and reset all run state.

        Args:
            **kwargs: The actor arguments; `estimate` overrides the time estimate
        """
        self.initkwargs = kwargs
        self._estimate = kwargs.get("estimate", self.time_estimate())
        self._start = None
        self._finished = None
        self._failed = False
        self._exception = None
        self.created = time.time()
        self.data = {}
        self._stdout = ""

    def __getitem__(self, key):
        """Get value from initial arguments or data"""
        try:
            return self.initkwargs[key]
        except KeyError:
            pass
        return self.data[key]

    def get(self, key, default=None):
        """Get value from initial arguments or data"""
        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key, value):
        """Overwrite data"""
        self.data[key] = value

    def update(self, *args, **kwargs):
        """Overwrite data

        Accepts the same arguments as `dict.update`.
        """
        # `kwargs` has to be unpacked: passing it positionally made any call
        # with a mapping argument fail with a TypeError
        self.data.update(*args, **kwargs)

    @property
    def all_data(self):
        """Initial arguments + data + metadata"""
        return {
            **self.data,
            **self.initkwargs,
            **{
                f"metadata_{k}": (
                    v(**self.data, **self.initkwargs) if callable(v) else v
                )
                for k, v in self.additional_metadata.items()
            },
        }

    @property
    def initkwargs_json_serializable(self):
        """Actor arguments prepared for json serialization"""
        return self._make_json_serializable(self.initkwargs)

    @property
    def initkwargs_json_serialized(self):
        """Json serialized actor arguments"""
        return json.dumps(self.initkwargs_json_serializable)

    @property
    def all_data_json_serializable(self):
        """Actor arguments + data prepared for json serialization"""
        return self._make_json_serializable(self.all_data)

    @property
    def all_data_json_serialized(self):
        """Json serialized actor arguments + data"""
        return json.dumps(self.all_data_json_serializable, indent=2)

    @property
    def data_json_serializable(self):
        """Actor data prepared for json serialization"""
        return self._make_json_serializable(self.data)

    @property
    def data_json_serialized(self):
        """Json serialized actor data"""
        return json.dumps(self.data_json_serializable)

    def _make_json_serializable(self, adict, remove_callables=True):
        """Make a dictionary json serializable

        Args:
            adict (dict): The dict to serialise

        Kwargs:
            remove_callables (bool): Removes callables / none repr objects

        Returns:
            safe_dict (dict): The dictionary safe to json serialise
        """
        safe_dict = {}
        for k, v in adict.items():
            if isinstance(v, dict):
                safe_dict[k] = self._make_json_serializable(
                    v, remove_callables=remove_callables
                )
                continue

            try:
                json.dumps(v)
            except TypeError:
                # Objects with only the default repr have no useful string form
                default_repr = type(v).__repr__ is object.__repr__
                if remove_callables and (callable(v) or default_repr):
                    continue
                safe_dict[k] = str(v)
            else:
                safe_dict[k] = v
        return safe_dict

    def handle_params(self, before=True):
        """Handle special `ParamSchema` params

        If a schema contains an instance of `ParamSchema` process these before
        and after an actor. Failures are logged and never propagated.
        """
        if not hasattr(self, "schema"):
            return

        ty = "before" if before else "after"
        schema = self.schema()
        for k, v in schema.fields.items():
            if not isinstance(v, fields.Nested):
                continue
            if not issubclass(v.nested, ParamSchema):
                continue
            if k not in self.initkwargs:
                continue

            handler = v.nested.handler
            # Each parameter is handed to the handler on its own
            for p, pv in self.initkwargs[k].items():
                par = {p: pv}
                try:
                    log.get("user").info(
                        f"Running {k}.{p} to {pv} {ty} actor",
                        type="hardware",
                    )
                    handler.handle(par, before)
                    log.get("user").info(
                        f"Finished {k}.{p} {ty} actor", type="hardware"
                    )
                except Exception as e:
                    logger.exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}"
                    )
                    log.get("user").exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}",
                        type="hardware",
                    )

    @contextmanager
    def capture_stdout(self, emit, uuid, after=None):
        """Context manager to capture stdout

        Args:
            emit (function): SocketIO emit function
            uuid (uuid): Actor uuid
            after (function): Called with the captured text before stdout is restored
        """
        original = sys.stdout
        sys.stdout = ActorStdout(original, emit, uuid)
        try:
            yield sys.stdout
        finally:
            if callable(after):
                after(sys.stdout.buffer)
            sys.stdout = original

    def execute(self):
        """Execute the actor

        Try to execute the actor, time it, and catch any errors it may raise

        Returns:
            The exception if the actor failed, otherwise None
        """
        self._running = True
        self._start = time.time()
        try:
            self.started(self.uid)
            self.handle_params()

            def after(buffer):
                self._stdout = buffer

            with self.capture_stdout(self.socketio.emit, self.uid, after):
                self._resp = self.method(
                    **self.initkwargs
                )  # TODO: Why do we need to pass these arguments
            # NOTE(review): nesting was reconstructed from a flattened listing;
            # the bookkeeping below is assumed to run after stdout is restored -
            # confirm against the original file
            self._finished = time.time()
            self._running = False
            self.handle_params(before=False)
            self.finished(self.uid)

        # To catch gevent.Timeout as well
        except BaseException as e:
            self._running = False
            self._failed = True
            self._resp = ""
            self._finished = time.time()
            self.handle_params(before=False)
            self._exception = e
            self.error(self.uid, e)
            return e

    @property
    def stdout(self):
        """The stdout captured during the last `execute`"""
        return self._stdout

    def info(self):
        """Actor info

        Returns
            A dict of the actor info
        """
        return {
            "created": self.created,
            "started": self._start,
            "estimate": self._estimate,
            "finished": self._finished,
            "failed": self._failed,
            "name": self.name,
            "desc": self.desc,
            "uid": self.uid,
            "running": self._running,
            "cls": repr(self),
            "args": self.initkwargs_json_serializable,
            "stdout": self._stdout,
        }

    def running(self):
        """The running state of the actor"""
        return self._running

    def resp(self):
        """The response from the finished actor"""
        return self._resp

    def took(self):
        """How long the actor took"""
        return self._finished - self._start

    @abstractmethod
    def method(self, *args, **kwargs):
        """The actual actor definition"""
        pass

    def remove(self):
        """Method is called if an actor is remove from the queue"""
        if hasattr(self, "_remove"):
            self._remove(self.uid)
class ComponentActorKilled(Exception):
    """Exception to raise when an actor is killed"""
class ComponentActorSchema(Schema):
    """Component Actor Schema

    Marks a schema for caching (meta)
    Allows schema reloading if `reloader` is set
    """

    # Callable returning a fresh schema class, or None to disable reloading
    reloader = None
    # Initialized at the creation by the schema service
    _actor: ComponentActor | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.Meta.cache = True

    def get_actor(self) -> ComponentActor | None:
        """Return the actor stored on the schema class, if any"""
        return getattr(type(self), "_actor", None)

    def reload_schema(self):
        """Refresh this instance from a newly built schema class.

        Does nothing without a `reloader`. Otherwise the fields, the helper
        attributes and the `Meta` ui options of a fresh schema are copied onto
        this instance, together with the load `exception` / `traceback` carried
        by the fresh schema class (None when it loaded cleanly).
        """
        if not self.reloader:
            return

        schema = self.reloader()
        if not schema:
            logger.warning(
                f"Could not reload schema for `{self.__class__.__name__}`"
            )
            # Nothing to copy from: keep the current definition. Falling
            # through used to call `None()` and raise a TypeError.
            return
        instance = schema()

        for key in [
            "fields",
            "_declared_fields",
            "warnings",
            "calculated",
            "time_estimate",
            "schema_validate",
            "save_preset",
            "get_presets",
        ]:
            if hasattr(instance, key):
                setattr(self, key, getattr(instance, key))

        if hasattr(schema, "exception"):
            self.exception = schema.exception
            self.traceback = schema.traceback
        else:
            self.exception = None
            self.traceback = None

        if schema.Meta:
            for key in ["uiorder", "uischema", "uigroups", "presets", "cache"]:
                if hasattr(schema.Meta, key):
                    setattr(self.Meta, key, getattr(schema.Meta, key))
                else:
                    # Clear options the fresh schema no longer defines
                    if hasattr(self.Meta, key):
                        setattr(self.Meta, key, None)

    def __getattribute__(self, name):
        # Reading the fields triggers a reload first, so they are always current
        if name == "fields" or name == "_declared_fields":
            self.reload_schema()

        return super().__getattribute__(name)