Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/components/__init__.py: 75%
536 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4from functools import partial
5import gevent
6import time
7import uuid
8import json
9import sys
10from contextlib import contextmanager
11import traceback
13from abc import ABC, abstractmethod
14from marshmallow import Schema, fields, ValidationError
16from daiquiri.core.exceptions import InvalidYAML
17from daiquiri.core.logging import log
18from daiquiri.core import (
19 CoreBase,
20 CoreResource,
21 marshal,
22 require_valid_session,
23 require_staff,
24)
25from daiquiri.core.utils import loader
26from daiquiri.resources.utils import get_resource_provider, YamlDict
27from daiquiri.core.exceptions import SyntaxErrorYAML
28from daiquiri.core.schema import MessageSchema, ErrorSchema
29from daiquiri.core.schema.components import ComponentInfoSchema, DebugStateSchema
30from daiquiri.core.schema.metadata import paginated
31from daiquiri.core.components.params import ParamSchema
33import logging
# Developer-facing module logger; user-facing messages in this file go through `log.get("user")`.
35logger = logging.getLogger(__name__)
def actor(name, **kwargs):
    """Mark a resource method as the entry point of the actor `name`.

    The decorated function is returned as-is, with an `__actor_resource__`
    attribute attached that records the actor name and the extra keyword
    options given to this decorator.
    """

    def decorator(fn):
        setattr(fn, "__actor_resource__", dict(name=name, kwargs=kwargs))
        return fn

    return decorator
def actor_wrapper(name, **actkw):
    """Build a decorator that replaces a resource method with an actor launcher.

    The wrapper never calls the decorated function: it only uses it for its
    module name and for its `_require_control` / `_require_staff` markers,
    which are copied onto the wrapper.

    Args:
        name (str): The actor to launch through `self._parent.actor`
        **actkw: Launch options. The *presence* of the keys `preprocess` and
            `synchronous` (whatever their value) enables the call to
            `self.preprocess` and waiting for the actor result respectively.
            All other keys are forwarded to `self._parent.actor`.

    Returns:
        A decorator. The wrapped method returns a `(payload, status)` tuple:
        status 200 with the actor `uuid` (or the saved
        `datacollectionplanid`), or status 400 with an `error` message.
    """
    run_preprocess = "preprocess" in actkw
    synchronous = "synchronous" in actkw

    def decorator(fn):
        def wrapper(self, *args, **kwargs):
            kwargs["actor"] = name
            kwargs["component"] = fn.__module__

            if kwargs.get("save"):
                # Store the request as a data collection plan instead of running it
                kwargs.pop("save")
                kwargs.pop("enqueue", None)
                datacollectionplan = self._parent._metadata.add_datacollectionplan(
                    sampleid=kwargs.pop("sampleid"),
                    scanparameters={"actor": name, **kwargs},
                )
                plan_id = datacollectionplan["datacollectionplanid"]
                log.get("user").info(
                    f"New data collection plan saved for actor '{name}' with id '{plan_id}'",
                    type="actor",
                )
                return {"datacollectionplanid": plan_id}, 200

            if run_preprocess:
                # Allow preprocessors to throw errors
                try:
                    kwargs = self.preprocess(*args, **kwargs)
                except Exception as e:
                    logger.exception("Could not create actor")
                    log.get("user").error(
                        f"Could not create actor: `{str(e)}`", type="queue"
                    )
                    return {"error": f"Could not create actor: {str(e)}"}, 400

            launch_kwargs = {
                key: value
                for key, value in actkw.items()
                if key not in ("preprocess", "synchronous")
            }

            # Forward whichever lifecycle callbacks the parent component provides
            for option, callback_name in (
                ("success", "actor_success"),
                ("start", "actor_started"),
                ("error", "actor_error"),
                ("remove", "actor_remove"),
            ):
                if hasattr(self._parent, callback_name):
                    launch_kwargs[option] = getattr(self._parent, callback_name)

            launched_actor = None
            greenlet = None
            if synchronous:
                launched = self._parent.actor(
                    name,
                    actargs=kwargs,
                    return_actor=True,
                    spawn=True,
                    **launch_kwargs,
                )
                if isinstance(launched, tuple):
                    launched_actor, greenlet = launched
                    actor_uid = launched_actor.uid
                else:
                    # `None` when the actor could not be loaded, or a bare uid
                    # when it was enqueued rather than spawned: in both cases
                    # there is no greenlet to wait for.
                    actor_uid = launched
            else:
                actor_uid = self._parent.actor(name, actargs=kwargs, **launch_kwargs)

            if not actor_uid:
                return {"error": "Could not load actor"}, 400

            # Preprocessed arguments are not echoed back to the client
            ret = {} if run_preprocess else kwargs

            if greenlet is not None:
                greenlet.join()
                ret["result"] = launched_actor.resp()

            log.get("user").info(f"New actor created '{name}'", type="actor")

            return dict({"uuid": actor_uid}, **ret), 200

        # TODO: hmm, need to dig into this, @wraps clobbers kwargs?
        if hasattr(fn, "_require_control"):
            wrapper._require_control = fn._require_control
        if hasattr(fn, "_require_staff"):
            wrapper._require_staff = fn._require_staff

        return wrapper

    return decorator
