Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/__init__.py: 75%

534 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4from functools import partial 

5import gevent 

6import time 

7import uuid 

8import json 

9import sys 

10from contextlib import contextmanager 

11import traceback 

12 

13from abc import ABC, abstractmethod 

14from marshmallow import Schema, fields, ValidationError 

15 

16from daiquiri.core.exceptions import InvalidYAML 

17from daiquiri.core.logging import log 

18from daiquiri.core import ( 

19 CoreBase, 

20 CoreResource, 

21 marshal, 

22 require_valid_session, 

23 require_staff, 

24) 

25from daiquiri.core.utils import loader 

26from daiquiri.resources.utils import get_resource_provider, YamlDict 

27from daiquiri.core.exceptions import SyntaxErrorYAML 

28from daiquiri.core.schema import MessageSchema, ErrorSchema 

29from daiquiri.core.schema.components import ComponentInfoSchema, DebugStateSchema 

30from daiquiri.core.schema.metadata import paginated 

31from daiquiri.core.components.params import ParamSchema 

32 

33import logging 

34 

35logger = logging.getLogger(__name__) 

36 

37 

def actor(name, **kwargs):
    """Tag a resource handler as the entry point of a named actor.

    The actor name and any extra keyword options are recorded on the handler
    as the ``__actor_resource__`` attribute. The handler itself is returned
    unchanged.

    Args:
        name (str): Name of the actor
        **kwargs: Options stored alongside the name

    Returns:
        A decorator that annotates the function it is applied to
    """

    def decorator(fn):
        setattr(fn, "__actor_resource__", dict(name=name, kwargs=kwargs))
        return fn

    return decorator

44 

45 

def actor_wrapper(name, **actkw):
    """Build a decorator that replaces a resource handler with an actor launcher.

    The wrapper never calls the decorated handler; it only reads its
    ``__module__`` and its ``_require_control`` / ``_require_staff`` markers.
    On each request the wrapper either saves the request as a data collection
    plan (``save`` in the request arguments) or launches the actor ``name``
    through ``self._parent.actor``.

    Args:
        name (str): Name of the actor to launch
        **actkw: Options. ``preprocess`` runs ``self.preprocess`` on the
            request arguments first, ``synchronous`` spawns the actor and waits
            for its result. Anything else is forwarded to ``self._parent.actor``.

    Returns:
        A decorator producing the wrapper. The wrapper returns a
        ``(payload, status)`` tuple.
    """

    def decorator(fn):
        def wrapper(self, *args, **kwargs):
            kwargs["actor"] = name
            kwargs["component"] = fn.__module__

            if kwargs.get("save"):
                # Store the request as a plan instead of running it
                kwargs.pop("save")
                kwargs.pop("enqueue", None)
                datacollectionplan = self._parent._metadata.add_datacollectionplan(
                    sampleid=kwargs.pop("sampleid"),
                    scanparameters={"actor": name, **kwargs},
                )
                plan_id = datacollectionplan["datacollectionplanid"]
                log.get("user").info(
                    f"New data collection plan saved for actor '{name}' with id '{plan_id}'",
                    type="actor",
                )
                return {"datacollectionplanid": plan_id}, 200

            if "preprocess" in actkw:
                # Allow preprocessors to throw errors
                try:
                    kwargs = self.preprocess(*args, **kwargs)
                except Exception as e:
                    logger.exception("Could not create actor")
                    log.get("user").error(
                        f"Could not create actor: `{str(e)}`", type="queue"
                    )
                    return {"error": f"Could not create actor: {str(e)}"}, 400

            # Options handled by this wrapper are not forwarded to the launcher
            actkw2 = {
                key: value
                for key, value in actkw.items()
                if key not in ("preprocess", "synchronous")
            }

            # Forward whichever lifecycle callbacks the parent component defines
            for option, attribute in (
                ("success", "actor_success"),
                ("start", "actor_started"),
                ("error", "actor_error"),
                ("remove", "actor_remove"),
            ):
                if hasattr(self._parent, attribute):
                    actkw2[option] = getattr(self._parent, attribute)

            launched_actor = None
            greenlet = None
            if "synchronous" in actkw:
                launched = self._parent.actor(
                    name, actargs=kwargs, return_actor=True, spawn=True, **actkw2
                )
                if isinstance(launched, tuple):
                    launched_actor, greenlet = launched
                    actor_uid = launched_actor.uid
                else:
                    # The launcher returns None when the actor could not be
                    # loaded, or a bare uid when it was enqueued instead of
                    # spawned; neither can be unpacked into (actor, greenlet)
                    actor_uid = launched
            else:
                actor_uid = self._parent.actor(name, actargs=kwargs, **actkw2)

            if not actor_uid:
                return {"error": "Could not load actor"}, 400

            ret = {}
            if "preprocess" not in actkw:
                ret = kwargs

            if greenlet is not None:
                # Block until the spawned actor has finished
                greenlet.join()
                ret["result"] = launched_actor.resp()

            log.get("user").info(f"New actor created '{name}'", type="actor")

            return dict({"uuid": actor_uid}, **ret), 200

        # TODO: hmm, need to dig into this, @wraps clobbers kwargs?
        if hasattr(fn, "_require_control"):
            wrapper._require_control = fn._require_control
        if hasattr(fn, "_require_staff"):
            wrapper._require_staff = fn._require_staff

        return wrapper

    return decorator

