Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/components/__init__.py: 75%

536 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4from functools import partial 

5import gevent 

6import time 

7import uuid 

8import json 

9import sys 

10from contextlib import contextmanager 

11import traceback 

12 

13from abc import ABC, abstractmethod 

14from marshmallow import Schema, fields, ValidationError 

15 

16from daiquiri.core.exceptions import InvalidYAML 

17from daiquiri.core.logging import log 

18from daiquiri.core import ( 

19 CoreBase, 

20 CoreResource, 

21 marshal, 

22 require_valid_session, 

23 require_staff, 

24) 

25from daiquiri.core.utils import loader 

26from daiquiri.resources.utils import get_resource_provider, YamlDict 

27from daiquiri.core.exceptions import SyntaxErrorYAML 

28from daiquiri.core.schema import MessageSchema, ErrorSchema 

29from daiquiri.core.schema.components import ComponentInfoSchema, DebugStateSchema 

30from daiquiri.core.schema.metadata import paginated 

31from daiquiri.core.components.params import ParamSchema 

32 

33import logging 

34 

35logger = logging.getLogger(__name__) 

36 

37 

38def actor(name, **kwargs): 

39 def decorator(fn): 

40 fn.__actor_resource__ = {"name": name, "kwargs": kwargs} 

41 return fn 

42 

43 return decorator 

44 

45 

46def actor_wrapper(name, **actkw): 

47 def decorator(fn): 

48 def wrapper(self, *args, **kwargs): 

49 kwargs["actor"] = name 

50 kwargs["component"] = fn.__module__ 

51 

52 if kwargs.get("save"): 

53 kwargs.pop("save") 

54 kwargs.pop("enqueue", None) 

55 datacollectionplan = self._parent._metadata.add_datacollectionplan( 

56 sampleid=kwargs.pop("sampleid"), 

57 scanparameters={"actor": name, **kwargs}, 

58 ) 

59 log.get("user").info( 

60 f"New data collection plan saved for actor '{name}' with id '{datacollectionplan['datacollectionplanid']}'", 

61 type="actor", 

62 ) 

63 return ( 

64 { 

65 "datacollectionplanid": datacollectionplan[ 

66 "datacollectionplanid" 

67 ] 

68 }, 

69 200, 

70 ) 

71 

72 if "preprocess" in actkw: 

73 #  Allow preprocessors to throw errors 

74 try: 

75 preprocess = getattr(self, "preprocess") 

76 kwargs = preprocess(*args, **kwargs) 

77 except Exception as e: 

78 logger.exception("Could not create actor") 

79 log.get("user").error( 

80 f"Could not create actor: `{str(e)}`", type="queue" 

81 ) 

82 return {"error": f"Could not create actor: {str(e)}"}, 400 

83 

84 actkw2 = { 

85 i: actkw[i] for i in actkw if i not in ["preprocess", "synchronous"] 

86 } 

87 

88 if hasattr(self._parent, "actor_success"): 

89 actkw2["success"] = self._parent.actor_success 

90 

91 if hasattr(self._parent, "actor_started"): 

92 actkw2["start"] = self._parent.actor_started 

93 

94 if hasattr(self._parent, "actor_error"): 

95 actkw2["error"] = self._parent.actor_error 

96 

97 if hasattr(self._parent, "actor_remove"): 

98 actkw2["remove"] = self._parent.actor_remove 

99 

100 if "synchronous" in actkw: 

101 (actor, greenlet) = self._parent.actor( 

102 name, actargs=kwargs, return_actor=True, spawn=True, **actkw2 

103 ) 

104 uuid = actor.uid 

105 else: 

106 uuid = self._parent.actor(name, actargs=kwargs, **actkw2) 

107 if not uuid: 

108 return {"error": "Could not load actor"}, 400 

109 

110 ret = {} 

111 if not ("preprocess" in actkw): 

112 ret = kwargs 

113 

114 if "synchronous" in actkw: 

115 greenlet.join() 

116 ret["result"] = actor.resp() 

117 

118 log.get("user").info(f"New actor created '{name}'", type="actor") 

119 

120 return dict({"uuid": uuid}, **ret), 200 

121 

122 # TODO: hmm, need to dig into this, @wraps clobbers kwargs? 

123 if hasattr(fn, "_require_control"): 

