Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/metadata/mock.py: 0%

352 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1# -*- coding: utf-8 -*- 

2import logging 

3 

4from flask import g 

5 

6from daiquiri.core.metadata import MetaDataHandler 

7from daiquiri.core.metadata.user import User 

8from daiquiri.core.metadata.mockdata import ( 

9 USERS, 

10 PROPOSALS, 

11 SESSIONS, 

12 COMPONENTS, 

13 SAMPLES, 

14 SUBSAMPLES, 

15 SAMPLEIMAGES, 

16 SAMPLEACTIONS, 

17 DATACOLLECTIONS, 

18 DATACOLLECTIONATTACHMENTS, 

19 XRF_MAPS, 

20 XRF_MAP_ROIS, 

21 XRF_COMPOSITES, 

22) 

23from daiquiri.core.metadata.xrf import generate_histogram 

24 

# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)

26 

27 

class MockMetaDataHandler(MetaDataHandler):
    """Metadata handler that serves and mutates the in-memory ``mockdata`` lists.

    Every record is a plain ``dict`` held in one of the module-level lists
    imported from ``daiquiri.core.metadata.mockdata``. Single-record lookups
    return the stored dict itself (not a copy), which is what lets the
    ``update_*`` methods modify records in place.

    ``self._config``, ``self._session``, ``self._base_url`` and
    ``self._check_snapshots`` are not defined in this class; they are
    presumably provided by ``MetaDataHandler`` -- TODO confirm.
    """

    # Next id to hand out per record type, seeded from the size of the
    # corresponding mock list (assumes the bundled ids run 1..len -- TODO confirm).
    _componentid = len(COMPONENTS) + 1
    _sampleid = len(SAMPLES) + 1
    _sampleimageid = len(SAMPLEIMAGES) + 1
    _subsampleid = len(SUBSAMPLES) + 1
    _datacollectionid = len(DATACOLLECTIONS) + 1
    _datacollectionattachmentid = len(DATACOLLECTIONATTACHMENTS) + 1
    _sampleactionid = len(SAMPLEACTIONS) + 1

    _xrf_mapid = len(XRF_MAPS) + 1
    _xrf_map_roiid = len(XRF_MAP_ROIS) + 1
    _xrf_compositeid = len(XRF_COMPOSITES) + 1

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _next_id(self, counter):
        """Return the current value of class counter *counter* and advance it.

        The counter is advanced on the class, not on ``self``: the record
        lists are module-level and shared by every handler instance, so a
        per-instance counter would hand out ids that are already in use.
        """
        value = getattr(MockMetaDataHandler, counter)
        setattr(MockMetaDataHandler, counter, value + 1)
        return value

    @staticmethod
    def _find_row(rows, key, value):
        """Return the first dict in *rows* with ``row[key] == value``, else None."""
        return next((row for row in rows if row[key] == value), None)

    @staticmethod
    def _apply_updates(row, values):
        """Copy every item of *values* into *row* when *row* is truthy.

        Returns *row* unchanged in identity so callers can hand it straight
        back (``None`` stays ``None``).
        """
        if row:
            for key, value in values.items():
                row[key] = value
        return row

    # ------------------------------------------------------------------
    # Users, sessions, proposals
    # ------------------------------------------------------------------

    def get_user(self, **kwargs):
        """Return the mock user whose ``login`` equals ``flask.g.login``.

        ``is_staff`` is set from ``user.permission(self._config["meta_staff"])``.
        Keyword arguments are accepted and ignored.

        Returns:
            The ``User``, or ``None`` when no mock user has that login.
        """
        # Start from None so an unknown login yields None rather than an
        # UnboundLocalError at the return statement.
        user = None
        for record in USERS:
            if record["login"] == g.login:
                user = User(**record)
                user["is_staff"] = user.permission(self._config["meta_staff"])

        return user

    def set_session(self, session):
        """Store *session* as ``blsession`` in ``self._session`` if it is known.

        Returns:
            ``True`` when the session exists in the mock data, else ``None``.
        """
        if self.verify_session(session):
            self._session.update({"blsession": session})
            return True

        return None

    def verify_session(self, session):
        """Return the mock session record named *session*, or ``None``."""
        return self._find_row(SESSIONS, "session", session)

    def get_proposals(self, proposal=None, **kwargs):
        """Return one proposal record, or all proposals.

        Returns:
            With a truthy *proposal*: the matching record or ``None``.
            Otherwise ``{"total": ..., "rows": PROPOSALS}``.
        """
        if proposal:
            return self._find_row(PROPOSALS, "proposal", proposal)

        return {"total": len(PROPOSALS), "rows": PROPOSALS}

    def get_sessions(self, session=None, **kwargs):
        """Return one session record, or all sessions.

        A truthy *session* is matched against the ``session`` field; failing
        that, a truthy ``sessionid`` keyword is matched against ``sessionid``.

        Returns:
            The matching record or ``None`` for a filtered lookup, otherwise
            ``{"total": ..., "rows": SESSIONS}``.
        """
        if session:
            return self._find_row(SESSIONS, "session", session)

        sessionid = kwargs.get("sessionid")
        if sessionid:
            return self._find_row(SESSIONS, "sessionid", sessionid)

        return {"total": len(SESSIONS), "rows": SESSIONS}

    # ------------------------------------------------------------------
    # Components
    # ------------------------------------------------------------------

    def get_components(self, componentid=None, **kwargs):
        """Return the component with *componentid* (or ``None``), or all components.

        Without a truthy *componentid* the result is
        ``{"total": ..., "rows": COMPONENTS}``.
        """
        if componentid:
            return self._find_row(COMPONENTS, "componentid", componentid)

        return {"total": len(COMPONENTS), "rows": COMPONENTS}

    def add_component(self, **kwargs):
        """Store *kwargs* as a new component and return it with its ``componentid``."""
        kwargs["componentid"] = self._next_id("_componentid")

        COMPONENTS.append(kwargs)
        return kwargs

    def update_component(self, componentid, **kwargs):
        """Merge *kwargs* into the component *componentid* and return the lookup result."""
        logger.debug("update_component %s %s", componentid, kwargs)
        # Use a separate name instead of rebinding the id parameter.
        component = self.get_components(componentid)
        return self._apply_updates(component, kwargs)

    # ------------------------------------------------------------------
    # Samples
    # ------------------------------------------------------------------

    def get_samples(self, sampleid=None, **kwargs):
        """Return the sample with *sampleid* (or ``None``), or all samples.

        Without a truthy *sampleid* the result is
        ``{"total": ..., "rows": SAMPLES}``.
        """
        if sampleid:
            return self._find_row(SAMPLES, "sampleid", sampleid)

        return {"total": len(SAMPLES), "rows": SAMPLES}

    def add_sample(self, **kwargs):
        """Store *kwargs* as a new sample with zeroed child counters and return it."""
        kwargs["sampleid"] = self._next_id("_sampleid")
        kwargs["datacollections"] = 0
        kwargs["subsamples"] = 0

        SAMPLES.append(kwargs)
        return kwargs

    def remove_sample(self, sampleid):
        """Remove the sample *sampleid*; return ``True`` if found, else ``None``."""
        sample = self.get_samples(sampleid)
        if sample:
            SAMPLES.remove(sample)
            return True

        return None

    def update_sample(self, sampleid, **kwargs):
        """Merge *kwargs* into the sample *sampleid* and return the lookup result."""
        logger.debug("update_sample %s %s", sampleid, kwargs)
        sample = self.get_samples(sampleid)
        return self._apply_updates(sample, kwargs)

    def queue_sample(self, sampleid, **kwargs):
        """Set the sample's ``queued`` flag to 1, or 0 if ``unqueue`` is passed.

        Returns:
            ``(1, 1)`` when the sample lookup was truthy, else ``None``.
        """
        queued = 0 if "unqueue" in kwargs else 1

        if self.update_sample(sampleid, queued=queued):
            return 1, 1

        return None

    def unqueue_sample(self, sampleid, **kwargs):
        """Clear the sample's ``queued`` flag; returns nothing."""
        self.queue_sample(sampleid, unqueue=True)

    # ------------------------------------------------------------------
    # Subsamples
    # ------------------------------------------------------------------

    def get_subsamples(self, subsampleid=None, **kwargs):
        """Return one subsample, the subsamples of a sample, or all subsamples.

        Returns:
            With a truthy *subsampleid*: the matching record or ``None``.
            With a ``sampleid`` keyword: a new list of matching records.
            Otherwise the ``SUBSAMPLES`` list itself.
        """
        if subsampleid:
            return self._find_row(SUBSAMPLES, "subsampleid", subsampleid)

        if "sampleid" in kwargs:
            return [s for s in SUBSAMPLES if s["sampleid"] == kwargs["sampleid"]]

        return SUBSAMPLES

    def add_subsample(self, **kwargs):
        """Store *kwargs* as a new subsample and bump its sample's ``subsamples``.

        ``kwargs["sampleid"]`` is required; the parent counter is incremented
        before the record is appended, so a failing parent lookup raises
        without storing anything.
        """
        kwargs["subsampleid"] = self._next_id("_subsampleid")
        kwargs["datacollections"] = 0

        sample = self.get_samples(kwargs["sampleid"])
        sample["subsamples"] += 1

        SUBSAMPLES.append(kwargs)
        return kwargs

    def update_subsample(self, subsampleid, **kwargs):
        """Merge *kwargs* into the subsample *subsampleid* and return the lookup result."""
        logger.debug("update_subsample %s %s", subsampleid, kwargs)
        subsample = self.get_subsamples(subsampleid)
        return self._apply_updates(subsample, kwargs)

    def remove_subsample(self, subsampleid):
        """Remove the subsample *subsampleid*; return ``True`` if found, else ``None``."""
        subsample = self.get_subsamples(subsampleid)

        if subsample:
            SUBSAMPLES.remove(subsample)
            return True

        return None

    def queue_subsample(self, subsampleid, **kwargs):
        """Set the subsample's ``queued`` flag to 1, or 0 if ``unqueue`` is passed.

        Returns:
            ``(1, 1)`` when the subsample lookup was truthy, else ``None``.
        """
        queued = 0 if "unqueue" in kwargs else 1

        subsample = self.update_subsample(subsampleid, queued=queued)
        if subsample:
            logger.debug("queue_subsample after %s", subsample)
            return 1, 1

        return None

    def unqueue_subsample(self, subsampleid, **kwargs):
        """Clear the subsample's ``queued`` flag; returns nothing."""
        self.queue_subsample(subsampleid, unqueue=True)

    # ------------------------------------------------------------------
    # Sample images and actions
    # ------------------------------------------------------------------

    def get_sampleimages(self, sampleimageid=None, **kwargs):
        """Return one sample image, the images of a sample, or all images.

        Every call first (re)writes the ``url`` field of all stored images
        from ``self._base_url`` and the image id.

        Returns:
            With a truthy *sampleimageid*: the matching record or ``None``.
            With a ``sampleid`` keyword: a new list of matching records.
            Otherwise the ``SAMPLEIMAGES`` list itself.
        """
        for image in SAMPLEIMAGES:
            image_id = image["sampleimageid"]
            image["url"] = f"/{self._base_url}/samples/images/{image_id}"

        if sampleimageid:
            return self._find_row(SAMPLEIMAGES, "sampleimageid", sampleimageid)

        if "sampleid" in kwargs:
            return [s for s in SAMPLEIMAGES if s["sampleid"] == kwargs["sampleid"]]

        return SAMPLEIMAGES

    def add_sampleimage(self, **kwargs):
        """Store *kwargs* as a new sample image and return it via ``get_sampleimages``.

        Going through the getter means the returned record carries its ``url``.
        """
        kwargs["sampleimageid"] = self._next_id("_sampleimageid")

        SAMPLEIMAGES.append(kwargs)
        return self.get_sampleimages(sampleimageid=kwargs["sampleimageid"])

    def get_sampleactions(self, sampleactionid=None, **kwargs):
        """Return one sample action, the actions of a sample, or all actions.

        Returns:
            With a truthy *sampleactionid*: the matching record or ``None``.
            With a ``sampleid`` keyword: a new list of matching records.
            Otherwise the ``SAMPLEACTIONS`` list itself.
        """
        if sampleactionid:
            return self._find_row(SAMPLEACTIONS, "sampleactionid", sampleactionid)

        if "sampleid" in kwargs:
            return [a for a in SAMPLEACTIONS if a["sampleid"] == kwargs["sampleid"]]

        return SAMPLEACTIONS

    def add_sampleaction(self, **kwargs):
        """Store *kwargs* as a new sample action and return it with its id."""
        kwargs["sampleactionid"] = self._next_id("_sampleactionid")

        SAMPLEACTIONS.append(kwargs)
        return kwargs

    def update_sampleaction(self, sampleactionid, **kwargs):
        """Merge *kwargs* into the sample action and return the lookup result."""
        logger.debug("update_sampleaction %s %s", sampleactionid, kwargs)
        sampleaction = self.get_sampleactions(sampleactionid)
        return self._apply_updates(sampleaction, kwargs)

    # ------------------------------------------------------------------
    # Data collections
    # ------------------------------------------------------------------

    def get_datacollections(self, datacollectionid=None, **kwargs):
        """Return one data collection, or a filtered/complete listing.

        ``self._check_snapshots`` is called on every stored record first; its
        return value is not used here, so any effect must come from it
        modifying the record in place -- TODO confirm against the base class.

        Keyword arguments:
            original: with *datacollectionid*, return the entry taken from
                ``DATACOLLECTIONS`` by index instead of the one from the
                (shallow) working copy.
            sampleid / subsampleid: restrict the listing to matching records
                (``sampleid`` wins when both are given).

        Returns:
            With a truthy *datacollectionid*: the matching record or ``None``.
            Otherwise ``{"total": ..., "rows": [...]}`` with a fresh list.
        """
        logger.debug("get_datacollections %s %s", datacollectionid, kwargs)
        datacollections = DATACOLLECTIONS.copy()
        for dc in datacollections:
            self._check_snapshots(dc)

        if datacollectionid:
            for index, dc in enumerate(datacollections):
                if dc["datacollectionid"] == datacollectionid:
                    return DATACOLLECTIONS[index] if kwargs.get("original") else dc

            return None

        if "sampleid" in kwargs:
            rows = [d for d in datacollections if d["sampleid"] == kwargs["sampleid"]]
        elif "subsampleid" in kwargs:
            rows = [
                d
                for d in datacollections
                if d["subsampleid"] == kwargs["subsampleid"]
            ]
        else:
            rows = datacollections

        return {"total": len(rows), "rows": rows}

    def add_datacollection(self, **kwargs):
        """Store *kwargs* as a new data collection and return a copy of it.

        The new id is used for both ``datacollectionid`` and
        ``datacollectiongroupid``. The ``datacollections`` counter of the
        parent subsample and/or sample is incremented when a truthy id is
        given and the parent record is found; a ``None`` id is treated like
        an absent one instead of indexing into the "list everything" result.
        """
        new_id = self._next_id("_datacollectionid")
        kwargs["datacollectionid"] = new_id
        kwargs["datacollectiongroupid"] = new_id

        subsampleid = kwargs.get("subsampleid")
        if subsampleid:
            subsample = self.get_subsamples(subsampleid)
            if subsample:
                subsample["datacollections"] += 1

        sampleid = kwargs.get("sampleid")
        if sampleid:
            sample = self.get_samples(sampleid)
            if sample:
                sample["datacollections"] += 1

        DATACOLLECTIONS.append(kwargs)

        logger.debug("add datacollection %s", kwargs)
        return kwargs.copy()

    def update_datacollection(self, datacollectionid=None, **kwargs):
        """Merge *kwargs* into the stored data collection and return a copy.

        When ``runstatus`` is among the updates and the record has a truthy
        ``subsampleid`` at that point, the subsample is unqueued.

        Returns:
            A copy of the updated record, or ``None`` when *datacollectionid*
            is falsy or unknown.
        """
        # Without an id the getter returns a {"total", "rows"} summary, which
        # must not be treated as a record to update.
        if not datacollectionid:
            return None

        dc = self.get_datacollections(datacollectionid, original=True)
        if not dc:
            return None

        for kw, val in kwargs.items():
            if kw == "runstatus" and dc.get("subsampleid"):
                self.unqueue_subsample(dc["subsampleid"])

            dc[kw] = val

        logger.debug("update_datacollection after %s", dc)
        return dc.copy()

    def get_datacollection_attachments(
        self, datacollectionfileattachmentid=None, **kwargs
    ):
        """Return one attachment, the attachments of a data collection, or all.

        Returns:
            With a truthy *datacollectionfileattachmentid*: the matching
            record or ``None``. With a ``datacollectionid`` keyword: a new
            list of matching records. Otherwise the
            ``DATACOLLECTIONATTACHMENTS`` list itself.
        """
        if datacollectionfileattachmentid:
            return self._find_row(
                DATACOLLECTIONATTACHMENTS,
                "datacollectionfileattachmentid",
                datacollectionfileattachmentid,
            )

        if "datacollectionid" in kwargs:
            return [
                a
                for a in DATACOLLECTIONATTACHMENTS
                if a["datacollectionid"] == kwargs["datacollectionid"]
            ]

        return DATACOLLECTIONATTACHMENTS

    def add_datacollection_attachment(self, **kwargs):
        """Store *kwargs* as a new attachment and return the stored record."""
        attachment_id = self._next_id("_datacollectionattachmentid")
        kwargs["datacollectionfileattachmentid"] = attachment_id

        DATACOLLECTIONATTACHMENTS.append(kwargs)
        # The getter's parameter is ``datacollectionfileattachmentid``; any
        # other keyword is swallowed by **kwargs and yields the whole list.
        return self.get_datacollection_attachments(
            datacollectionfileattachmentid=attachment_id
        )

    def add_datacollectionplan(self, **kwargs):
        """Do nothing; data collection plans are not stored by this handler."""
        pass

    # ------------------------------------------------------------------
    # XRF maps
    # ------------------------------------------------------------------

    def get_xrf_maps(self, mapid=None, **kwargs):
        """Return one XRF map, the maps of a (sub)sample, or all maps.

        ``histogram`` is (re)computed from ``data`` on the stored record for
        a single-map lookup and for the unfiltered listing, but not for the
        ``sampleid`` / ``subsampleid`` listings.

        Returns:
            With *mapid* not ``None``: the matching record or ``None``.
            With a truthy ``sampleid`` or ``subsampleid`` keyword: a new list
            of matching records. Otherwise a shallow copy of ``XRF_MAPS``.
        """
        if mapid is not None:
            xrf_map = self._find_row(XRF_MAPS, "mapid", mapid)
            if xrf_map is not None:
                xrf_map["histogram"] = generate_histogram(xrf_map["data"])
            return xrf_map

        sampleid = kwargs.get("sampleid")
        if sampleid:
            return [m for m in XRF_MAPS if m["sampleid"] == sampleid]

        subsampleid = kwargs.get("subsampleid")
        if subsampleid:
            return [m for m in XRF_MAPS if m["subsampleid"] == subsampleid]

        xrf_maps = XRF_MAPS.copy()
        for xrf_map in xrf_maps:
            xrf_map["histogram"] = generate_histogram(xrf_map["data"])

        return xrf_maps

    def add_xrf_map(self, **kwargs):
        """Store *kwargs* as a new XRF map and return it.

        Requires ``datacollectionid`` and ``maproiid`` keywords. The map
        takes ``scalar`` from the ROI when that is truthy, otherwise the
        ROI's ``element`` and ``edge``; ``sampleid``, ``subsampleid`` and the
        ``w`` / ``h`` dimensions (``steps_x`` / ``steps_y``) come from the
        data collection.
        """
        logger.debug("add map %s", kwargs)

        dc = self.get_datacollections(datacollectionid=kwargs["datacollectionid"])
        roi = self.get_xrf_map_rois(maproiid=kwargs["maproiid"])

        if roi.get("scalar"):
            kwargs["scalar"] = roi["scalar"]
        else:
            kwargs["element"] = roi["element"]
            kwargs["edge"] = roi["edge"]

        for key in ("sampleid", "subsampleid"):
            kwargs[key] = dc[key]

        kwargs["w"] = dc["steps_x"]
        kwargs["h"] = dc["steps_y"]

        kwargs["snaked"] = False
        kwargs["opacity"] = 1

        kwargs["mapid"] = self._next_id("_xrf_mapid")

        XRF_MAPS.append(kwargs)

        return kwargs

    def update_xrf_map(self, mapid, **kwargs):
        """Merge *kwargs* into the XRF map *mapid* and return the lookup result."""
        xrf_map = self.get_xrf_maps(mapid=mapid)
        if xrf_map:
            logger.debug("update map")

        return self._apply_updates(xrf_map, kwargs)

    def remove_xrf_map(self, mapid):
        """Remove the XRF map *mapid*; return ``True`` if found, else ``None``."""
        xrf_map = self.get_xrf_maps(mapid=mapid)
        if xrf_map:
            XRF_MAPS.remove(xrf_map)
            return True

        return None

    # ------------------------------------------------------------------
    # XRF map ROIs
    # ------------------------------------------------------------------

    def get_xrf_map_rois(self, maproiid=None, **kwargs):
        """Return the ROI with *maproiid* (or ``None``), or the ``XRF_MAP_ROIS`` list."""
        if maproiid is not None:
            return self._find_row(XRF_MAP_ROIS, "maproiid", maproiid)

        return XRF_MAP_ROIS

    def add_xrf_map_roi(self, **kwargs):
        """Store *kwargs* as a new map ROI with ``maps`` set to 0 and return it."""
        kwargs["maproiid"] = self._next_id("_xrf_map_roiid")
        kwargs["maps"] = 0

        XRF_MAP_ROIS.append(kwargs)

        return kwargs

    def add_xrf_map_roi_scalar(self, **kwargs):
        """Store *kwargs* as a new scalar map ROI and return it.

        Same as ``add_xrf_map_roi`` but also sets ``startenergy`` and
        ``endenergy`` to 0.
        """
        kwargs["maproiid"] = self._next_id("_xrf_map_roiid")
        kwargs["maps"] = 0
        kwargs["startenergy"] = 0
        kwargs["endenergy"] = 0

        XRF_MAP_ROIS.append(kwargs)

        return kwargs

    def update_xrf_map_roi(self, maproiid, **kwargs):
        """Merge *kwargs* into the map ROI *maproiid* and return the lookup result."""
        roi = self.get_xrf_map_rois(maproiid=maproiid)
        return self._apply_updates(roi, kwargs)

    def remove_xrf_map_roi(self, maproiid, **kwargs):
        """Remove the map ROI *maproiid*; return ``True`` if found, else ``None``."""
        roi = self.get_xrf_map_rois(maproiid=maproiid)
        if roi:
            XRF_MAP_ROIS.remove(roi)
            return True

        return None

    # ------------------------------------------------------------------
    # XRF composites
    # ------------------------------------------------------------------

    def get_xrf_composites(self, compositeid=None, sampleid=None):
        """Return one composite, the composites of a sample, or all composites.

        Returns:
            With *compositeid* not ``None``: the matching record or ``None``.
            With *sampleid* not ``None``: a new list of matching records.
            Otherwise the ``XRF_COMPOSITES`` list itself.
        """
        if compositeid is not None:
            return self._find_row(XRF_COMPOSITES, "compositeid", compositeid)

        if sampleid is not None:
            return [c for c in XRF_COMPOSITES if c["sampleid"] == sampleid]

        return XRF_COMPOSITES

    def add_xrf_composite(self, **kwargs):
        """Store *kwargs* as a new RGB composite and return it.

        Requires ``subsampleid``, ``r``, ``g`` and ``b`` keywords. Nothing is
        stored (and ``None`` is returned) when the subsample lookup is falsy
        or when the three channels do not produce exactly three matches
        against the ``mapid`` values in ``XRF_MAPS``.
        """
        subsample = self.get_subsamples(subsampleid=kwargs["subsampleid"])

        if not subsample:
            return None

        matches = sum(
            1
            for channel in ("r", "g", "b")
            for xrf_map in XRF_MAPS
            if kwargs[channel] == xrf_map["mapid"]
        )
        if matches != 3:
            return None

        kwargs["sampleid"] = subsample["sampleid"]
        kwargs["opacity"] = 1

        kwargs["compositeid"] = self._next_id("_xrf_compositeid")

        XRF_COMPOSITES.append(kwargs)

        return kwargs

    def update_xrf_composite(self, compositeid, **kwargs):
        """Merge *kwargs* into the composite *compositeid* and return the lookup result."""
        composite = self.get_xrf_composites(compositeid=compositeid)
        return self._apply_updates(composite, kwargs)

    def remove_xrf_composite(self, compositeid, **kwargs):
        """Remove the composite *compositeid*; return ``True`` if found, else ``None``."""
        composite = self.get_xrf_composites(compositeid=compositeid)
        if composite:
            XRF_COMPOSITES.remove(composite)
            return True

        return None

527 XRF_COMPOSITES.remove(comp) 

528 return True