131 

132 

class AvailableComponentsResource(CoreResource):
    """Resource returning the listing produced by the parent's ``get_components``"""

    @marshal(out=[[200, paginated(ComponentInfoSchema), "List of components"]])
    def get(self):
        """Get a list of loaded components"""
        listing = self._parent.get_components()
        return listing, 200

138 

139 

class ConfigExportResource(CoreResource):
    """Resource returning the parent's exported configuration"""

    def get(self):
        """Returns any config that is exported from each of the loaded components"""
        exported = self._parent.get_export_config()
        return exported

144 

145 

class ComponentReloadResource(CoreResource):
    """Resource triggering the parent's ``reload`` (staff only, valid session)"""

    @require_valid_session
    @require_staff
    @marshal(
        out=[
            [200, MessageSchema(), "Components reloaded"],
            [400, ErrorSchema(), "Could not reload components"],
        ]
    )
    def post(self):
        """Reload all components with new config"""
        reloaded = self._parent.reload()
        if not reloaded:
            return {"error": "Could not reload components"}, 400
        return {"message": "Components reloaded"}

161 

162 

class ComponentDebugResource(CoreResource):
    """Resource reading and changing the ``debug`` flag of the parent's app"""

    @marshal(out=[[200, DebugStateSchema(), "Debug state"]])
    def get(self, **kwargs):
        """Get the app debug state"""
        current = self._parent._app.debug
        return {"state": current}

    @require_staff
    @marshal(
        inp={
            "state": fields.Bool(
                required=True,
                metadata={"description": "The requested application debug state"},
            )
        },
        out=[[200, DebugStateSchema(), "Debug state changed"]],
    )
    def post(self, **kwargs):
        """Change the app debug state"""
        requested = kwargs["state"]
        self._parent._app.debug = requested
        # Report the value read back from the app rather than the request
        return {"state": self._parent._app.debug}

183 

184 