124 wrapper._require_control = fn._require_control 

125 if hasattr(fn, "_require_staff"): 

126 wrapper._require_staff = fn._require_staff 

127 

128 return wrapper 

129 

130 return decorator 

131 

132 

133class AvailableComponentsResource(CoreResource): 

134 @marshal(out=[[200, paginated(ComponentInfoSchema), "List of components"]]) 

135 def get(self): 

136 """Get a list of loaded components""" 

137 return self._parent.get_components(), 200 

138 

139 

140class ConfigExportResource(CoreResource): 

141 def get(self): 

142 """Returns any config that is exported from each of the loaded components""" 

143 return self._parent.get_export_config() 

144 

145 

146class ComponentReloadResource(CoreResource): 

147 @require_valid_session 

148 @require_staff 

149 @marshal( 

150 out=[ 

151 [200, MessageSchema(), "Components reloaded"], 

152 [400, ErrorSchema(), "Could not reload components"], 

153 ] 

154 ) 

155 def post(self): 

156 """Reload all components with new config""" 

157 if self._parent.reload(): 

158 return {"message": "Components reloaded"} 

159 else: 

160 return {"error": "Could not reload components"}, 400 

161 

162 

163class ComponentDebugResource(CoreResource): 

164 @marshal(out=[[200, DebugStateSchema(), "Debug state"]]) 

165 def get(self, **kwargs): 

166 """Get the app debug state""" 

167 return {"state": self._parent._app.debug} 

168 

169 @require_staff 

170 @marshal( 

171 inp={ 

172 "state": fields.Bool( 

173 required=True, 

174 metadata={"description": "The requested application debug state"}, 

175 ) 

176 }, 

177 out=[[200, DebugStateSchema(), "Debug state changed"]], 

178 ) 

179 def post(self, **kwargs): 

180 """Change the app debug state""" 

181 self._parent._app.debug = kwargs["state"] 

182 return {"state": self._parent._app.debug} 

183 

184 

185class Components(CoreBase): 

186 """Component Loader 

187 

188 The core component class that dynamically loads components from components.yml 

189 Will try to load components from the core 

190 """ 

191 

192 _components = [] 

193 

194 def setup(self): 

195 self._config = [] 

196 

197 provider = get_resource_provider() 

198 names = provider.list_resource_names("config", "*.yml") 

199 for name in names: 

200 config = YamlDict("config", name) 

201 if "component" in config: 

202 self._config.append({"type": config["component"], "config": name}) 

203 

204 for c in self._config + [ 

205 {"type": "version"}, 

206 {"type": "logging"}, 

207 {"type": "chat"}, 

208 ]: 

209 instance = loader( 

210 "daiquiri.core.components", 

211 "", 

212 c["type"], 

213 c.get("config"), 

214 **self.initkwargs, 

215 ) 

216 

217 if instance: 

218 instance.component_type = c["type"] 

219 instance.get_component = self.get_component 

220 self._components.append(instance) 

221 

222 for c in self._components: 

223 c.after_all_setup(self) 

224 

225 self.register_route(AvailableComponentsResource, "") 

226 self.register_route(ConfigExportResource, "/config") 

227 self.register_route(ComponentReloadResource, "/reload") 

228 self.register_route(ComponentDebugResource, "/debug") 

229 

230 def close(self): 

231 """Clean up the services at in the end. 

232 

233 After this call, services should not be accessed anymore 

234 """ 

235 for c in self._components: 

236 c.close() 

237 

238 def get_export_config(self): 

239 """Get exported config values from components""" 

240 export = { 

241 "beamline": self._base_config["meta_beamline"], 

242 "header_color": self._base_config.get("header_color"), 

243 "header_title": self._base_config.get("header_title"), 

244 } 

245 for c in self._components: 

246 export[c._base_url] = c.get_export_config() 

247 return export 

248 

249 def reload(self): 

250 """Reload components 

251 

252 Reload the root hardware object first, then reload listed components as 

253 these may depend on the hardware object 

254 

255 Components must decide what to reload by implementing their own 

256 `reload` method 

257 """ 

258 log.get("user").info("Reloading components", type="app") 

259 start = time.time() 

260 

261 success = True 

262 try: 

263 self._hardware.reload() 