class AvailableComponentsResource(CoreResource):
    """Resource exposing the paginated list of loaded components."""

    @marshal(out=[[200, paginated(ComponentInfoSchema), "List of components"]])
    def get(self):
        """Get a list of loaded components"""
        listing = self._parent.get_components()
        return listing, 200
class ConfigExportResource(CoreResource):
    """Resource exposing the config exported by the loaded components."""

    def get(self):
        """Returns any config that is exported from each of the loaded components"""
        exported = self._parent.get_export_config()
        return exported
class ComponentReloadResource(CoreResource):
    """Staff-only resource that triggers a reload of all components."""

    @require_valid_session
    @require_staff
    @marshal(
        out=[
            [200, MessageSchema(), "Components reloaded"],
            [400, ErrorSchema(), "Could not reload components"],
        ]
    )
    def post(self):
        """Reload all components with new config"""
        reloaded = self._parent.reload()
        if not reloaded:
            return {"error": "Could not reload components"}, 400
        return {"message": "Components reloaded"}
class ComponentDebugResource(CoreResource):
    """Resource to read and (for staff) change the application debug flag."""

    @marshal(out=[[200, DebugStateSchema(), "Debug state"]])
    def get(self, **kwargs):
        """Get the app debug state"""
        app = self._parent._app
        return {"state": app.debug}

    @require_staff
    @marshal(
        inp={
            "state": fields.Bool(
                required=True,
                metadata={"description": "The requested application debug state"},
            )
        },
        out=[[200, DebugStateSchema(), "Debug state changed"]],
    )
    def post(self, **kwargs):
        """Change the app debug state"""
        app = self._parent._app
        app.debug = kwargs["state"]
        # Report the value as stored on the app after the assignment
        return {"state": app.debug}