class Components(CoreBase):
    """Component Loader

    Scans the ``config`` resources for ``*.yml`` files declaring a ``component``
    key and loads the matching component from ``daiquiri.core.components``.
    The ``version``, ``logging`` and ``chat`` components are always loaded.
    """

    # Class-level fallback only; `setup` installs a per-instance list so that
    # loaded components are never shared between instances
    _components = []

    def setup(self):
        """Discover, load and wire up all components, then register routes"""
        self._config = []
        # Start from a fresh list: appending to the class attribute would leak
        # components across instances and duplicate them on repeated setup
        self._components = []

        provider = get_resource_provider()
        names = provider.list_resource_names("config", "*.yml")
        for name in names:
            config = YamlDict("config", name)
            if "component" in config:
                self._config.append({"type": config["component"], "config": name})

        always_loaded = [
            {"type": "version"},
            {"type": "logging"},
            {"type": "chat"},
        ]
        for c in self._config + always_loaded:
            instance = loader(
                "daiquiri.core.components",
                "",
                c["type"],
                c.get("config"),
                **self.initkwargs,
            )

            if instance:
                instance.component_type = c["type"]
                instance.get_component = self.get_component
                self._components.append(instance)

        for c in self._components:
            c.after_all_setup(self)

        self.register_route(AvailableComponentsResource, "")
        self.register_route(ConfigExportResource, "/config")
        self.register_route(ComponentReloadResource, "/reload")
        self.register_route(ComponentDebugResource, "/debug")

    def close(self):
        """Clean up the services at the end.

        After this call, services should not be accessed anymore
        """
        for c in self._components:
            c.close()

    def get_export_config(self):
        """Get exported config values from components

        Returns:
            dict: Beamline and header values from the base config, plus one
            entry per component keyed by the component's ``_base_url``
        """
        export = {
            "beamline": self._base_config["meta_beamline"],
            "header_color": self._base_config.get("header_color"),
            "header_title": self._base_config.get("header_title"),
        }
        for c in self._components:
            export[c._base_url] = c.get_export_config()
        return export

    def reload(self):
        """Reload components

        Reload the root hardware object first, then reload listed components as
        these may depend on the hardware object

        Components must decide what to reload by implementing their own
        `reload` method

        Returns:
            bool: True if the hardware and every component config reloaded
            without a YAML error, False otherwise. The same value is emitted
            as the `reloader` event on the `/app` namespace.
        """
        log.get("user").info("Reloading components", type="app")
        start = time.time()

        success = True
        try:
            self._hardware.reload()
        except (SyntaxErrorYAML, InvalidYAML) as ex:
            log.get("user").exception("Error in hardware config", type="hardware")
            print(ex.pretty())
            success = False
        else:
            for c in self._components:
                logger.info(f"Reloading config for {c.__class__.__name__}")
                try:
                    # Only yaml backed configs can be re-read
                    if isinstance(c._config, YamlDict):
                        c._config.reload()
                    c.validate_config()

                except (SyntaxErrorYAML, InvalidYAML) as ex:
                    log.get("user").exception("Error in component config", type="app")
                    print(ex.pretty())
                    success = False
                else:
                    logger.info(f"Reloading component {c.__class__.__name__}")
                    c.reload()

        log.get("user").info(
            f"Components reloaded, took {(time.time() - start):.1f}s", type="app"
        )

        self.emit("reloader", success, namespace="/app")

        # Report the real outcome so callers can tell a failed reload apart
        return success

    def get_component(self, type: str):
        """Returns a component by its type, else None"""
        for c in self._components:
            if c.component_type == type:
                return c
        return None

    def get_components(self):
        """Return the `info()` of every loaded component with a total count"""
        components = [f.info() for f in self._components]
        return {"total": len(components), "rows": components}

302 

303 

class ComponentResource(CoreResource):
    """Common base class for the resources exposed by components.

    Adds nothing to ``CoreResource``.
    """

308 

309 