264 except (SyntaxErrorYAML, InvalidYAML) as ex: 

265 log.get("user").exception("Error in hardware config", type="hardware") 

266 print(ex.pretty()) 

267 success = False 

268 else: 

269 for c in self._components: 

270 logger.info(f"Reloading config for {c.__class__.__name__}") 

271 try: 

272 if isinstance(c._config, YamlDict): 

273 c._config.reload() 

274 c.validate_config() 

275 

276 except (SyntaxErrorYAML, InvalidYAML) as ex: 

277 log.get("user").exception("Error in component config", type="app") 

278 print(ex.pretty()) 

279 success = False 

280 else: 

281 logger.info(f"Reloading component {c.__class__.__name__}") 

282 c.reload() 

283 

284 log.get("user").info( 

285 f"Components reloaded, took {(time.time() - start):.1f}s", type="app" 

286 ) 

287 

288 self.emit("reloader", success, namespace="/app") 

289 

290 return True 

291 

292 def get_component(self, type: str): 

293 """Returns a component by it's type, else None""" 

294 for c in self._components: 

295 if c.component_type == type: 

296 return c 

297 return None 

298 

299 def get_components(self): 

300 components = [f.info() for f in self._components] 

301 return {"total": len(components), "rows": components} 

302 

303 

304class ComponentResource(CoreResource): 

305 """ComponentResource that all component resources inherit from""" 

306 

307 pass 

308 

309 

310class Component(CoreBase): 

311 """The abstract class that all components inherit from 

312 

313 The component loads a config with the same name as the class.lower() and logs a 

314 message if one cannot be found. The class also registeres a before_request handler 

315 to essentially enable middleware on the request, by default requiring a valid session 

316 and then checking require_control as needed 

317 

318 The base component class also provide handling for execution and queuing of `Actors` 

319 """ 

320 

321 _config_schema = None 

322 _config_export = [] 

323 

324 _require_session = True 

325 _require_blsession = True 

326 

327 _actors = [] 

328 

329 def __init__(self, *args, **kwargs): 

330 self._running_actors = {} 

331 

332 if args[0]: 

333 if isinstance(args[0], dict): 

334 self._config = args[0] 

335 else: 

336 self._config = YamlDict("config", args[0]) 

337 self.validate_config() 

338 else: 

339 self._config = {} 

340 super().__init__(*args, **kwargs) 

341 

342 logger.debug("Loading Component: {f}".format(f=self._bp)) 

343 

344 def close(self): 

345 """Clean up the service at the end. 

346 

347 After this call, the component should not be accessed anymore 

348 """ 

349 pass 

350 

351 def get_export_config(self) -> dict: 

352 """Get exported config values from this component. 

353 

354 The default implementation export values from the yaml config. Only the 

355 keys part of white list `_config_export` will be exposed. 

356 """ 

357 export_config = {} 

358 for k in self._config_export: 

359 export_config[k] = self._config[k] 

360 return export_config 

361 

362 def after_setup(self): 

363 if self._namespace is None: 

364 logger.debug( 

365 f"namespace is empty, defaulting to base_url: {self._base_url}" 

366 ) 

367 self._namespace = self._base_url 

368 self._session.register_namespace(self._namespace) 

369 

370 def validate_config(self): 

371 if self._config_schema: 

372 try: 

373 self._config_schema.load(self._config) 

374 except ValidationError as err: 

375 raise InvalidYAML( 

376 { 

377 "message": f"{self.__class__.__name__} config is invalid", 

378 "file": self._config.resource, 

379 "obj": self._config, 

380 "errors": err.messages, 

381 } 

382 ) from None 

383 

384 def register_actor_route(self, route_class, route): 

385 for k in ["post", "get", "put", "patch", "delete"]: 

386 fn = getattr(route_class, k, None) 

387 if fn: 

388 if hasattr(fn, "__actor_resource__"): 

389 actor = fn.__actor_resource__ 

390 

391 fn = actor_wrapper(actor["name"], **actor["kwargs"])(fn) 

392 sch = self.actor_schema(actor["name"]) 

393 if sch: 

394 sch.reloader = partial(self.actor_schema, actor["name"]) 

395 if hasattr(fn, "_require_staff"): 

396 sch._require_staff = fn._require_staff 

397 fn = marshal(inp=sch)(fn) 