class Components(CoreBase):
    """Component Loader

    Scans every `*.yml` resource of the `config` resource group and loads one
    component for each file that declares a `component` key. The `version`,
    `logging` and `chat` components are always loaded in addition, without a
    config file.
    """

    # Class-level fallback so that `close()` and the lookups are safe before
    # `setup()` has run; `setup()` replaces it with a per-instance list.
    _components = []

    def setup(self):
        """Discover, instantiate and wire up all components, then register routes"""
        self._config = []
        # Per-instance list: appending to the class attribute would share (and
        # duplicate) components between instances and repeated setups.
        self._components = []

        provider = get_resource_provider()
        for name in provider.list_resource_names("config", "*.yml"):
            config = YamlDict("config", name)
            if "component" in config:
                self._config.append({"type": config["component"], "config": name})

        builtin = [
            {"type": "version"},
            {"type": "logging"},
            {"type": "chat"},
        ]
        for c in self._config + builtin:
            instance = loader(
                "daiquiri.core.components",
                "",
                c["type"],
                c.get("config"),
                **self.initkwargs,
            )

            if instance:
                instance.component_type = c["type"]
                instance.get_component = self.get_component
                self._components.append(instance)

        # Only once every component exists can they look each other up
        for c in self._components:
            c.after_all_setup(self)

        self.register_route(AvailableComponentsResource, "")
        self.register_route(ConfigExportResource, "/config")
        self.register_route(ComponentReloadResource, "/reload")
        self.register_route(ComponentDebugResource, "/debug")

    def close(self):
        """Clean up the services at the end.

        After this call, services should not be accessed anymore
        """
        for c in self._components:
            c.close()

    def get_export_config(self):
        """Get exported config values from components

        Returns:
            A dict with the beamline name and header settings from the base
            config, plus one entry per component keyed by its `_base_url`.
        """
        export = {
            "beamline": self._base_config["meta_beamline"],
            "header_color": self._base_config.get("header_color"),
            "header_title": self._base_config.get("header_title"),
        }
        for c in self._components:
            export[c._base_url] = c.get_export_config()
        return export

    def reload(self):
        """Reload components

        Reload the root hardware object first, then reload listed components as
        these may depend on the hardware object

        Components must decide what to reload by implementing their own
        `reload` method

        Returns:
            bool: True if the hardware config and every component config
            reloaded without a YAML error, False otherwise. The same flag is
            emitted as the `reloader` event on the `/app` namespace.
        """
        log.get("user").info("Reloading components", type="app")
        start = time.time()

        success = True
        try:
            self._hardware.reload()
        except (SyntaxErrorYAML, InvalidYAML) as ex:
            log.get("user").exception("Error in hardware config", type="hardware")
            print(ex.pretty())
            success = False
        else:
            for c in self._components:
                logger.info(f"Reloading config for {c.__class__.__name__}")
                try:
                    if isinstance(c._config, YamlDict):
                        c._config.reload()
                    c.validate_config()

                except (SyntaxErrorYAML, InvalidYAML) as ex:
                    log.get("user").exception("Error in component config", type="app")
                    print(ex.pretty())
                    success = False
                else:
                    logger.info(f"Reloading component {c.__class__.__name__}")
                    c.reload()

        log.get("user").info(
            f"Components reloaded, took {(time.time() - start):.1f}s", type="app"
        )

        self.emit("reloader", success, namespace="/app")

        # Report the real outcome so callers can signal a failed reload
        return success

    def get_component(self, type: str):
        """Returns a component by its type, else None"""
        for c in self._components:
            if c.component_type == type:
                return c
        return None

    def get_components(self):
        """Return the `info()` of every component as `{"total": n, "rows": [...]}`"""
        components = [f.info() for f in self._components]
        return {"total": len(components), "rows": components}
class ComponentResource(CoreResource):
    """Common base class for the resources declared by components.

    Adds nothing to `CoreResource`.
    """