class Component(CoreBase):
    """The abstract class that all components inherit from

    The first positional argument is the component config: either a dict used
    as is, or the name of a yaml resource loaded through `YamlDict`. A falsy
    value results in an empty config.

    The base component class also provides handling for execution and queuing
    of `Actors`, and tracks launched actors in `_running_actors` until they
    finish or fail.
    """

    # Optional schema instance used by `validate_config`
    _config_schema = None
    # White list of config keys returned by `get_export_config`
    _config_export = []

    _require_session = True
    _require_blsession = True

    # Names of the actors this component is allowed to launch
    _actors = []

    def __init__(self, *args, **kwargs):
        # actor uid -> [actor, start, success, error, remove]
        self._running_actors = {}

        if args[0]:
            if isinstance(args[0], dict):
                self._config = args[0]
            else:
                self._config = YamlDict("config", args[0])
            self.validate_config()
        else:
            self._config = {}
        super().__init__(*args, **kwargs)

        logger.debug("Loading Component: {f}".format(f=self._bp))

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """
        pass

    def get_export_config(self) -> dict:
        """Get exported config values from this component.

        The default implementation exports values from the config. Only the
        keys in the white list `_config_export` are exposed.

        Raises:
            KeyError: If a white listed key is missing from the config
        """
        return {key: self._config[key] for key in self._config_export}

    def after_setup(self):
        """Default the namespace to the base url and register it with the session"""
        if self._namespace is None:
            logger.debug(
                f"namespace is empty, defaulting to base_url: {self._base_url}"
            )
            self._namespace = self._base_url
        self._session.register_namespace(self._namespace)

    def validate_config(self):
        """Validate the config against `_config_schema`, if one is defined

        Raises:
            InvalidYAML: If the schema rejects the config
        """
        if not self._config_schema:
            return

        try:
            self._config_schema.load(self._config)
        except ValidationError as err:
            raise InvalidYAML(
                {
                    "message": f"{self.__class__.__name__} config is invalid",
                    # A plain dict config has no backing resource
                    "file": getattr(self._config, "resource", None),
                    "obj": self._config,
                    "errors": err.messages,
                }
            ) from None

    def register_actor_route(self, route_class, route):
        """Register a route whose handlers may be tagged with `__actor_resource__`

        Every tagged http method of `route_class` is replaced by an actor
        launching wrapper, marshalled with the actor schema when one is
        available, before the route is registered.

        Args:
            route_class: The resource class to register
            route (str): The route to register it under
        """
        for verb in ["post", "get", "put", "patch", "delete"]:
            fn = getattr(route_class, verb, None)
            if not fn or not hasattr(fn, "__actor_resource__"):
                continue

            actor = fn.__actor_resource__

            fn = actor_wrapper(actor["name"], **actor["kwargs"])(fn)
            sch = self.actor_schema(actor["name"])
            if sch:
                # Lets the schema re-create itself from the actor file later on
                sch.reloader = partial(self.actor_schema, actor["name"])
                if hasattr(fn, "_require_staff"):
                    sch._require_staff = fn._require_staff
                fn = marshal(inp=sch)(fn)

            setattr(route_class, verb, fn)

        self.register_route(route_class, route)

    def info(self):
        """Return a dict of basic info about this component"""
        return {
            "name": self.__class__.__name__,
            "baseurl": "/" + self._bp.lower(),
            # 'config': self._config
        }

    def actor_schema(self, name):
        """Build the schema class for the actor `name`

        Args:
            name (str): The actor name

        Returns:
            A new schema class named `<Name>Schema`. When the actor cannot be
            loaded this is an empty `ComponentActorSchema` carrying `exception`
            and `traceback`. None when the actor is unavailable or has no
            `schema` attribute.
        """
        actor = self._create_actor(name, return_exception=True)
        schema_name = name[0].upper() + name[1:] + "Schema"
        # Either file doesnt exist, or there is a syntax error
        if isinstance(actor, Exception):
            schema = type(
                schema_name,
                (ComponentActorSchema,),
                {
                    "Meta": type("Meta", (object,), {}),
                },
            )
            schema.exception = str(actor)
            schema.traceback = "".join(traceback.format_tb(actor.__traceback__))
            return schema

        if not hasattr(actor, "schema"):
            logger.warning(f"Actor {name} does not have a schema")
            return None

        schema = type(schema_name, (actor.schema,), {})
        schema._actor = actor
        return schema

    def _create_actor(self, name: str, basekw=None, return_exception: bool = False):
        """Instantiate the actor `name` from the component config

        The module is resolved from the `actors` mapping of the config, unless
        `actors_config.<name>.implementor` provides a dotted path.

        Args:
            name: The actor name
            basekw: Keyword arguments passed to the actor constructor
            return_exception: Return the exception instead of None when the
                actor fails to load

        Returns:
            The actor instance, None if it is unavailable or failed to load,
            or the raised exception when `return_exception` is set
        """
        # base must be an importable python module
        base = "{path}.{cls}".format(
            path=self._base_config["implementors"].replace("/", "."),
            cls=self.__class__.__name__.lower(),
        )
        actor_key = self._config["actors"].get(name)
        if actor_key is None:
            # No exception is being handled here, so log a plain error
            log.get("user").error(
                f"Actor {name} is not part of the available actors", type="actor"
            )
            return None

        actor_definition = self._config.get("actors_config", {}).get(name, {})
        implementor = actor_definition.get("implementor")
        if implementor:
            package, module = implementor.rsplit(".", 1)
        else:
            package = base
            module = actor_key
        actor_config = actor_definition.get("config", {})

        try:
            if basekw is None:
                basekw = {}
            actor = loader(
                package, "Actor", module, **basekw, static_config=actor_config
            )
        except Exception as e:
            log.get("user").exception(
                f"Couldn't load actor {name} from {base}", type="actor"
            )
            if return_exception:
                return e
            return None
        return actor

    def actor(
        self,
        name,
        start=None,
        success=None,
        error=None,
        remove=None,
        enqueue=False,
        spawn=False,
        return_actor=False,
        actargs=None,
    ):
        """Launch / enqueue an actor

        Dynamically load an actor from config {implementors}/{class_name}/{file}
        File is determined from the component specific config file which maps name -> file

        Example::

            config.yaml
                implementors: implementors/examples

            testcomponent.yml
                actors:
                    click: actor1
                    scan: actor2

            self.actor('click') will execute implementors/examples/testcomponent/actor1.py
            self.actor('scan') will execute implementors/examples/testcomponent/actor2.py

        Args:
            name (str): The name of the actor to start, which is resolved from the config
            start (fn): Function to call when the actor starts
            success (fn): Function to call when the actor completes successfully
            error (fn): Function to call if the actor fails
            remove (fn): Function to call if the actor is removed from the queue
            enqueue (boolean): Enqueue the actor rather than executing immediately
            spawn (boolean): Spawn the actor immediately in a new greenlet
            return_actor (boolean): Return the actor and greenlet (only for spawn=True)
            actargs (dict): Dictionary of arguments to pass to the actor

        Returns:
            The actor uuid
            or with return_actor=True (actor, greenlet)
            or None if the actor is unknown or could not be loaded
        """
        if actargs is None:
            actargs = {}

        if name not in self._actors:
            logger.error(f"No such actor `{name}` on class `{self._bp}`")
            return None

        actid = str(uuid.uuid4())
        basekw = {
            "uid": actid,
            "name": name,
            "metadata": self._metadata,
            "socketio": self._socketio,
            "stomp": self._stomp,
            "celery": self.get_component("celery"),
            "started": self._actor_started,
            "finished": self._actor_finished,
            "error": self._actor_error,
            "_remove": self._actor_remove,
        }

        actor = self._create_actor(name, basekw=basekw)
        if not actor:
            return None

        self._running_actors[actid] = [actor, start, success, error, remove]

        actor.prepare(**actargs)
        if enqueue or actargs.get("enqueue"):
            logger.debug(f"Enqueuing actor {name} with uid {actid}")
            self._queue.push(actor)

        elif spawn:
            logger.debug(f"Spawning actor {name} with uid {actid}")
            greenlet = gevent.spawn(actor.execute)
            if return_actor:
                return (actor, greenlet)

        else:
            logger.debug(f"Running actor {name} with uid {actid}")
            self._queue.run_now(actor)

        return actid

    def _actor_started(self, actid):
        """Actor started callback

        Checks if the actorid is registered in the running actors, and then calls the started
        callback if registered

        Args:
            actid (uuid): The actor uuid that started
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor started {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor started {name} {actid}".format(name=actor.name, actid=actid)
        )

        if start:
            start(actid, actor)

    def _actor_finished(self, actid):
        """Actor finished callback

        Checks if the actorid is registered in the running actors, and then calls the finished
        callback if registered. The actor is unregistered afterwards.

        Args:
            actid (uuid): The actor uuid that finished
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor completed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor finished {name} {actid} took {s}".format(
                name=actor.name, actid=actid, s=actor.took()
            )
        )

        try:
            if success:
                success(actid, actor.resp(), actor)
        finally:
            # Unregister even if the callback raises
            self._running_actors.pop(actid, None)

    def _actor_error(self, actid, exception):
        """Actor error callback

        Checks if the actorid is registered in the running actors, and then calls the error
        callback if registered. Unless the actor was killed, the failure is
        logged and passed to the `upload_error` actor first. The actor is
        unregistered afterwards.

        Args:
            actid (uuid): The actor uuid that failed
            exception (BaseException): The exception raised by the actor
        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor error {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]

        try:
            if not isinstance(exception, ComponentActorKilled):
                log.get("user").exception(
                    f"Actor failed {actor.name}: {exception}", type="actor"
                )

                try:
                    eactid = str(uuid.uuid4())
                    eactor = loader(
                        self._base_config["implementors"].replace("/", "."),
                        "Actor",
                        "upload_error",
                        **{
                            "uid": eactid,
                            "metadata": self._metadata,
                            "socketio": self._socketio,
                            "started": lambda *args, **kwargs: None,
                            "finished": lambda *args, **kwargs: None,
                            "error": lambda *args, **kwargs: None,
                        },
                    )
                    eactor.prepare(
                        **{"actid": actid, "exception": exception, "actor": actor}
                    )
                    greenlet = gevent.spawn(eactor.execute)
                    greenlet.join()

                    if eactor._exception:
                        # Log with the traceback without re-raising: the stored
                        # exception may not derive from Exception and would
                        # otherwise escape the handler below
                        logger.error(
                            "Could not send actor error to uploader",
                            exc_info=eactor._exception,
                        )

                except Exception:
                    logger.exception("Could not send actor error to uploader")

            if error:
                error(actid, exception, actor)
        finally:
            # Unregister even if the callback raises
            self._running_actors.pop(actid, None)

    def _actor_remove(self, actid):
        """Actor remove callback

        Called if an actor is removed from the queue

        Args:
            actid (uuid): The actor uuid that has been removed

        """
        if actid not in self._running_actors:
            logger.warning("Unknown actor removed {uid}".format(uid=actid))
            return

        actor, start, success, error, remove = self._running_actors[actid]
        logger.debug(
            "Actor removed {name} {actid}".format(name=actor.name, actid=actid)
        )

        # NOTE(review): the actor stays registered in `_running_actors` after
        # removal, as before - confirm whether it should be unregistered here
        if remove:
            remove(actid, actor)