398 

399 setattr(route_class, k, fn) 

400 

401 self.register_route(route_class, route) 

402 

403 def info(self): 

404 """Return a dict of basic info about this component""" 

405 return { 

406 "name": self.__class__.__name__, 

407 "baseurl": "/" + self._bp.lower(), 

408 # 'config': self._config 

409 } 

410 

411 def actor_schema(self, name): 

412 actor = self._create_actor(name, return_exception=True) 

413 schemaName = name[0].upper() + name[1:] + "Schema" 

414 # Either file doesnt exist, or there is a syntax error 

415 if isinstance(actor, Exception): 

416 schema = type( 

417 schemaName, 

418 (ComponentActorSchema,), 

419 { 

420 "Meta": type("Meta", (object,), {}), 

421 }, 

422 ) 

423 schema.exception = str(actor) 

424 schema.traceback = "".join(traceback.format_tb(actor.__traceback__)) 

425 return schema 

426 

427 if not hasattr(actor, "schema"): 

428 logger.warning(f"Actor {name} does not have a schema") 

429 return None 

430 schema = type(schemaName, (actor.schema,), {}) 

431 schema._actor = actor 

432 return schema 

433 

434 def _create_actor(self, name: str, basekw=None, return_exception: bool = False): 

435 # base must be an importable python module 

436 base = "{path}.{cls}".format( 

437 path=self._base_config["implementors"].replace("/", "."), 

438 cls=self.__class__.__name__.lower(), 

439 ) 

440 actor_key = self._config["actors"].get(name) 

441 if actor_key is None: 

442 log.get("user").exception( 

443 f"Actor {name} is not part of the available actors", type="actor" 

444 ) 

445 return None 

446 

447 actor_definition = self._config.get("actors_config", {}).get(name, {}) 

448 implementor = actor_definition.get("implementor") 

449 if implementor: 

450 package, module = implementor.rsplit(".", 1) 

451 else: 

452 package = base 

453 module = actor_key 

454 actor_config = actor_definition.get("config", {}) 

455 

456 try: 

457 if basekw is None: 

458 basekw = {} 

459 actor = loader( 

460 package, "Actor", module, **basekw, static_config=actor_config 

461 ) 

462 except Exception as e: 

463 log.get("user").exception( 

464 f"Couldn't load actor {name} from {base}", type="actor" 

465 ) 

466 if return_exception: 

467 return e 

468 return None 

469 return actor 

470 

471 def actor( 

472 self, 

473 name, 

474 start=None, 

475 success=None, 

476 error=None, 

477 remove=None, 

478 enqueue=False, 

479 spawn=False, 

480 return_actor=False, 

481 actargs={}, 

482 ): 

483 """Launch / enqueue an actor 

484 

485 Dynamically load an actor from config {implementors}/{class_name}/{file} 

486 File is determined from the component specific config file which maps name -> file 

487 

488 Example: 

489 >>> config.yaml 

490 >>> implementors: implementors/examples 

491 

492 >>> testcomponent.yml 

493 >>> actors: 

494 >>> click: actor1 

495 >>> scan: actor2 

496 

497 >>> self.actor('click') will execute implementors/examples/testcomponent/actor1.py 

498 >>> self.actor('scan') will execute implementors/examples/testcomponent/actor2.py 

499 

500 Args: 

501 name (str): The name of the actor to start, which is resolved from the config 

502 start (fn): Function to call when the actor starts 

503 success (fn): Function to call when the actor completes successfully 

504 error (fn): Function to call if the actor fails 

505 enqueue (boolean): Enqueue the actor rather than executing immediately 

506 spawn (boolean): Spawn the actor immediately in a new greenlet 

507 return_actor (boolean): Return the actor and greenlet (only for spawn=True) 

508 actargs (dict): Dictionary of arguments to pass to the actor 

509 

510 Returns: 

511 The actor uuid 

512 or with return_actor=True (actor, greenlet) 

513 

514 """ 

515 if name in self._actors: 

516 actid = str(uuid.uuid4()) 

517 basekw = { 

518 "uid": actid, 

519 "name": name, 

520 "metadata": self._metadata, 

521 "socketio": self._socketio, 

522 "stomp": self.get_component("stomp"), 

523 "celery": self.get_component("celery"), 

524 "started": self._actor_started, 

525 "finished": self._actor_finished, 

526 "error": self._actor_error, 

527 "_remove": self._actor_remove, 

528 } 

