Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/samplescan.py: 53%
128 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import datetime
5from flask import g
6from marshmallow import ValidationError
8from daiquiri.core import require_control, require_staff, marshal
9from daiquiri.core.logging import log
10from daiquiri.core.components import (
11 Component,
12 ComponentResource,
13 actor,
14 ComponentActorKilled,
15)
16from daiquiri.core.components.dcutilsmixin import DCUtilsMixin
17from daiquiri.core.schema import ErrorSchema, MessageSchema, ValidationErrorSchema
18from daiquiri.core.schema.samplescan import ScanConfigSchema, AvailableScansSchema
19from daiquiri.core.schema.metadata import paginated
21import logging
24logger = logging.getLogger(__name__)
27class QueueScanResource(ComponentResource):
28 @marshal(
29 out=[
30 [200, MessageSchema(), "Scan successfully queued"],
31 [400, ErrorSchema(), "Could not queue scan"],
32 [404, ErrorSchema(), "No such data collection plan"],
33 [422, ValidationErrorSchema(), "Data collection plan is not valid"],
34 ]
35 )
36 def post(self, datacollectionplanid):
37 """Queue a scan from a datacollectionplan"""
38 return self._parent.queue_scan(datacollectionplanid)
41class AvailableScansResource(ComponentResource):
42 @marshal(out=[[200, paginated(AvailableScansSchema), "A list of available scans"]])
43 def get(self, **kwargs):
44 """Get a list of available scans"""
45 return self._parent.get_available_scans()
48class Samplescan(Component, DCUtilsMixin):
49 _base_url = "samplescan"
50 _config_schema = ScanConfigSchema()
52 def setup(self):
53 self._scan_actors = []
54 self.register_route(AvailableScansResource, "")
55 self.register_route(QueueScanResource, "/queue/<int:datacollectionplanid>")
57 self._generate_scan_actors()
59 def reload(self):
60 self._generate_scan_actors()
62 def preprocess(self, **kwargs):
63 sample = self._metadata.get_samples(kwargs["sampleid"])
64 if not sample:
65 raise AttributeError(f"No such sample {kwargs['sampleid']}")
66 kwargs["sampleid"] = sample["sampleid"]
67 kwargs["sample"] = sample["name"]
68 kwargs["extrametadata"] = sample["extrametadata"]
69 kwargs["sessionid"] = g.blsession.get("sessionid")
70 (
71 kwargs["containerqueuesampleid"],
72 kwargs["datacollectionplanid"],
73 ) = self._metadata.queue_sample(
74 kwargs["sampleid"],
75 kwargs.get("datacollectionplanid"),
76 scanparameters=kwargs,
77 )
78 kwargs["before_scan_starts"] = self.before_scan_starts
79 kwargs["update_datacollection"] = self.update_datacollection
80 kwargs["open_attachment"] = self._open_dc_attachment
81 kwargs["add_scanqualityindicators"] = self.add_scanqualityindicators
82 kwargs["enqueue"] = kwargs.get("enqueue", True)
83 return kwargs
85 def _generate_scan_actors(self):
86 """Dynamically generate scan actor resources"""
88 def post(self, **kwargs):
89 pass
91 for scan in self._config["scans"]:
92 name = scan["actor"]
94 # populate actors at config root
95 if "actors" not in self._config:
96 self._config["actors"] = {}
97 self._config["actors"][name] = name
98 if "actors_config" not in self._config:
99 self._config["actors_config"] = {}
100 self._config["actors_config"][name] = scan
102 if name in self._actors:
103 continue
105 self._actors.append(name)
106 self._scan_actors.append(name)
107 fn = require_control(actor(name, preprocess=True)(post))
109 if scan.get("require_staff"):
110 fn = require_staff(fn)
112 act_res = type(
113 name,
114 (ComponentResource,),
115 {"post": fn, "preprocess": self.preprocess},
116 )
117 self.register_actor_route(act_res, f"/{name}")
119 def get_available_scans(self):
120 scans = []
121 for scan in self._config["scans"]:
122 tags = scan.get("tags", [])
123 if (not scan.get("require_staff")) or (
124 scan.get("require_staff") and g.user.staff()
125 ):
126 s = {"actor": scan["actor"], "tags": tags}
127 scans.append(s)
129 return {"total": len(scans), "rows": scans}
131 def before_scan_starts(self, actor):
132 """Saving directory is created"""
133 if actor.get("datacollectionid"):
134 self._save_dc_params(actor)
136 def actor_started(self, actid, actor):
137 """Callback when an actor starts
139 For scan actors this will generate a datacollection
140 """
141 if actor.name in self._scan_actors:
142 if actor.metatype is not None:
143 self.start_datacollection(actor)
144 else:
145 self.initialize_saving(actor)
147 logger.info(f"Actor '{actor.name}' with id '{actid}' started")
149 def actor_success(self, actid, response, actor):
150 """Callback when an actor finishes successfully
152 For scan actors this update the datacollection with the endtime and
153 'success' status
154 """
155 if actor.name in self._scan_actors:
156 if actor.get("datacollectionid"):
157 self.update_datacollection(
158 actor, endtime=datetime.now(), runstatus="Successful", emit_end=True
159 )
160 self._save_dc_log(actor)
161 else:
162 self.actor_remove(actid, actor)
164 logger.info(f"Actor '{actor.name}' with id '{actid}' finished")
166 def actor_error(self, actid, exception, actor):
167 """Callback when an actor fails
169 For scan actors this will update the datacollection with the end time and
170 'failed' status
171 """
172 status = "Aborted" if isinstance(exception, ComponentActorKilled) else "Failed"
173 if actor.name in self._scan_actors:
174 if actor.get("datacollectionid"):
175 self.update_datacollection(
176 actor, endtime=datetime.now(), runstatus=status
177 )
178 if status == "Failed":
179 self._save_dc_exception(actor, exception)
180 else:
181 self.actor_remove(actid, actor)
182 if status == "Failed":
183 logger.error(f"Actor '{actor.name}' with id '{actid}' failed")
184 else:
185 logger.info(f"Actor '{actor.name}' with id '{actid}' aborted")
187 def actor_remove(self, actid, actor):
188 """Callback when an actor is removed from the queue
190 For scan actors this will remove the item from the database queue
191 """
192 if actor.name in self._scan_actors:
193 self._metadata.unqueue_sample(
194 actor["sampleid"],
195 containerqueuesampleid=actor["containerqueuesampleid"],
196 no_context=True,
197 )
199 def queue_scan(self, datacollectionplanid):
200 """Queue a data collection plan"""
201 datacollectionplan = self._metadata.get_datacollectionplans(
202 datacollectionplanid=datacollectionplanid
203 )
204 if not datacollectionplan:
205 return {"error": "Data collection plan not found"}, 404
207 if datacollectionplan["datacollectionid"]:
208 return (
209 {
210 "error": f"That data collection plan has already been executed `datacollectionid`: {datacollectionplan['datacollectionid']}"
211 },
212 400,
213 )
215 params = {
216 "sampleid": datacollectionplan["sampleid"],
217 **datacollectionplan["scanparameters"],
218 }
220 if "actor" not in params:
221 return (
222 {
223 "error": "No actor specified within the selected data collection plan"
224 },
225 400,
226 )
228 actor = params.pop("actor")
229 params.pop("component")
231 # TODO: This all needs factoring out of `actor_wrapper` to make
232 # it reusable
233 schema = self.actor_schema(actor)
234 schema_inst = schema()
235 try:
236 schema_inst.load(params)
237 except ValidationError as err:
238 return {"messages": err.messages}, 422
240 params["datacollectionplanid"] = datacollectionplan["datacollectionplanid"]
241 params = self.preprocess(**params)
243 actkw = {
244 "success": self.actor_success,
245 "start": self.actor_started,
246 "error": self.actor_error,
247 "remove": self.actor_remove,
248 }
249 uuid = self.actor(actor, actargs=params, **actkw)
251 if uuid:
252 log.get("user").info(f"New actor created '{actor}'", type="actor")
253 return {"message": f"Actor created with uuid {uuid}"}
254 else:
255 return {"error": "Could not create actor"}, 400