Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/samplescan.py: 53%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from datetime import datetime 

4 

5from flask import g 

6from marshmallow import ValidationError 

7 

8from daiquiri.core import require_control, require_staff, marshal 

9from daiquiri.core.logging import log 

10from daiquiri.core.components import ( 

11 Component, 

12 ComponentResource, 

13 actor, 

14 ComponentActorKilled, 

15) 

16from daiquiri.core.components.dcutilsmixin import DCUtilsMixin 

17from daiquiri.core.schema import ErrorSchema, MessageSchema, ValidationErrorSchema 

18from daiquiri.core.schema.samplescan import ScanConfigSchema, AvailableScansSchema 

19from daiquiri.core.schema.metadata import paginated 

20 

21import logging 

22 

23 

24logger = logging.getLogger(__name__) 

25 

26 

class QueueScanResource(ComponentResource):
    """REST resource that queues a scan from a stored data collection plan.

    The work is delegated to the owning component's ``queue_scan``; this
    class only declares the response schemas for each status code.
    """

    @marshal(
        out=[
            [200, MessageSchema(), "Scan successfully queued"],
            [400, ErrorSchema(), "Could not queue scan"],
            [404, ErrorSchema(), "No such data collection plan"],
            [422, ValidationErrorSchema(), "Data collection plan is not valid"],
        ]
    )
    def post(self, datacollectionplanid):
        """Queue a scan from a datacollectionplan"""
        # `_parent` is the component this resource was registered on.
        component = self._parent
        return component.queue_scan(datacollectionplanid)

39 

40 

class AvailableScansResource(ComponentResource):
    """REST resource listing the scans the current user may launch."""

    @marshal(
        out=[
            [200, paginated(AvailableScansSchema), "A list of available scans"],
        ]
    )
    def get(self, **kwargs):
        """Get a list of available scans"""
        # Any keyword arguments are accepted but not used; the listing is
        # produced entirely by the owning component.
        component = self._parent
        return component.get_available_scans()

46 

47 