529 

530 actor = self._create_actor(name, basekw=basekw) 

531 if actor: 

532 self._running_actors[actid] = [actor, start, success, error, remove] 

533 

534 actor.prepare(**actargs) 

535 if enqueue or actargs.get("enqueue"): 

536 logger.debug(f"Enqueuing actor {name} with uid {actid}") 

537 self._queue.push(actor) 

538 

539 elif spawn: 

540 logger.debug(f"Spawning actor {name} with uid {actid}") 

541 greenlet = gevent.spawn(actor.execute) 

542 if return_actor: 

543 return (actor, greenlet) 

544 

545 else: 

546 logger.debug(f"Running actor {name} with uid {actid}") 

547 self._queue.run_now(actor) 

548 

549 return actid 

550 

551 else: 

552 logger.error(f"No such actor `{name}` on class `{self._bp}`") 

553 

554 def _actor_started(self, actid): 

555 """Actor started callback 

556 

557 Checks if the actorid is registered in the running actors, and then calls the started 

558 callback if registered 

559 

560 Args: 

561 actid (uuid): The actor uuid that started 

562 """ 

563 if not (actid in self._running_actors): 

564 logger.warning("Unknown actor started {uid}".format(uid=actid)) 

565 return 

566 

567 actor, start, success, error, remove = self._running_actors[actid] 

568 logger.debug( 

569 "Actor started {name} {actid}".format(name=actor.name, actid=actid) 

570 ) 

571 

572 if start: 

573 start(actid, actor) 

574 

575 def _actor_finished(self, actid): 

576 """Actor started callback 

577 

578 Checks if the actorid is registered in the running actors, and then calls the finished 

579 callback if registered 

580 

581 Args: 

582 actid (uuid): The actor uuid that finished 

583 """ 

584 if not (actid in self._running_actors): 

585 logger.warning("Unknown actor completed {uid}".format(uid=actid)) 

586 return 

587 

588 actor, start, success, error, remove = self._running_actors[actid] 

589 logger.debug( 

590 "Actor finished {name} {actid} took {s}".format( 

591 name=actor.name, actid=actid, s=actor.took() 

592 ) 

593 ) 

594 

595 if success: 

596 success(actid, actor.resp(), actor) 

597 

598 del self._running_actors[actid] 

599 

600 def _actor_error(self, actid, exception): 

601 """Actor error callback 

602 

603 Checks if the actorid is registered in the running actors, and then calls the error 

604 callback if registered 

605 

606 Args: 

607 actid (uuid): The actor uuid that failed 

608 """ 

609 if not (actid in self._running_actors): 

610 logger.warning("Unknown actor error {uid}".format(uid=actid)) 

611 return 

612 

613 actor, start, success, error, remove = self._running_actors[actid] 

614 

615 if not isinstance(exception, ComponentActorKilled): 

616 log.get("user").exception( 

617 f"Actor failed {actor.name}: {exception}", type="actor" 

618 ) 

619 

620 try: 

621 eactid = str(uuid.uuid4()) 

622 eactor = loader( 

623 self._base_config["implementors"].replace("/", "."), 

624 "Actor", 

625 "upload_error", 

626 **{ 

627 "uid": eactid, 

628 "metadata": self._metadata, 

629 "socketio": self._socketio, 

630 "started": lambda *args, **kwargs: None, 

631 "finished": lambda *args, **kwargs: None, 

632 "error": lambda *args, **kwargs: None, 

633 }, 

634 ) 

635 eactor.prepare( 

636 **{"actid": actid, "exception": exception, "actor": actor} 

637 ) 

638 greenlet = gevent.spawn(eactor.execute) 

639 greenlet.join() 

640 

641 if eactor._exception: 

642 try: 

643 raise eactor._exception 

644 except Exception: 

645 logger.exception("Could not send actor error to uploader") 

646 

647 except Exception: 

648 logger.exception("Could not send actor error to uploader") 

649 

650 if error: 

651 error(actid, exception, actor) 

652 

653 del self._running_actors[actid] 

654 

655 def _actor_remove(self, actid): 

