Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/metadata/mock.py: 0%
352 statements
« prev ^ index » next · coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
# -*- coding: utf-8 -*-
"""Mock metadata handler backed by the in-memory fixtures in ``mockdata``."""
import logging

from flask import g

from daiquiri.core.metadata import MetaDataHandler
from daiquiri.core.metadata.user import User
from daiquiri.core.metadata.mockdata import (
    USERS,
    PROPOSALS,
    SESSIONS,
    COMPONENTS,
    SAMPLES,
    SUBSAMPLES,
    SAMPLEIMAGES,
    SAMPLEACTIONS,
    DATACOLLECTIONS,
    DATACOLLECTIONATTACHMENTS,
    XRF_MAPS,
    XRF_MAP_ROIS,
    XRF_COMPOSITES,
)
from daiquiri.core.metadata.xrf import generate_histogram

logger = logging.getLogger(__name__)
class MockMetaDataHandler(MetaDataHandler):
    """Metadata handler that serves and mutates the ``mockdata`` fixture lists.

    Every record lives in one of the module-level lists imported from
    ``mockdata`` (``SAMPLES``, ``DATACOLLECTIONS``, ...). Lookups return the
    stored dict itself unless stated otherwise, so callers that mutate a
    returned record mutate the store.

    New ids come from the ``_<name>id`` counters below. They are seeded at
    class-definition time from the fixture sizes; ``self._x += 1`` then stores
    the advanced value on the instance.
    """

    _componentid = len(COMPONENTS) + 1
    _sampleid = len(SAMPLES) + 1
    _sampleimageid = len(SAMPLEIMAGES) + 1
    _subsampleid = len(SUBSAMPLES) + 1
    _datacollectionid = len(DATACOLLECTIONS) + 1
    _datacollectionattachmentid = len(DATACOLLECTIONATTACHMENTS) + 1
    _sampleactionid = len(SAMPLEACTIONS) + 1

    _xrf_mapid = len(XRF_MAPS) + 1
    _xrf_map_roiid = len(XRF_MAP_ROIS) + 1
    _xrf_compositeid = len(XRF_COMPOSITES) + 1

    @staticmethod
    def _first_match(rows, key, value):
        """Return the first row with ``row[key] == value``, or ``None``."""
        return next((row for row in rows if row[key] == value), None)

    # ------------------------------------------------------------------
    # Users, sessions, proposals
    # ------------------------------------------------------------------
    def get_user(self, **kwargs):
        """Return a ``User`` for the login held in ``flask.g``, or ``None``.

        The user's ``is_staff`` entry is set from ``User.permission`` called
        with the ``meta_staff`` config value.
        """
        record = self._first_match(USERS, "login", g.login)
        if record is None:
            return None

        user = User(**record)
        user["is_staff"] = user.permission(self._config["meta_staff"])
        return user

    def set_session(self, session):
        """Store *session* as ``blsession`` if it exists.

        Returns:
            ``True`` when the session was found and stored, otherwise ``None``.
        """
        if self.verify_session(session):
            self._session.update({"blsession": session})
            return True
        return None

    def verify_session(self, session):
        """Return the session record named *session*, or ``None``."""
        return self._first_match(SESSIONS, "session", session)

    def get_proposals(self, proposal=None, **kwargs):
        """Return one proposal record, or a ``{"total", "rows"}`` listing.

        A truthy *proposal* selects a single record (``None`` if unknown).
        """
        if proposal:
            return self._first_match(PROPOSALS, "proposal", proposal)

        return {"total": len(PROPOSALS), "rows": PROPOSALS}

    def get_sessions(self, session=None, **kwargs):
        """Return one session record, or a ``{"total", "rows"}`` listing.

        A truthy *session* looks up by session name; otherwise a truthy
        ``sessionid`` keyword looks up by id. Either lookup returns ``None``
        when nothing matches.
        """
        if session:
            return self._first_match(SESSIONS, "session", session)

        if kwargs.get("sessionid"):
            return self._first_match(SESSIONS, "sessionid", kwargs["sessionid"])

        return {"total": len(SESSIONS), "rows": SESSIONS}

    # ------------------------------------------------------------------
    # Components
    # ------------------------------------------------------------------
    def get_components(self, componentid=None, **kwargs):
        """Return one component record, or a ``{"total", "rows"}`` listing."""
        if componentid:
            return self._first_match(COMPONENTS, "componentid", componentid)

        return {"total": len(COMPONENTS), "rows": COMPONENTS}

    def add_component(self, **kwargs):
        """Store *kwargs* as a new component and return the stored record."""
        kwargs["componentid"] = self._componentid
        self._componentid += 1

        COMPONENTS.append(kwargs)
        return kwargs

    def update_component(self, componentid, **kwargs):
        """Merge *kwargs* into a component; return it, or ``None`` if unknown."""
        logger.debug("update_component %s %s", componentid, kwargs)
        component = self.get_components(componentid)

        if component:
            component.update(kwargs)

        return component

    # ------------------------------------------------------------------
    # Samples
    # ------------------------------------------------------------------
    def get_samples(self, sampleid=None, **kwargs):
        """Return one sample record, or a ``{"total", "rows"}`` listing."""
        if sampleid:
            return self._first_match(SAMPLES, "sampleid", sampleid)

        return {"total": len(SAMPLES), "rows": SAMPLES}

    def add_sample(self, **kwargs):
        """Store *kwargs* as a new sample with zeroed child counters."""
        kwargs["sampleid"] = self._sampleid
        kwargs["datacollections"] = 0
        kwargs["subsamples"] = 0
        self._sampleid += 1

        SAMPLES.append(kwargs)
        return kwargs

    def remove_sample(self, sampleid):
        """Delete a sample; return ``True`` if it existed, else ``None``."""
        sample = self.get_samples(sampleid)
        if sample:
            SAMPLES.remove(sample)
            return True
        return None

    def update_sample(self, sampleid, **kwargs):
        """Merge *kwargs* into a sample; return it, or ``None`` if unknown."""
        logger.debug("update_sample %s %s", sampleid, kwargs)
        sample = self.get_samples(sampleid)

        if sample:
            sample.update(kwargs)

        return sample

    def queue_sample(self, sampleid, **kwargs):
        """Set the sample's ``queued`` flag (cleared if ``unqueue`` is passed).

        Returns:
            The tuple ``(1, 1)`` when the sample exists, otherwise ``None``.
        """
        queued = 0 if "unqueue" in kwargs else 1

        sample = self.update_sample(sampleid, queued=queued)
        if sample:
            return 1, 1
        return None

    def unqueue_sample(self, sampleid, **kwargs):
        """Clear the sample's ``queued`` flag. Returns nothing."""
        self.queue_sample(sampleid, unqueue=True)

    # ------------------------------------------------------------------
    # Subsamples
    # ------------------------------------------------------------------
    def get_subsamples(self, subsampleid=None, **kwargs):
        """Return one subsample record, or a plain list of subsamples.

        A ``sampleid`` keyword restricts the list to that sample. Without any
        filter the stored list itself is returned.
        """
        if subsampleid:
            return self._first_match(SUBSAMPLES, "subsampleid", subsampleid)

        if "sampleid" in kwargs:
            return [s for s in SUBSAMPLES if s["sampleid"] == kwargs["sampleid"]]

        return SUBSAMPLES

    def add_subsample(self, **kwargs):
        """Store *kwargs* as a new subsample and bump its sample's counter.

        ``kwargs["sampleid"]`` is required and must name an existing sample.
        """
        kwargs["subsampleid"] = self._subsampleid
        kwargs["datacollections"] = 0
        self._subsampleid += 1

        sample = self.get_samples(kwargs["sampleid"])
        sample["subsamples"] += 1

        SUBSAMPLES.append(kwargs)
        return kwargs

    def update_subsample(self, subsampleid, **kwargs):
        """Merge *kwargs* into a subsample; return it, or ``None`` if unknown."""
        logger.debug("update_subsample %s %s", subsampleid, kwargs)
        subsample = self.get_subsamples(subsampleid)

        if subsample:
            subsample.update(kwargs)

        return subsample

    def remove_subsample(self, subsampleid):
        """Delete a subsample; return ``True`` if it existed, else ``None``."""
        subsample = self.get_subsamples(subsampleid)

        if subsample:
            SUBSAMPLES.remove(subsample)
            return True
        return None

    def queue_subsample(self, subsampleid, **kwargs):
        """Set the subsample's ``queued`` flag (cleared if ``unqueue`` is passed).

        Returns:
            The tuple ``(1, 1)`` when the subsample exists, otherwise ``None``.
        """
        queued = 0 if "unqueue" in kwargs else 1

        subsample = self.update_subsample(subsampleid, queued=queued)
        if subsample:
            logger.debug("queue_subsample after %s", subsample)
            return 1, 1
        return None

    def unqueue_subsample(self, subsampleid, **kwargs):
        """Clear the subsample's ``queued`` flag. Returns nothing."""
        self.queue_subsample(subsampleid, unqueue=True)

    # ------------------------------------------------------------------
    # Sample images
    # ------------------------------------------------------------------
    def get_sampleimages(self, sampleimageid=None, **kwargs):
        """Return one sample image record, or a plain list of images.

        Every stored image gets its ``url`` rewritten from ``self._base_url``
        on each call, before any lookup or filtering.
        """
        for img in SAMPLEIMAGES:
            img["url"] = f"/{self._base_url}/samples/images/{img['sampleimageid']}"

        if sampleimageid:
            return self._first_match(SAMPLEIMAGES, "sampleimageid", sampleimageid)

        if "sampleid" in kwargs:
            return [s for s in SAMPLEIMAGES if s["sampleid"] == kwargs["sampleid"]]

        return SAMPLEIMAGES

    def add_sampleimage(self, **kwargs):
        """Store *kwargs* as a new sample image and return it with its ``url``."""
        kwargs["sampleimageid"] = self._sampleimageid
        self._sampleimageid += 1

        SAMPLEIMAGES.append(kwargs)
        # Re-read through the getter so the returned record carries a url.
        return self.get_sampleimages(sampleimageid=kwargs["sampleimageid"])

    # ------------------------------------------------------------------
    # Sample actions
    # ------------------------------------------------------------------
    def get_sampleactions(self, sampleactionid=None, **kwargs):
        """Return one sample action record, or a plain list of actions."""
        if sampleactionid:
            return self._first_match(SAMPLEACTIONS, "sampleactionid", sampleactionid)

        if "sampleid" in kwargs:
            return [a for a in SAMPLEACTIONS if a["sampleid"] == kwargs["sampleid"]]

        return SAMPLEACTIONS

    def add_sampleaction(self, **kwargs):
        """Store *kwargs* as a new sample action and return the stored record."""
        kwargs["sampleactionid"] = self._sampleactionid
        self._sampleactionid += 1

        SAMPLEACTIONS.append(kwargs)
        return kwargs

    def update_sampleaction(self, sampleactionid, **kwargs):
        """Merge *kwargs* into a sample action; return it, or ``None``."""
        logger.debug("update_sampleaction %s %s", sampleactionid, kwargs)
        sampleaction = self.get_sampleactions(sampleactionid)

        if sampleaction:
            sampleaction.update(kwargs)

        return sampleaction

    # ------------------------------------------------------------------
    # Data collections
    # ------------------------------------------------------------------
    def get_datacollections(self, datacollectionid=None, **kwargs):
        """Return one data collection, or a ``{"total", "rows"}`` listing.

        ``sampleid`` / ``subsampleid`` keywords (checked in that order) filter
        the listing. ``self._check_snapshots`` is called on every stored
        record first.
        """
        logger.debug("get_datacollections %s %s", datacollectionid, kwargs)

        # NOTE(review): this is a shallow copy, so the rows are the stored
        # dicts themselves and ``original=True`` currently selects the same
        # object as the default path. Only the list container is fresh.
        datacollections = DATACOLLECTIONS.copy()
        for dc in datacollections:
            # Return value is not stored; presumably the helper annotates the
            # record in place - confirm against MetaDataHandler.
            self._check_snapshots(dc)

        if datacollectionid:
            for index, dc in enumerate(datacollections):
                if dc["datacollectionid"] == datacollectionid:
                    return DATACOLLECTIONS[index] if kwargs.get("original") else dc

            return None

        if "sampleid" in kwargs:
            rows = [
                dc for dc in datacollections if dc["sampleid"] == kwargs["sampleid"]
            ]
        elif "subsampleid" in kwargs:
            rows = [
                dc
                for dc in datacollections
                if dc["subsampleid"] == kwargs["subsampleid"]
            ]
        else:
            rows = datacollections

        return {"total": len(rows), "rows": rows}

    def add_datacollection(self, **kwargs):
        """Store *kwargs* as a new data collection and return a copy of it.

        The group id is set equal to the new data collection id. When a
        truthy ``subsampleid`` / ``sampleid`` is supplied, the matching
        parent's ``datacollections`` counter is incremented.
        """
        kwargs["datacollectionid"] = self._datacollectionid
        kwargs["datacollectiongroupid"] = self._datacollectionid
        self._datacollectionid += 1

        # Test the value, not just key presence: the getters treat a falsy id
        # as "no filter" and would hand back the whole collection, which
        # cannot be used as a single record.
        if kwargs.get("subsampleid"):
            subsample = self.get_subsamples(kwargs["subsampleid"])
            subsample["datacollections"] += 1

        if kwargs.get("sampleid"):
            sample = self.get_samples(kwargs["sampleid"])
            sample["datacollections"] += 1

        DATACOLLECTIONS.append(kwargs)

        logger.debug("add datacollection %s", kwargs)
        return kwargs.copy()

    def update_datacollection(self, datacollectionid=None, **kwargs):
        """Apply *kwargs* to a data collection and return a copy of it.

        Setting ``runstatus`` on a record that has a ``subsampleid`` also
        unqueues that subsample. Returns ``None`` if the record is unknown.
        """
        dc = self.get_datacollections(datacollectionid, original=True)
        if not dc:
            return None

        # Kept as an ordered loop: the unqueue check reads the record as it
        # stands when the ``runstatus`` key is reached.
        for key, value in kwargs.items():
            if key == "runstatus" and dc.get("subsampleid"):
                self.unqueue_subsample(dc["subsampleid"])

            dc[key] = value

        logger.debug("update_datacollection after %s", dc)
        return dc.copy()

    def get_datacollection_attachments(
        self, datacollectionfileattachmentid=None, **kwargs
    ):
        """Return one attachment record, or a plain list of attachments.

        A ``datacollectionid`` keyword restricts the list to that collection.
        """
        if datacollectionfileattachmentid:
            return self._first_match(
                DATACOLLECTIONATTACHMENTS,
                "datacollectionfileattachmentid",
                datacollectionfileattachmentid,
            )

        if "datacollectionid" in kwargs:
            return [
                a
                for a in DATACOLLECTIONATTACHMENTS
                if a["datacollectionid"] == kwargs["datacollectionid"]
            ]

        return DATACOLLECTIONATTACHMENTS

    def add_datacollection_attachment(self, **kwargs):
        """Store *kwargs* as a new attachment and return the stored record."""
        kwargs["datacollectionfileattachmentid"] = self._datacollectionattachmentid
        self._datacollectionattachmentid += 1

        DATACOLLECTIONATTACHMENTS.append(kwargs)
        # The keyword must match the getter's parameter name; any other name
        # is swallowed by **kwargs and the full list comes back instead.
        return self.get_datacollection_attachments(
            datacollectionfileattachmentid=kwargs["datacollectionfileattachmentid"]
        )

    def add_datacollectionplan(self, **kwargs):
        """Do nothing; data collection plans are not mocked."""
        pass

    # ------------------------------------------------------------------
    # XRF maps
    # ------------------------------------------------------------------
    def get_xrf_maps(self, mapid=None, **kwargs):
        """Return one XRF map, or a list of maps.

        A ``histogram`` entry is (re)generated from ``data`` for the single-map
        lookup and for the unfiltered listing. The ``sampleid`` /
        ``subsampleid`` filtered listings return the records as stored.
        """
        # Shallow copy: only the list is fresh, the map dicts are shared, so
        # the histogram written below lands on the stored records.
        maps = XRF_MAPS.copy()

        if mapid is not None:
            found = self._first_match(maps, "mapid", mapid)
            if found is not None:
                found["histogram"] = generate_histogram(found["data"])
            return found

        if kwargs.get("sampleid"):
            return [m for m in XRF_MAPS if m["sampleid"] == kwargs.get("sampleid")]

        if kwargs.get("subsampleid"):
            return [
                m for m in XRF_MAPS if m["subsampleid"] == kwargs.get("subsampleid")
            ]

        for m in maps:
            m["histogram"] = generate_histogram(m["data"])

        return maps

    def add_xrf_map(self, **kwargs):
        """Store *kwargs* as a new XRF map and return the stored record.

        Requires ``datacollectionid`` and ``maproiid`` keywords naming existing
        records. The ROI supplies either ``scalar`` or ``element``/``edge``;
        the data collection supplies the sample ids and the map size.
        """
        logger.debug("add map %s", kwargs)

        dc = self.get_datacollections(datacollectionid=kwargs["datacollectionid"])
        roi = self.get_xrf_map_rois(maproiid=kwargs["maproiid"])

        if roi.get("scalar"):
            kwargs["scalar"] = roi["scalar"]
        else:
            kwargs["element"] = roi["element"]
            kwargs["edge"] = roi["edge"]

        for key in ("sampleid", "subsampleid"):
            kwargs[key] = dc[key]

        kwargs["w"] = dc["steps_x"]
        kwargs["h"] = dc["steps_y"]

        kwargs["snaked"] = False
        kwargs["opacity"] = 1

        kwargs["mapid"] = self._xrf_mapid
        self._xrf_mapid += 1

        XRF_MAPS.append(kwargs)

        return kwargs

    def update_xrf_map(self, mapid, **kwargs):
        """Merge *kwargs* into an XRF map; return it, or ``None`` if unknown."""
        xrf_map = self.get_xrf_maps(mapid=mapid)
        if xrf_map:
            logger.debug("update map")
            xrf_map.update(kwargs)

        return xrf_map

    def remove_xrf_map(self, mapid):
        """Delete an XRF map; return ``True`` if it existed, else ``None``."""
        xrf_map = self.get_xrf_maps(mapid=mapid)
        if xrf_map:
            XRF_MAPS.remove(xrf_map)
            return True
        return None

    # ------------------------------------------------------------------
    # XRF map ROIs
    # ------------------------------------------------------------------
    def get_xrf_map_rois(self, maproiid=None, **kwargs):
        """Return one ROI record, or the stored list of ROIs."""
        if maproiid is not None:
            return self._first_match(XRF_MAP_ROIS, "maproiid", maproiid)

        return XRF_MAP_ROIS

    def add_xrf_map_roi(self, **kwargs):
        """Store *kwargs* as a new ROI with a zeroed ``maps`` counter."""
        kwargs["maproiid"] = self._xrf_map_roiid
        self._xrf_map_roiid += 1

        kwargs["maps"] = 0

        XRF_MAP_ROIS.append(kwargs)

        return kwargs

    def add_xrf_map_roi_scalar(self, **kwargs):
        """Store *kwargs* as a new scalar ROI with zeroed energy bounds."""
        kwargs["maproiid"] = self._xrf_map_roiid
        self._xrf_map_roiid += 1

        kwargs["maps"] = 0
        kwargs["startenergy"] = 0
        kwargs["endenergy"] = 0

        XRF_MAP_ROIS.append(kwargs)

        return kwargs

    def update_xrf_map_roi(self, maproiid, **kwargs):
        """Merge *kwargs* into an ROI; return it, or ``None`` if unknown."""
        roi = self.get_xrf_map_rois(maproiid=maproiid)
        if roi:
            roi.update(kwargs)

        return roi

    def remove_xrf_map_roi(self, maproiid, **kwargs):
        """Delete an ROI; return ``True`` if it existed, else ``None``."""
        roi = self.get_xrf_map_rois(maproiid=maproiid)
        if roi:
            XRF_MAP_ROIS.remove(roi)
            return True
        return None

    # ------------------------------------------------------------------
    # XRF composites
    # ------------------------------------------------------------------
    def get_xrf_composites(self, compositeid=None, sampleid=None):
        """Return one composite record, or a list of composites.

        *sampleid* restricts the list; with no arguments the stored list
        itself is returned.
        """
        if compositeid is not None:
            return self._first_match(XRF_COMPOSITES, "compositeid", compositeid)

        if sampleid is not None:
            return [c for c in XRF_COMPOSITES if c["sampleid"] == sampleid]

        return XRF_COMPOSITES

    def add_xrf_composite(self, **kwargs):
        """Store *kwargs* as a new composite and return the stored record.

        Requires ``subsampleid`` plus ``r``, ``g`` and ``b`` map ids. Returns
        ``None`` without storing anything when the subsample is unknown or
        any of the three channels does not name an existing map.
        """
        subsample = self.get_subsamples(subsampleid=kwargs["subsampleid"])

        if not subsample:
            return None

        channels_resolved = all(
            any(kwargs[channel] == m["mapid"] for m in XRF_MAPS)
            for channel in ("r", "g", "b")
        )
        if not channels_resolved:
            return None

        kwargs["sampleid"] = subsample["sampleid"]
        kwargs["opacity"] = 1

        kwargs["compositeid"] = self._xrf_compositeid
        self._xrf_compositeid += 1

        XRF_COMPOSITES.append(kwargs)

        return kwargs

    def update_xrf_composite(self, compositeid, **kwargs):
        """Merge *kwargs* into a composite; return it, or ``None`` if unknown."""
        composite = self.get_xrf_composites(compositeid=compositeid)
        if composite:
            composite.update(kwargs)

        return composite

    def remove_xrf_composite(self, compositeid, **kwargs):
        """Delete a composite; return ``True`` if it existed, else ``None``."""
        composite = self.get_xrf_composites(compositeid=compositeid)
        if composite:
            XRF_COMPOSITES.remove(composite)
            return True
        return None