class Samplescan(Component, DCUtilsMixin):
    """Component exposing the configured sample scans as actors.

    For every entry in the ``scans`` section of the component config a
    resource with a ``post`` handler is generated and registered under
    ``/<actor name>``. Scans can also be launched from a stored data
    collection plan via ``/queue/<datacollectionplanid>``.

    The ``actor_*`` methods are the callbacks handed to the actor when it is
    created in ``queue_scan`` (``start``, ``success``, ``error``, ``remove``).
    """

    _base_url = "samplescan"
    _config_schema = ScanConfigSchema()

    def setup(self):
        """Register the static routes and generate the per-scan actor routes."""
        # Names of the actors generated from the `scans` config section.
        self._scan_actors = []
        self.register_route(AvailableScansResource, "")
        self.register_route(QueueScanResource, "/queue/<int:datacollectionplanid>")

        self._generate_scan_actors()

    def reload(self):
        """Regenerate scan actors; actors already registered are skipped."""
        self._generate_scan_actors()

    def preprocess(self, **kwargs):
        """Resolve the sample, queue it, and attach the scan callbacks.

        Args:
            **kwargs: Scan arguments. ``sampleid`` is required;
                ``datacollectionplanid`` and ``enqueue`` are optional.

        Returns:
            dict: ``kwargs`` augmented with the sample details, the current
            session id, the ids returned by ``queue_sample``, the callbacks
            made available to the actor, and ``enqueue`` (defaults to True).

        Raises:
            KeyError: If ``sampleid`` is not supplied.
            AttributeError: If the metadata handler returns no sample.
        """
        requested_id = kwargs["sampleid"]
        sample = self._metadata.get_samples(requested_id)
        if not sample:
            raise AttributeError(f"No such sample {requested_id}")

        kwargs["sampleid"] = sample["sampleid"]
        kwargs["sample"] = sample["name"]
        kwargs["extrametadata"] = sample["extrametadata"]
        kwargs["sessionid"] = g.blsession.get("sessionid")

        # The kwargs dict itself is passed as the scan parameters, exactly as
        # it stands at this point (before the callbacks are added below).
        queue_id, plan_id = self._metadata.queue_sample(
            kwargs["sampleid"],
            kwargs.get("datacollectionplanid"),
            scanparameters=kwargs,
        )
        kwargs["containerqueuesampleid"] = queue_id
        kwargs["datacollectionplanid"] = plan_id

        # Callbacks handed to the actor through its arguments.
        kwargs["before_scan_starts"] = self.before_scan_starts
        kwargs["update_datacollection"] = self.update_datacollection
        kwargs["open_attachment"] = self._open_dc_attachment
        kwargs["add_scanqualityindicators"] = self.add_scanqualityindicators

        kwargs.setdefault("enqueue", True)
        return kwargs

    def _generate_scan_actors(self):
        """Dynamically generate scan actor resources

        Each configured scan is mirrored into the ``actors`` and
        ``actors_config`` sections at the config root, then a resource class
        is created for it unless an actor of that name is already known.
        """

        def post(self, **kwargs):
            # Placeholder body; the behaviour comes from the decorators
            # applied to it below.
            pass

        for scan in self._config["scans"]:
            name = scan["actor"]

            # populate actors at config root
            for section, value in (("actors", name), ("actors_config", scan)):
                if section not in self._config:
                    self._config[section] = {}
                self._config[section][name] = value

            if name in self._actors:
                # Already registered (e.g. on reload): config refreshed above,
                # nothing more to do.
                continue

            self._actors.append(name)
            self._scan_actors.append(name)

            handler = require_control(actor(name, preprocess=True)(post))
            if scan.get("require_staff"):
                handler = require_staff(handler)

            resource_cls = type(
                name,
                (ComponentResource,),
                {"post": handler, "preprocess": self.preprocess},
            )
            self.register_actor_route(resource_cls, f"/{name}")

    def get_available_scans(self):
        """List the configured scans visible to the current user.

        Scans flagged ``require_staff`` are only included for staff users.

        Returns:
            dict: ``{"total": <count>, "rows": [{"actor": ..., "tags": ...}]}``
        """
        rows = [
            {"actor": scan["actor"], "tags": scan.get("tags", [])}
            for scan in self._config["scans"]
            # The staff check is only evaluated for staff-only scans.
            if not scan.get("require_staff") or g.user.staff()
        ]
        return {"total": len(rows), "rows": rows}

    def before_scan_starts(self, actor):
        """Save the data collection parameters before the scan starts.

        Passed to actors as the ``before_scan_starts`` callback. The original
        note states the saving directory has been created by this point.
        Nothing is saved when the actor has no ``datacollectionid``.
        """
        if actor.get("datacollectionid"):
            self._save_dc_params(actor)

    def actor_started(self, actid, actor):
        """Callback when an actor starts

        For scan actors this will generate a datacollection when the actor
        declares a ``metatype``; otherwise only saving is initialised.
        """
        if actor.name in self._scan_actors:
            if actor.metatype is not None:
                self.start_datacollection(actor)
            else:
                self.initialize_saving(actor)

        logger.info(f"Actor '{actor.name}' with id '{actid}' started")

    def actor_success(self, actid, response, actor):
        """Callback when an actor finishes successfully

        For scan actors this updates the datacollection with the end time and
        'Successful' status and saves its log. A scan actor without a
        datacollection is removed from the queue instead. ``response`` is
        not used here.
        """
        if actor.name in self._scan_actors:
            if actor.get("datacollectionid"):
                self.update_datacollection(
                    actor,
                    endtime=datetime.now(),
                    runstatus="Successful",
                    emit_end=True,
                )
                self._save_dc_log(actor)
            else:
                self.actor_remove(actid, actor)

        logger.info(f"Actor '{actor.name}' with id '{actid}' finished")

    def actor_error(self, actid, exception, actor):
        """Callback when an actor fails

        The status is 'Aborted' when the actor was killed
        (``ComponentActorKilled``) and 'Failed' otherwise. For scan actors
        the datacollection is updated with the end time and that status, and
        the exception is saved for failures. A scan actor without a
        datacollection is removed from the queue instead.
        """
        killed = isinstance(exception, ComponentActorKilled)
        status = "Aborted" if killed else "Failed"

        if actor.name in self._scan_actors:
            if actor.get("datacollectionid"):
                self.update_datacollection(
                    actor, endtime=datetime.now(), runstatus=status
                )
                if status == "Failed":
                    self._save_dc_exception(actor, exception)
            else:
                self.actor_remove(actid, actor)

        if status == "Failed":
            logger.error(f"Actor '{actor.name}' with id '{actid}' failed")
        else:
            logger.info(f"Actor '{actor.name}' with id '{actid}' aborted")

    def actor_remove(self, actid, actor):
        """Callback when an actor is removed from the queue

        For scan actors this will remove the item from the database queue.
        Other actors are ignored.
        """
        if actor.name not in self._scan_actors:
            return

        self._metadata.unqueue_sample(
            actor["sampleid"],
            containerqueuesampleid=actor["containerqueuesampleid"],
            no_context=True,
        )

    def queue_scan(self, datacollectionplanid):
        """Queue a data collection plan

        Args:
            datacollectionplanid: Id of the stored data collection plan.

        Returns:
            ``{"message": ...}`` on success, otherwise a ``(body, status)``
            tuple: 404 when the plan does not exist, 400 when it was already
            executed, names no actor, or the actor could not be created, and
            422 when its parameters fail the actor schema.
        """
        plan = self._metadata.get_datacollectionplans(
            datacollectionplanid=datacollectionplanid
        )
        if not plan:
            return {"error": "Data collection plan not found"}, 404

        if plan["datacollectionid"]:
            return (
                {
                    "error": f"That data collection plan has already been executed `datacollectionid`: {plan['datacollectionid']}"
                },
                400,
            )

        # Fall back to an empty mapping when no scan parameters are stored,
        # so the plan is rejected by the actor check below rather than
        # raising on the unpacking.
        params = {
            "sampleid": plan["sampleid"],
            **(plan["scanparameters"] or {}),
        }

        if "actor" not in params:
            return (
                {
                    "error": "No actor specified within the selected data collection plan"
                },
                400,
            )

        # Named `actor_name` so the imported `actor` decorator is not shadowed.
        actor_name = params.pop("actor")
        # "component" is only discarded, so tolerate it being absent instead
        # of raising KeyError.
        params.pop("component", None)

        # TODO: This all needs factoring out of `actor_wrapper` to make
        #   it reusable
        schema_cls = self.actor_schema(actor_name)
        try:
            # Validation only: the loaded result is not used.
            schema_cls().load(params)
        except ValidationError as err:
            return {"messages": err.messages}, 422

        params["datacollectionplanid"] = plan["datacollectionplanid"]
        params = self.preprocess(**params)

        callbacks = {
            "success": self.actor_success,
            "start": self.actor_started,
            "error": self.actor_error,
            "remove": self.actor_remove,
        }
        uuid = self.actor(actor_name, actargs=params, **callbacks)

        if not uuid:
            return {"error": "Could not create actor"}, 400

        log.get("user").info(f"New actor created '{actor_name}'", type="actor")
        return {"message": f"Actor created with uuid {uuid}"}