656 """Actor remove callback 

657 

658 Called if an actor is removed from the queue 

659 

660 Args: 

661 actid (uuid): The actor uuid that has been removed 

662 

663 """ 

664 if not (actid in self._running_actors): 

665 logger.warning("Unknown actor error {uid}".format(uid=actid)) 

666 return 

667 

668 actor, start, success, error, remove = self._running_actors[actid] 

669 logger.debug( 

670 "Actor removed {name} {actid}".format(name=actor.name, actid=actid) 

671 ) 

672 

673 if remove: 

674 remove(actid, actor) 

675 

676 

677class ActorStdout: 

678 """Mock file object that writes to socket.io and original stdout""" 

679 

680 def __init__(self, original, emit, uuid): 

681 self.emit = emit 

682 self.original = original 

683 self.uuid = uuid 

684 self._buffer = "" 

685 

686 def open(self, *args, **kwargs): 

687 pass 

688 

689 def isatty(self): 

690 return self.original.isatty() 

691 

692 def flush(self, *args, **kwargs): 

693 """Horrible mimic of terminal \r behaviour""" 

694 self.original.flush(*args, **kwargs) 

695 

696 lines = self._buffer.split("\n") 

697 if lines: 

698 if lines[-1].endswith("\r"): 

699 sub_lines = lines[-1].split("\r") 

700 if len(sub_lines) > 1: 

701 lines[-2] = sub_lines[-2] + "\n" 

702 lines.pop() 

703 

704 self._buffer = "\n".join(lines) 

705 

706 self.emit( 

707 "actor_output", 

708 {"output": self._buffer, "uuid": self.uuid}, 

709 namespace="/queue", 

710 ) 

711 

712 def write(self, line, *args, **kwargs): 

713 self.original.write(line, *args, **kwargs) 

714 self._buffer += line 

715 

716 if self._buffer.endswith("\n"): 

717 self.emit( 

718 "actor_output", 

719 {"output": self._buffer, "uuid": self.uuid}, 

720 namespace="/queue", 

721 ) 

722 

723 @property 

724 def buffer(self): 

725 return self._buffer 

726 

727 def fileno(self): 

728 return self.original.fileno() 

729 

730 

731class ComponentActor(ABC): 

732 """Actor 

733 

734 The abstract actor from which all actors inherit 

735 

736 Attribute: 

737 static_config: Dictionary with configuration passed to the actor by the 

738 yaml configuration. 

739 """ 

740 

741 name = None 

742 desc = None 

743 metatype = "experiment" 

744 saving_args = None 

745 

746 # These keys will be prefixed with metadata_ when passed into the actor 

747 additional_metadata = {} 

748 

749 def __init__(self, *args, static_config=None, **kwargs): 

750 self._running = False 

751 self.__config = {} 

752 """Configuration which can be passed to the actor at the creation from the yaml file""" 

753 if static_config is not None: 

754 self.__config.update(static_config) 

755 self.__dict__.update((k, v) for k, v in kwargs.items()) 

756 

757 def get_config(self): 

758 """Expose the static configuration used to create the actor.""" 

759 return self.__config 

760 

761 def time_estimate(self): 

762 if hasattr(self, "schema"): 

763 if hasattr(self.schema, "time_estimate"): 

764 return self.schema().time_estimate(self.initkwargs) 

765 return 0 

766 

767 def prepare(self, **kwargs): 

768 self.initkwargs = kwargs 

769 self._estimate = kwargs.get("estimate", self.time_estimate()) 

770 self._start = None 

771 self._finished = None 

772 self._failed = False 

773 self._exception = None 

774 self.created = time.time() 

775 self.data = {} 

776 self._stdout = "" 

777 

778 def __getitem__(self, key): 

779 """Get value from initial arguments or data""" 

780 try: 

781 return self.initkwargs[key] 

782 except KeyError: 

783 pass 

784 return self.data[key] 

785 

786 def get(self, key, default=None): 

787 """Get value from initial arguments or data""" 

788 try: 

789 return self[key] 

790 except KeyError: 

791 return default 

792 

793 def __setitem__(self, key, value): 

794 """Overwrite data""" 

795 self.data[key] = value 

796 

797 def update(self, *args, **kwargs): 

798 """Overwrite data""" 

799 self.data.update(*args, kwargs) 

800 