class Component(CoreBase):
    """The abstract class that all components inherit from

    The component is given its config as first positional argument: either a
    dict used as-is, or the name of a yaml resource of the `config` group.
    The config is validated against `_config_schema` when one is defined.

    The base component class also provides handling for execution and queuing
    of `Actors`.
    """

    # Schema instance used by `validate_config`, if any
    _config_schema = None
    # White list of config keys exposed by `get_export_config`
    _config_export = []

    _require_session = True
    _require_blsession = True

    # Names of the actors this component accepts in `actor()`
    _actors = []

    def __init__(self, *args, **kwargs):
        # actor uid -> [actor, start, success, error, remove]
        self._running_actors = {}

        config = args[0] if args else None
        if config:
            if isinstance(config, dict):
                self._config = config
            else:
                self._config = YamlDict("config", config)
            # NOTE(review): nesting reconstructed from flattened source; both
            # dict and yaml configs are validated here - confirm.
            self.validate_config()
        else:
            self._config = {}
        super().__init__(*args, **kwargs)

        logger.debug("Loading Component: {f}".format(f=self._bp))

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """
        pass

    def get_export_config(self) -> dict:
        """Get exported config values from this component.

        The default implementation exports values from the yaml config. Only
        the keys part of the white list `_config_export` will be exposed.

        Raises:
            KeyError: if a white-listed key is missing from the config
        """
        return {k: self._config[k] for k in self._config_export}

    def after_setup(self):
        """Default the namespace to the base url and register it with the session"""
        if self._namespace is None:
            logger.debug(
                f"namespace is empty, defaulting to base_url: {self._base_url}"
            )
            self._namespace = self._base_url
        self._session.register_namespace(self._namespace)

    def validate_config(self):
        """Validate the config against `_config_schema`, if one is defined

        Raises:
            InvalidYAML: if the schema rejects the config
        """
        if not self._config_schema:
            return

        try:
            self._config_schema.load(self._config)
        except ValidationError as err:
            raise InvalidYAML(
                {
                    "message": f"{self.__class__.__name__} config is invalid",
                    # A plain dict config has no backing resource
                    "file": getattr(self._config, "resource", None),
                    "obj": self._config,
                    "errors": err.messages,
                }
            ) from None

    def register_actor_route(self, route_class, route):
        """Register a resource whose methods may be marked with `@actor`

        Every http method of `route_class` carrying `__actor_resource__` is
        replaced by an `actor_wrapper`, marshalled with the actor schema when
        one is available.
        """
        for k in ["post", "get", "put", "patch", "delete"]:
            fn = getattr(route_class, k, None)
            if not fn:
                continue

            if hasattr(fn, "__actor_resource__"):
                spec = fn.__actor_resource__

                fn = actor_wrapper(spec["name"], **spec["kwargs"])(fn)
                sch = self.actor_schema(spec["name"])
                if sch:
                    # Lets the schema rebuild itself from the actor file later
                    sch.reloader = partial(self.actor_schema, spec["name"])
                    if hasattr(fn, "_require_staff"):
                        sch._require_staff = fn._require_staff
                    fn = marshal(inp=sch)(fn)

                setattr(route_class, k, fn)

        self.register_route(route_class, route)

    def info(self):
        """Return a dict of basic info about this component"""
        return {
            "name": self.__class__.__name__,
            "baseurl": "/" + self._bp.lower(),
            # 'config': self._config
        }

    def actor_schema(self, name):
        """Build the schema class for the actor `name`

        Returns:
            A new schema class named `<Name>Schema` deriving from the actor
            `schema`, with `_actor` set to the loaded actor. If the actor
            cannot be loaded, an empty `ComponentActorSchema` subclass
            carrying the `exception` text and `traceback` is returned.
            None if the actor is unknown or has no `schema` attribute.
        """
        actor = self._create_actor(name, return_exception=True)
        schemaName = name[:1].upper() + name[1:] + "Schema"
        # Either file doesnt exist, or there is a syntax error
        if isinstance(actor, Exception):
            schema = type(
                schemaName,
                (ComponentActorSchema,),
                {
                    "Meta": type("Meta", (object,), {}),
                },
            )
            schema.exception = str(actor)
            schema.traceback = "".join(traceback.format_tb(actor.__traceback__))
            return schema

        if not hasattr(actor, "schema"):
            logger.warning(f"Actor {name} does not have a schema")
            return None
        schema = type(schemaName, (actor.schema,), {})
        schema._actor = actor
        return schema

    def _create_actor(self, name: str, basekw=None, return_exception: bool = False):
        """Instantiate the actor `name` as configured for this component

        Args:
            name: Key of the actor in the `actors` section of the config
            basekw: Keyword arguments passed to the actor constructor
            return_exception: Return the exception instead of None when the
                actor fails to load

        Returns:
            The actor instance, None if it is not configured or failed to
            load, or the load exception if `return_exception` is set.
        """
        # base must be an importable python module
        base = "{path}.{cls}".format(
            path=self._base_config["implementors"].replace("/", "."),
            cls=self.__class__.__name__.lower(),
        )
        actor_key = (self._config.get("actors") or {}).get(name)
        if actor_key is None:
            # Not inside an exception handler: there is no traceback to log
            log.get("user").error(
                f"Actor {name} is not part of the available actors", type="actor"
            )
            return None

        actor_definition = self._config.get("actors_config", {}).get(name, {})
        implementor = actor_definition.get("implementor")
        if implementor:
            # An explicit implementor overrides the default module location
            package, module = implementor.rsplit(".", 1)
        else:
            package = base
            module = actor_key
        actor_config = actor_definition.get("config", {})

        if basekw is None:
            basekw = {}
        try:
            return loader(
                package, "Actor", module, **basekw, static_config=actor_config
            )
        except Exception as e:
            log.get("user").exception(
                f"Couldn't load actor {name} from {package}", type="actor"
            )
            return e if return_exception else None

    def actor(
        self,
        name,
        start=None,
        success=None,
        error=None,
        remove=None,
        enqueue=False,
        spawn=False,
        return_actor=False,
        actargs=None,
    ):
        """Launch / enqueue an actor

        Dynamically load an actor from config {implementors}/{class_name}/{file}
        File is determined from the component specific config file which maps name -> file

        Example:
            >>> config.yaml
            >>> implementors: implementors/examples

            >>> testcomponent.yml
            >>> actors:
            >>>     click: actor1
            >>>     scan: actor2

            >>> self.actor('click') will execute implementors/examples/testcomponent/actor1.py
            >>> self.actor('scan') will execute implementors/examples/testcomponent/actor2.py

        Args:
            name (str): The name of the actor to start, which is resolved from the config
            start (fn): Function to call when the actor starts
            success (fn): Function to call when the actor completes successfully
            error (fn): Function to call if the actor fails
            remove (fn): Function to call if the actor is removed from the queue
            enqueue (boolean): Enqueue the actor rather than executing immediately
            spawn (boolean): Spawn the actor immediately in a new greenlet
            return_actor (boolean): Return the actor and greenlet (only for spawn=True)
            actargs (dict): Dictionary of arguments to pass to the actor

        Returns:
            The actor uuid
            or with return_actor=True (actor, greenlet)
            or None if the actor is unknown to this component or failed to load
        """
        if actargs is None:
            actargs = {}

        if name not in self._actors:
            logger.error(f"No such actor `{name}` on class `{self._bp}`")
            return None

        actid = str(uuid.uuid4())
        basekw = {
            "uid": actid,
            "name": name,
            "metadata": self._metadata,
            "socketio": self._socketio,
            "stomp": self.get_component("stomp"),
            "celery": self.get_component("celery"),
            "started": self._actor_started,
            "finished": self._actor_finished,
            "error": self._actor_error,
            "_remove": self._actor_remove,
        }

        instance = self._create_actor(name, basekw=basekw)
        if not instance:
            return None

        self._running_actors[actid] = [instance, start, success, error, remove]
        try:
            instance.prepare(**actargs)
        except Exception:
            # Do not keep a registration for an actor that never got ready
            del self._running_actors[actid]
            raise

        if enqueue or actargs.get("enqueue"):
            logger.debug(f"Enqueuing actor {name} with uid {actid}")
            self._queue.push(instance)

        elif spawn:
            logger.debug(f"Spawning actor {name} with uid {actid}")
            greenlet = gevent.spawn(instance.execute)
            if return_actor:
                return (instance, greenlet)

        else:
            logger.debug(f"Running actor {name} with uid {actid}")
            self._queue.run_now(instance)

        return actid

    def _actor_started(self, actid):
        """Actor started callback

        Checks if the actorid is registered in the running actors, and then calls the started
        callback if registered

        Args:
            actid (uuid): The actor uuid that started
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor started {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor started {name} {actid}".format(name=actor.name, actid=actid)
        )

        if start:
            start(actid, actor)

    def _actor_finished(self, actid):
        """Actor finished callback

        Checks if the actorid is registered in the running actors, calls the finished
        callback if registered, and then forgets the actor

        Args:
            actid (uuid): The actor uuid that finished
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor completed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor finished {name} {actid} took {s}".format(
                name=actor.name, actid=actid, s=actor.took()
            )
        )

        if success:
            success(actid, actor.resp(), actor)

        del self._running_actors[actid]

    def _actor_error(self, actid, exception):
        """Actor error callback

        Checks if the actorid is registered in the running actors, runs the
        `upload_error` actor with the failure details, calls the error
        callback if registered, and then forgets the actor

        Args:
            actid (uuid): The actor uuid that failed
            exception (BaseException): The exception raised by the actor
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor error {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]

        # NOTE(review): nesting reconstructed from flattened source; the error
        # upload is assumed to be skipped for deliberately killed actors.
        if not isinstance(exception, ComponentActorKilled):
            log.get("user").exception(
                f"Actor failed {actor.name}: {exception}", type="actor"
            )

            try:
                eactid = str(uuid.uuid4())
                eactor = loader(
                    self._base_config["implementors"].replace("/", "."),
                    "Actor",
                    "upload_error",
                    **{
                        "uid": eactid,
                        "metadata": self._metadata,
                        "socketio": self._socketio,
                        "started": lambda *args, **kwargs: None,
                        "finished": lambda *args, **kwargs: None,
                        "error": lambda *args, **kwargs: None,
                    },
                )
                eactor.prepare(
                    **{"actid": actid, "exception": exception, "actor": actor}
                )
                greenlet = gevent.spawn(eactor.execute)
                greenlet.join()

                if eactor._exception:
                    # Log with the uploader's own traceback; re-raising it here
                    # could let a non-`Exception` escape and skip the cleanup.
                    logger.error(
                        "Could not send actor error to uploader",
                        exc_info=eactor._exception,
                    )

            except Exception:
                logger.exception("Could not send actor error to uploader")

        if error:
            error(actid, exception, actor)

        del self._running_actors[actid]

    def _actor_remove(self, actid):
        """Actor remove callback

        Called if an actor is removed from the queue

        Args:
            actid (uuid): The actor uuid that has been removed
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor removed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor removed {name} {actid}".format(name=actor.name, actid=actid)
        )

        if remove:
            remove(actid, actor)
class ActorStdout:
    """Mock file object that writes to socket.io and original stdout

    Everything written is forwarded to `original` and accumulated in a
    buffer. The whole buffer is emitted as an `actor_output` event on the
    `/queue` namespace on `flush`, and on `write` once it ends with a newline.
    """

    def __init__(self, original, emit, uuid):
        """
        Args:
            original: The file object being wrapped
            emit (function): SocketIO-style emit function
            uuid: Identifier sent along with each `actor_output` event
        """
        self.emit = emit
        self.original = original
        self.uuid = uuid
        self._buffer = ""

    def open(self, *args, **kwargs):
        """No-op, kept for file object compatibility"""
        pass

    def isatty(self):
        return self.original.isatty()

    def flush(self, *args, **kwargs):
        r"""Horrible mimic of terminal \r behaviour

        When the last buffered line ends with `\r`, the text preceding that
        `\r` replaces the previous line of the buffer (or becomes the only
        line if there is none). The buffer is then emitted.
        """
        self.original.flush(*args, **kwargs)

        lines = self._buffer.split("\n")
        if lines[-1].endswith("\r"):
            # The segment before the trailing "\r" is what a terminal shows
            rewritten = lines[-1].split("\r")[-2] + "\n"
            if len(lines) > 1:
                lines[-2] = rewritten
                lines.pop()
            else:
                # No earlier line to overwrite: the buffer is only this line
                lines[-1] = rewritten

        self._buffer = "\n".join(lines)

        self._emit_buffer()

    def write(self, line, *args, **kwargs):
        """Write to the original stream and buffer; emit once a line is complete"""
        self.original.write(line, *args, **kwargs)
        self._buffer += line

        if self._buffer.endswith("\n"):
            self._emit_buffer()

    def _emit_buffer(self):
        """Send the whole buffer to the `/queue` namespace"""
        self.emit(
            "actor_output",
            {"output": self._buffer, "uuid": self.uuid},
            namespace="/queue",
        )

    @property
    def buffer(self):
        """Everything captured so far"""
        return self._buffer

    def fileno(self):
        return self.original.fileno()
class ComponentActor(ABC):
    """Actor

    The abstract actor from which all actors inherit

    Every keyword argument given to the constructor (other than
    `static_config`) becomes an attribute of the actor. `execute` relies on
    `uid`, `socketio`, `started`, `finished` and `error` having been provided
    that way.

    Attribute:
        static_config: Dictionary with configuration passed to the actor by the
            yaml configuration.
    """

    name = None
    desc = None
    metatype = "experiment"
    saving_args = None

    # These keys will be prefixed with metadata_ when passed into the actor
    additional_metadata = {}

    def __init__(self, *args, static_config=None, **kwargs):
        self._running = False
        # Configuration which can be passed to the actor at the creation from
        # the yaml file
        self.__config = {}
        if static_config is not None:
            self.__config.update(static_config)
        self.__dict__.update(kwargs)

    def get_config(self):
        """Expose the static configuration used to create the actor."""
        return self.__config

    def time_estimate(self):
        """Estimate from the schema `time_estimate`, or 0 if there is none"""
        if hasattr(self, "schema"):
            if hasattr(self.schema, "time_estimate"):
                return self.schema().time_estimate(self.initkwargs)
        return 0

    def prepare(self, **kwargs):
        """Store the run arguments and reset the run state"""
        self.initkwargs = kwargs
        self._estimate = kwargs.get("estimate", self.time_estimate())
        self._start = None
        self._finished = None
        self._failed = False
        self._exception = None
        self.created = time.time()
        self.data = {}
        self._stdout = ""

    def __getitem__(self, key):
        """Get value from initial arguments or data"""
        try:
            return self.initkwargs[key]
        except KeyError:
            pass
        return self.data[key]

    def get(self, key, default=None):
        """Get value from initial arguments or data"""
        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key, value):
        """Overwrite data"""
        self.data[key] = value

    def update(self, *args, **kwargs):
        """Overwrite data

        Accepts the same arguments as `dict.update`.
        """
        self.data.update(*args, **kwargs)

    @property
    def all_data(self):
        """Initial arguments + data + metadata"""
        return {
            **self.data,
            **self.initkwargs,
            **{
                f"metadata_{k}": (
                    v(**self.data, **self.initkwargs) if callable(v) else v
                )
                for k, v in self.additional_metadata.items()
            },
        }

    @property
    def initkwargs_json_serializable(self):
        """Actor arguments prepared for json serialization"""
        return self._make_json_serializable(self.initkwargs)

    @property
    def initkwargs_json_serialized(self):
        """Json serialized actor arguments"""
        return json.dumps(self.initkwargs_json_serializable)

    @property
    def all_data_json_serializable(self):
        """Actor arguments + data prepared for json serialization"""
        return self._make_json_serializable(self.all_data)

    @property
    def all_data_json_serialized(self):
        """Json serialized actor arguments + data"""
        return json.dumps(self.all_data_json_serializable, indent=2)

    @property
    def data_json_serializable(self):
        """Actor data prepared for json serialization"""
        return self._make_json_serializable(self.data)

    @property
    def data_json_serialized(self):
        """Json serialized actor data"""
        return json.dumps(self.data_json_serializable)

    def _make_json_serializable(self, adict, remove_callables=True):
        """Make a dictionary json serializable

        Nested dicts are processed recursively. Values `json.dumps` rejects
        are replaced by their `str()`.

        Args:
            adict (dict): The dict to serialise

        Kwargs:
            remove_callables (bool): Removes callables / none repr objects

        Returns:
            safe_dict (dict): The dictionary safe to json serialise
        """
        safe_dict = {}
        for k, v in adict.items():
            if isinstance(v, dict):
                safe_dict[k] = self._make_json_serializable(
                    v, remove_callables=remove_callables
                )
                continue

            try:
                json.dumps(v)
            except TypeError:
                # Objects without their own repr stringify to a useless address
                default_repr = type(v).__repr__ is object.__repr__
                if remove_callables and (callable(v) or default_repr):
                    continue
                safe_dict[k] = str(v)
            else:
                safe_dict[k] = v
        return safe_dict

    def handle_params(self, before=True):
        """Handle special `ParamSchema` params

        If a schema contains an instance of `ParamSchema` process these before
        and after an actor. A failing handler is logged and does not stop the
        remaining parameters from being handled.
        """
        if not hasattr(self, "schema"):
            return

        ty = "before" if before else "after"
        for k, v in self.schema().fields.items():
            if not isinstance(v, fields.Nested):
                continue

            nested = v.nested
            # `nested` is not always a class; `issubclass` would raise on those
            if not (isinstance(nested, type) and issubclass(nested, ParamSchema)):
                continue

            if k not in self.initkwargs:
                continue

            handler = nested.handler
            for p, pv in self.initkwargs[k].items():
                par = {p: pv}
                try:
                    log.get("user").info(
                        f"Running {k}.{p} to {pv} {ty} actor",
                        type="hardware",
                    )
                    handler.handle(par, before)
                    log.get("user").info(
                        f"Finished {k}.{p} {ty} actor", type="hardware"
                    )
                except Exception as e:
                    logger.exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}"
                    )
                    log.get("user").exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}",
                        type="hardware",
                    )

    @contextmanager
    def capture_stdout(self, emit, uuid, after=None):
        """Context manager to capture stdout

        Args:
            emit (function): SocketIO emit function
            uuid (uuid): Actor uuid
            after (function): Called with the captured text on exit
        """
        original = sys.stdout
        stream = ActorStdout(original, emit, uuid)
        sys.stdout = stream
        try:
            yield stream
        finally:
            if callable(after):
                # Read our own stream: `sys.stdout` may have been swapped
                # again by the code that ran inside the context
                after(stream.buffer)
            sys.stdout = original

    def execute(self):
        """Execute the actor

        Try to execute the actor, time it, and catch any errors it may raise

        Returns:
            None on success, else the exception that was caught (it is also
            passed to the `error` callback and never re-raised)
        """
        self._running = True
        self._start = time.time()
        try:
            self.started(self.uid)
            self.handle_params()

            def after(buffer):
                self._stdout = buffer

            # NOTE(review): extent of the `with` block reconstructed from
            # flattened source - confirm only `method` runs with stdout captured
            with self.capture_stdout(self.socketio.emit, self.uid, after):
                self._resp = self.method(
                    **self.initkwargs
                )  # TODO: Why do we need to pass these arguments
            self._finished = time.time()
            self._running = False
            self.handle_params(before=False)
            self.finished(self.uid)

        # To catch gevent.Timeout as well
        except BaseException as e:
            self._running = False
            self._failed = True
            self._resp = ""
            self._finished = time.time()
            self.handle_params(before=False)
            self._exception = e
            self.error(self.uid, e)
            return e

    @property
    def stdout(self):
        """Text captured from stdout during the last execution"""
        return self._stdout

    def info(self):
        """Actor info

        Returns
            A dict of the actor info
        """
        return {
            "created": self.created,
            "started": self._start,
            "estimate": self._estimate,
            "finished": self._finished,
            "failed": self._failed,
            "name": self.name,
            "desc": self.desc,
            "uid": self.uid,
            "running": self._running,
            "cls": repr(self),
            "args": self.initkwargs_json_serializable,
            "stdout": self._stdout,
        }

    def running(self):
        """The running state of the actor"""
        return self._running

    def resp(self):
        """The response from the finished actor"""
        return self._resp

    def took(self):
        """How long the actor took"""
        return self._finished - self._start

    @abstractmethod
    def method(self, *args, **kwargs):
        """The actual actor definition"""
        pass

    def remove(self):
        """Method is called if an actor is removed from the queue"""
        if hasattr(self, "_remove"):
            self._remove(self.uid)
class ComponentActorKilled(Exception):
    """Raised to signal that an actor was killed.

    `Component._actor_error` does not log this as an actor failure.
    """
class ComponentActorSchema(Schema):
    """Component Actor Schema

    Marks a schema for caching (meta)
    Allows schema reloading if `reloader` is set: every access to `fields`
    or `_declared_fields` first refreshes the schema through `reloader`.
    """

    # Callable returning a fresh schema class, or None to disable reloading
    reloader = None
    # Initialized at the creation by the schema service
    _actor: ComponentActor | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.Meta.cache = True

    def get_actor(self) -> ComponentActor | None:
        """Return the actor attached to the schema class, if any"""
        return getattr(type(self), "_actor", None)

    def reload_schema(self):
        """Refresh fields, helpers, error state and `Meta` from `reloader`

        Does nothing when no `reloader` is set. When the reloader returns no
        schema, a warning is logged and the current state is kept.
        """
        if not self.reloader:
            return

        schema = self.reloader()
        if not schema:
            logger.warning(
                f"Could not reload schema for `{self.__class__.__name__}`"
            )
            # Nothing to copy from; calling `schema()` would raise here
            return
        instance = schema()

        for key in [
            "fields",
            "_declared_fields",
            "warnings",
            "calculated",
            "time_estimate",
            "schema_validate",
            "save_preset",
            "get_presets",
        ]:
            if hasattr(instance, key):
                setattr(self, key, getattr(instance, key))

        # Only schemas built for an actor that failed to load carry these
        if hasattr(schema, "exception"):
            self.exception = schema.exception
            self.traceback = schema.traceback
        else:
            self.exception = None
            self.traceback = None

        if schema.Meta:
            for key in ["uiorder", "uischema", "uigroups", "presets", "cache"]:
                if hasattr(schema.Meta, key):
                    setattr(self.Meta, key, getattr(schema.Meta, key))
                elif hasattr(self.Meta, key):
                    # Option dropped by the reloaded schema: clear the stale value
                    setattr(self.Meta, key, None)

    def __getattribute__(self, name):
        if name == "fields" or name == "_declared_fields":
            self.reload_schema()

        return super().__getattribute__(name)