675 

676 

class ActorStdout:
    """Mock file object that writes to socket.io and original stdout

    Everything written is forwarded to the original stream and accumulated in
    an internal buffer. The whole buffer is emitted as an `actor_output` event
    on the `/queue` namespace whenever it ends with a newline, and on flush.

    Args:
        original: The stream to forward writes and flushes to
        emit (function): Called as `emit(event, payload, namespace=...)`
        uuid: Identifier included in every emitted payload
    """

    def __init__(self, original, emit, uuid):
        self.emit = emit
        self.original = original
        self.uuid = uuid
        self._buffer = ""

    def open(self, *args, **kwargs):
        """No-op, present for file object compatibility"""
        pass

    def isatty(self):
        """Report whether the original stream is a tty"""
        return self.original.isatty()

    def flush(self, *args, **kwargs):
        """Flush the original stream and emit the buffer

        Mimics terminal carriage return behaviour: when the last buffered line
        ends with a carriage return, its final segment replaces the previous
        line, or the line itself when it is the only one.
        """
        self.original.flush(*args, **kwargs)

        lines = self._buffer.split("\n")
        last = lines[-1]
        if last.endswith("\r"):
            # `last` ends with "\r", so the split always has a trailing ""
            replacement = last.split("\r")[-2] + "\n"
            if len(lines) > 1:
                lines[-2] = replacement
                lines.pop()
            else:
                # No previous line to overwrite
                lines[-1] = replacement

        self._buffer = "\n".join(lines)

        self.emit(
            "actor_output",
            {"output": self._buffer, "uuid": self.uuid},
            namespace="/queue",
        )

    def write(self, line, *args, **kwargs):
        """Write to the original stream and buffer the text

        The buffer is emitted once it ends with a newline.
        """
        self.original.write(line, *args, **kwargs)
        self._buffer += line

        if self._buffer.endswith("\n"):
            self.emit(
                "actor_output",
                {"output": self._buffer, "uuid": self.uuid},
                namespace="/queue",
            )

    @property
    def buffer(self):
        """Everything captured so far"""
        return self._buffer