801 @property 

802 def all_data(self): 

803 """Initial arguments + data + metadata""" 

804 return { 

805 **self.data, 

806 **self.initkwargs, 

807 **{ 

808 f"metadata_{k}": ( 

809 v(**self.data, **self.initkwargs) if callable(v) else v 

810 ) 

811 for k, v in self.additional_metadata.items() 

812 }, 

813 } 

814 

815 @property 

816 def initkwargs_json_serializable(self): 

817 """Actor arguments prepared for json serialization""" 

818 return self._make_json_serializable(self.initkwargs) 

819 

820 @property 

821 def initkwargs_json_serialized(self): 

822 """Json serialized actor arguments""" 

823 return json.dumps(self.initkwargs_json_serializable) 

824 

825 @property 

826 def all_data_json_serializable(self): 

827 """Actor arguments + data prepared for json serialization""" 

828 return self._make_json_serializable(self.all_data) 

829 

830 @property 

831 def all_data_json_serialized(self): 

832 """Json serialized actor arguments + data""" 

833 return json.dumps(self.all_data_json_serializable, indent=2) 

834 

835 @property 

836 def data_json_serializable(self): 

837 """Actor data prepared for json serialization""" 

838 return self._make_json_serializable(self.data) 

839 

840 @property 

841 def data_json_serialized(self): 

842 """Json serialized actor data""" 

843 return json.dumps(self.data_json_serializable) 

844 

845 def _make_json_serializable(self, adict, remove_callables=True): 

846 """Make a dictionary json serializable 

847 

848 Args: 

849 adict (dict): The dict to serialise 

850 

851 Kwargs: 

852 remove_callables (bool): Removes callables / none repr objects 

853 

854 Returns: 

855 safe_dict (dict): The dictionary safe to json serialise 

856 """ 

857 safe_dict = {} 

858 for k, v in adict.items(): 

859 if isinstance(v, dict): 

860 safe_dict[k] = self._make_json_serializable( 

861 v, remove_callables=remove_callables 

862 ) 

863 else: 

864 try: 

865 json.dumps(v) 

866 except TypeError: 

867 default_repr = type(v).__repr__ is object.__repr__ 

868 if remove_callables and (callable(v) or default_repr): 

869 continue 

870 safe_dict[k] = str(v) 

871 else: 

872 safe_dict[k] = v 

873 return safe_dict 

874 

875 def handle_params(self, before=True): 

876 """Handle special `ParamSchema` params 

877 

878 If a schema contains an instance of `ParamSchema` process these before 

879 and after an actor. 

880 """ 

881 if hasattr(self, "schema"): 

882 schema = self.schema() 

883 for k, v in schema.fields.items(): 

884 if isinstance(v, fields.Nested): 

885 if issubclass(v.nested, ParamSchema): 

886 if k in self.initkwargs: 

887 handler = v.nested.handler 

888 for p, pv in self.initkwargs[k].items(): 

889 ty = "before" if before else "after" 

890 par = {} 

891 par[p] = pv 

892 try: 

893 log.get("user").info( 

894 f"Running {k}.{p} to {pv} {ty} actor", 

895 type="hardware", 

896 ) 

897 handler.handle(par, before) 

898 log.get("user").info( 

899 f"Finished {k}.{p} {ty} actor", type="hardware" 

900 ) 

901 except Exception as e: 

902 logger.exception( 

903 f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}" 

904 ) 

905 log.get("user").exception( 

906 f"Cannot handle {k}.{p} to {pv} {ty} actor, {str(e)}", 

907 type="hardware", 

908 ) 

909 

910 @contextmanager 

911 def capture_stdout(self, emit, uuid, after=None): 

912 """Context manager to capture stdout 

913 

914 Args: 

915 emit (function): SocketIO emit function 

916 uuid (uuid): Actor uuid 

917 """ 

918 original = sys.stdout 

919 sys.stdout = ActorStdout(original, emit, uuid) 

920 try: 

921 yield sys.stdout 

922 finally: 

923 if callable(after): 

924 after(sys.stdout.buffer) 

925 sys.stdout = original 

926 

927 def execute(self): 

928 """Execute the actor 

929 

930 Try to execute the actor, time it, and catch any errors it may raise 

931 """ 

932 self._running = True 

933 self._start = time.time() 