726 

727 

class ComponentActor(ABC):
    """Actor

    The abstract actor from which all actors inherit

    Keyword arguments passed to the constructor become instance attributes.
    `execute` relies on `uid`, `socketio`, `started`, `finished` and `error`
    being provided this way.

    Attribute:
        static_config: Dictionary with configuration passed to the actor by the
            yaml configuration.
    """

    name = None
    desc = None
    metatype = "experiment"
    saving_args = None

    # These keys will be prefixed with metadata_ when passed into the actor
    additional_metadata = {}

    def __init__(self, *args, static_config=None, **kwargs):
        self._running = False
        # Configuration which can be passed to the actor at the creation from
        # the yaml file
        self.__config = {}
        if static_config is not None:
            self.__config.update(static_config)
        self.__dict__.update(kwargs)

    def get_config(self):
        """Expose the static configuration used to create the actor."""
        return self.__config

    def time_estimate(self):
        """Estimate from the schema's `time_estimate`, or 0 if unavailable"""
        schema = getattr(self, "schema", None)
        if schema is not None and hasattr(schema, "time_estimate"):
            return schema().time_estimate(self.initkwargs)
        return 0

    def prepare(self, **kwargs):
        """Store the arguments and reset the execution state

        Args:
            **kwargs: Arguments later passed to `method`. An `estimate` key
                overrides the computed time estimate.
        """
        self.initkwargs = kwargs
        self._estimate = kwargs.get("estimate", self.time_estimate())
        self._start = None
        self._finished = None
        self._failed = False
        self._exception = None
        # Defined up front so `resp()` is usable before execution
        self._resp = None
        self.created = time.time()
        self.data = {}
        self._stdout = ""

    def __getitem__(self, key):
        """Get value from initial arguments or data"""
        try:
            return self.initkwargs[key]
        except KeyError:
            pass
        return self.data[key]

    def get(self, key, default=None):
        """Get value from initial arguments or data"""
        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key, value):
        """Overwrite data"""
        self.data[key] = value

    def update(self, *args, **kwargs):
        """Overwrite data

        Accepts the same arguments as `dict.update`: an optional mapping or
        iterable of pairs, plus keyword arguments.
        """
        self.data.update(*args, **kwargs)

    @property
    def all_data(self):
        """Initial arguments + data + metadata

        Callable entries of `additional_metadata` are called with the data and
        initial arguments as keyword arguments.
        """
        metadata = {
            f"metadata_{key}": (
                value(**self.data, **self.initkwargs) if callable(value) else value
            )
            for key, value in self.additional_metadata.items()
        }
        return {**self.data, **self.initkwargs, **metadata}

    @property
    def initkwargs_json_serializable(self):
        """Actor arguments prepared for json serialization"""
        return self._make_json_serializable(self.initkwargs)

    @property
    def initkwargs_json_serialized(self):
        """Json serialized actor arguments"""
        return json.dumps(self.initkwargs_json_serializable)

    @property
    def all_data_json_serializable(self):
        """Actor arguments + data prepared for json serialization"""
        return self._make_json_serializable(self.all_data)

    @property
    def all_data_json_serialized(self):
        """Json serialized actor arguments + data"""
        return json.dumps(self.all_data_json_serializable, indent=2)

    @property
    def data_json_serializable(self):
        """Actor data prepared for json serialization"""
        return self._make_json_serializable(self.data)

    @property
    def data_json_serialized(self):
        """Json serialized actor data"""
        return json.dumps(self.data_json_serializable)

    def _make_json_serializable(self, adict, remove_callables=True):
        """Make a dictionary json serializable

        Nested dicts are processed recursively. Values json cannot encode are
        replaced by their `str()`, or dropped when `remove_callables` is set
        and the value is callable or only has the default object repr.

        Args:
            adict (dict): The dict to serialise

        Kwargs:
            remove_callables (bool): Removes callables / none repr objects

        Returns:
            safe_dict (dict): The dictionary safe to json serialise
        """
        safe_dict = {}
        for key, value in adict.items():
            if isinstance(value, dict):
                safe_dict[key] = self._make_json_serializable(
                    value, remove_callables=remove_callables
                )
                continue

            try:
                json.dumps(value)
            except TypeError:
                default_repr = type(value).__repr__ is object.__repr__
                if remove_callables and (callable(value) or default_repr):
                    continue
                safe_dict[key] = str(value)
            else:
                safe_dict[key] = value
        return safe_dict

    def handle_params(self, before=True):
        """Handle special `ParamSchema` params

        If a schema contains an instance of `ParamSchema` process these before
        and after an actor. Failures of a single parameter are logged and do
        not stop the remaining ones.

        Args:
            before (bool): Whether this runs before or after the actor
        """
        if not hasattr(self, "schema"):
            return

        ty = "before" if before else "after"
        schema = self.schema()
        for k, v in schema.fields.items():
            if not isinstance(v, fields.Nested):
                continue
            # `nested` is not always a class; `issubclass` would raise on
            # anything else
            if not (isinstance(v.nested, type) and issubclass(v.nested, ParamSchema)):
                continue
            if k not in self.initkwargs:
                continue

            handler = v.nested.handler
            for p, pv in self.initkwargs[k].items():
                par = {p: pv}
                try:
                    log.get("user").info(
                        f"Running {k}.{p} to {pv} {ty} actor",
                        type="hardware",
                    )
                    handler.handle(par, before)
                    log.get("user").info(
                        f"Finished {k}.{p} {ty} actor", type="hardware"
                    )
                except Exception as e:
                    logger.exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}"
                    )
                    log.get("user").exception(
                        f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}",
                        type="hardware",
                    )

    @contextmanager
    def capture_stdout(self, emit, uuid, after=None):
        """Context manager to capture stdout

        `sys.stdout` is replaced by an `ActorStdout` for the duration of the
        context and always restored on exit.

        Args:
            emit (function): SocketIO emit function
            uuid (uuid): Actor uuid
            after (function): Called with the captured output on exit
        """
        original = sys.stdout
        proxy = ActorStdout(original, emit, uuid)
        sys.stdout = proxy
        try:
            yield proxy
        finally:
            try:
                if callable(after):
                    # Read from our own proxy in case sys.stdout was replaced
                    after(proxy.buffer)
            finally:
                sys.stdout = original

    def execute(self):
        """Execute the actor

        Try to execute the actor, time it, and catch any errors it may raise

        Returns:
            The raised exception if the actor failed, otherwise None
        """
        self._running = True
        self._start = time.time()
        try:
            self.started(self.uid)
            self.handle_params()

            def after(buffer):
                self._stdout = buffer

            with self.capture_stdout(self.socketio.emit, self.uid, after):
                self._resp = self.method(
                    **self.initkwargs
                )  # TODO: Why do we need to pass these arguments
            self._finished = time.time()
            self._running = False
            self.handle_params(before=False)
            self.finished(self.uid)

        # To catch gevent.Timeout as well
        except BaseException as e:
            self._running = False
            self._failed = True
            self._resp = ""
            self._finished = time.time()
            self.handle_params(before=False)
            self._exception = e
            self.error(self.uid, e)
            return e

    @property
    def stdout(self):
        """Output captured during the last execution"""
        return self._stdout

    def info(self):
        """Actor info

        Returns
            A dict of the actor info
        """
        return {
            "created": self.created,
            "started": self._start,
            "estimate": self._estimate,
            "finished": self._finished,
            "failed": self._failed,
            "name": self.name,
            "desc": self.desc,
            "uid": self.uid,
            "running": self._running,
            "cls": repr(self),
            "args": self.initkwargs_json_serializable,
            "stdout": self._stdout,
        }

    def running(self):
        """The running state of the actor"""
        return self._running

    def resp(self):
        """The response from the finished actor"""
        return self._resp

    def took(self):
        """How long the actor took"""
        return self._finished - self._start

    @abstractmethod
    def method(self, *args, **kwargs):
        """The actual actor definition"""
        pass

    def remove(self):
        """Method is called if an actor is removed from the queue"""
        if hasattr(self, "_remove"):
            self._remove(self.uid)