934 try: 

935 self.started(self.uid) 

936 self.handle_params() 

937 

938 def after(buffer): 

939 self._stdout = buffer 

940 

941 with self.capture_stdout(self.socketio.emit, self.uid, after): 

942 self._resp = self.method( 

943 **self.initkwargs 

944 ) # TODO: Why do we need to pass these arguments 

945 self._finished = time.time() 

946 self._running = False 

947 self.handle_params(before=False) 

948 self.finished(self.uid) 

949 

950 # To catch gevent.Timeout as well 

951 except BaseException as e: 

952 self._running = False 

953 self._failed = True 

954 self._resp = "" 

955 self._finished = time.time() 

956 self.handle_params(before=False) 

957 self._exception = e 

958 self.error(self.uid, e) 

959 return e 

960 

961 @property 

962 def stdout(self): 

963 return self._stdout 

964 

965 def info(self): 

966 """Actor info 

967 

968 Returns 

969 A dict of the actor info 

970 """ 

971 return { 

972 "created": self.created, 

973 "started": self._start, 

974 "estimate": self._estimate, 

975 "finished": self._finished, 

976 "failed": self._failed, 

977 "name": self.name, 

978 "desc": self.desc, 

979 "uid": self.uid, 

980 "running": self._running, 

981 "cls": repr(self), 

982 "args": self.initkwargs_json_serializable, 

983 "stdout": self._stdout, 

984 } 

985 

986 def running(self): 

987 """The running state of the actor""" 

988 return self._running 

989 

990 def resp(self): 

991 """The response from the finished actor""" 

992 return self._resp 

993 

994 def took(self): 

995 """How long the actor took""" 

996 return self._finished - self._start 

997 

998 @abstractmethod 

999 def method(self, *args, **kwargs): 

1000 """The actual actor definition""" 

1001 pass 

1002 

1003 def remove(self): 

1004 """Method is called if an actor is remove from the queue""" 

1005 if hasattr(self, "_remove"): 

1006 self._remove(self.uid) 

1007 

1008 

1009class ComponentActorKilled(Exception): 

1010 """Exception to raise when an actor is killed""" 

1011 

1012 pass 

1013 

1014 

1015class ComponentActorSchema(Schema): 

1016 """Component Actor Schema 

1017 

1018 Marks a schema for caching (meta) 

1019 Allows schema reloading if `reloader` is set 

1020 """ 

1021 

1022 reloader = None 

1023 _actor: ComponentActor | None = None 

1024 """Initialized at the creation by the schema service""" 

1025 

1026 def __init__(self, *args, **kwargs): 

1027 super().__init__(*args, **kwargs) 

1028 self.Meta.cache = True 

1029 

1030 def get_actor(self) -> ComponentActor | None: 

1031 actor = getattr(type(self), "_actor", None) 

1032 return actor 

1033 

1034 def reload_schema(self): 

1035 if self.reloader: 

1036 schema = self.reloader() 

1037 if not schema: 

1038 logger.warning( 

1039 f"Could not reload schema for `{self.__class__.__name__}`" 

1040 ) 

1041 instance = schema() 

1042 

1043 for key in [ 

1044 "fields", 

1045 "_declared_fields", 

1046 "warnings", 

1047 "calculated", 

1048 "time_estimate", 

1049 "schema_validate", 

1050 "save_preset", 

1051 "get_presets", 

1052 ]: 

1053 if hasattr(instance, key): 

1054 setattr(self, key, getattr(instance, key)) 

1055 

1056 if hasattr(schema, "exception"): 

1057 self.exception = schema.exception 

1058 self.traceback = schema.traceback 

1059 else: 

1060 self.exception = None 

1061 self.traceback = None 

1062 

1063 if schema.Meta: 

1064 for key in ["uiorder", "uischema", "uigroups", "presets", "cache"]: 

1065 if hasattr(schema.Meta, key): 

1066 setattr(self.Meta, key, getattr(schema.Meta, key)) 

1067 else: 

1068 if hasattr(self.Meta, key): 

1069 setattr(self.Meta, key, None) 

1070 

1071 def __getattribute__(self, name): 

1072 if name == "fields" or name == "_declared_fields": 

1073 self.reload_schema() 

1074 

1075 return super().__getattribute__(name)