1004 

1005 

class ComponentActorKilled(Exception):
    """Raised to signal that an actor has been killed"""

1010 

1011 

class ComponentActorSchema(Schema):
    """Component Actor Schema

    Marks a schema for caching (meta)
    Allows schema reloading if `reloader` is set: every access to `fields` or
    `_declared_fields` first refreshes the instance from a newly built schema.
    """

    # Callable returning a fresh schema class, or None to disable reloading
    reloader = None
    # Initialized at the creation by the schema service
    _actor: ComponentActor | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.Meta.cache = True

    def get_actor(self) -> ComponentActor | None:
        """Return the actor attached to the schema class, if any"""
        return getattr(type(self), "_actor", None)

    def reload_schema(self):
        """Refresh this instance from the schema returned by `reloader`

        Does nothing when no reloader is set. When the reloader returns
        nothing a warning is logged and the current state is kept.
        """
        if not self.reloader:
            return

        schema = self.reloader()
        if not schema:
            logger.warning(
                f"Could not reload schema for `{self.__class__.__name__}`"
            )
            # Nothing to copy from; instantiating would raise a TypeError
            return

        instance = schema()

        for key in [
            "fields",
            "_declared_fields",
            "warnings",
            "calculated",
            "time_estimate",
            "schema_validate",
            "save_preset",
            "get_presets",
        ]:
            if hasattr(instance, key):
                setattr(self, key, getattr(instance, key))

        if hasattr(schema, "exception"):
            self.exception = schema.exception
            self.traceback = schema.traceback
        else:
            self.exception = None
            self.traceback = None

        if schema.Meta:
            for key in ["uiorder", "uischema", "uigroups", "presets", "cache"]:
                if hasattr(schema.Meta, key):
                    setattr(self.Meta, key, getattr(schema.Meta, key))
                elif hasattr(self.Meta, key):
                    # Option dropped from the reloaded schema
                    setattr(self.Meta, key, None)

    def __getattribute__(self, name):
        # Field access is the hook used to pick up schema changes
        if name in ("fields", "_declared_fields"):
            self.reload_schema()

        return super().__getattribute__(name)