Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/biosaxscollect/__init__.py: 0%
411 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import copy
4import os
5import sys
7from astropy.io import ascii
8from astropy.table import Table
10from pymemcache.client.base import Client
12from marshmallow import fields
13from daiquiri.core import marshal, require_control
14from daiquiri.core.components import Component, ComponentResource, actor
15from daiquiri.core.schema.biosaxscollect.biosaxs import (
16 SamplesDoneSchema,
17 DataTestArgumentSchema,
18 ProcessingParametersSchema,
19 SCCollectSample,
20 SCCollectBuffer,
21 HPLCCollectSample,
22 ScanCollectArgumentSchema,
23 SingleCollectArgumentSchema,
24)
26from daiquiri.core.hardware.bliss.session import single_collect, scan_collect
28from daiquiri.core.logging import log
30from bliss import current_session
31from bliss.common import plot
32import json
34import logging
36logger = logging.getLogger(__name__)
def mix_samples_and_buffers(samples, buffers):
    """Interleave buffer measurements with sample measurements.

    For each sample, look up its buffer by ``buffer_name`` and, depending on
    the sample's ``buffer_mode`` ("Before", "After" or "Before & After"),
    insert deep copies of that buffer around the sample in the returned list.
    The same buffer is never collected twice in a row: when the previous
    sample's "after" buffer matches, a placeholder copy with
    ``do_collect = False`` is inserted instead so the subtraction step still
    has a buffer entry to refer to.

    Samples are mutated in place (``file_path``, ``do_collect`` and
    ``experiment_type`` are set).

    :param samples: sequence of SCCollectSample-like objects
    :param buffers: sequence of SCCollectBuffer-like objects
    :return: list mixing buffer copies and the (mutated) samples, in
        collection order
    """
    mix = []
    previous_after_buffer_name = None
    for sample in samples:
        # NOTE(review): if no buffer matches sample.buffer_name this is None
        # and the buffer_mode branches below raise AttributeError — assumed
        # validated upstream; confirm against the collect schema.
        _buffer = next((b for b in buffers if b.name == sample.buffer_name), None)

        sample.file_path = "sample_" + sample.name
        sample.do_collect = True
        sample.experiment_type = "sc"

        if sample.buffer_mode in ["Before", "Before & After"]:
            # Only collect the buffer if it differs from the last buffer
            # collected; simply don't collect the same buffer twice in a row.
            if previous_after_buffer_name != _buffer.name:
                temp_buffer = copy.deepcopy(_buffer)
                temp_buffer.id = (
                    "bid-" + temp_buffer.id + "-sid-" + sample.id + "-before"
                )
                temp_buffer.file_path = "buffer_before_" + sample.name
                temp_buffer.do_collect = True
                mix.append(temp_buffer)
            else:
                # Placeholder buffer used by subtraction; not collected twice.
                temp_buffer = copy.deepcopy(_buffer)
                if len(samples) > samples.index(sample) + 1:
                    next_sample = samples[samples.index(sample) + 1]
                elif len(samples) == samples.index(sample) + 1:
                    # Last sample: the placeholder refers to the sample itself.
                    next_sample = samples[samples.index(sample)]

                temp_buffer.id = (
                    "bid-" + temp_buffer.id + "-sid-" + next_sample.id + "-before"
                )
                temp_buffer.file_path = "buffer_before_" + next_sample.name
                temp_buffer.do_collect = False
                mix.append(temp_buffer)

        previous_after_buffer_name = None

        mix.append(sample)

        if sample.buffer_mode in ["After", "Before & After"]:
            temp_buffer = copy.deepcopy(_buffer)
            temp_buffer.id = "bid-" + temp_buffer.id + "-sid-" + sample.id + "-after"
            temp_buffer.file_path = "buffer_after_" + sample.name
            temp_buffer.do_collect = True
            mix.append(temp_buffer)
            previous_after_buffer_name = _buffer.name

        # BUGFIX: the original condition used "<", which can never be true
        # (index + 1 <= len(samples)), so the combined after/before file name
        # was never applied.  ">" means "there is a next sample".
        if (
            sample.buffer_mode in ["Before & After"]
            and len(samples) > samples.index(sample) + 1
        ):
            next_sample = samples[samples.index(sample) + 1]
            mix[-1].file_path = (
                "buffer_after_" + sample.name + "_and_buffer_before_" + next_sample.name
            )

    return mix
def add_params_to_hplc_sample(samples):
    """Flag every sample for collection as part of an HPLC experiment.

    Each sample is mutated in place: ``do_collect`` is set to True and
    ``experiment_type`` to "hplc".

    :param samples: iterable of HPLC sample objects
    :return: new list containing the (mutated) samples
    """
    for item in samples:
        item.do_collect = True
        item.experiment_type = "hplc"
    return list(samples)
def add_params_to_hplc_equilibrate(column):
    """Mark a column object for collection as an equilibration run.

    The column is mutated in place (``do_collect`` and ``experiment_type``)
    and returned unchanged otherwise.
    """
    column.experiment_type = "equilibrate"
    column.do_collect = True
    return column
class AddSCCollectResource(ComponentResource):
    """Queue an SC collect on the "sccollect" actor."""

    @require_control
    @actor("sccollect", enqueue=True, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the mixed sample/buffer list and parent callbacks to kwargs."""
        kwargs["collect_parameters"] = mix_samples_and_buffers(
            kwargs["samples"], kwargs["buffers"]
        )
        for cb_name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_scan_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "update_client_logger_cb",
        ):
            kwargs[cb_name] = getattr(self._parent, cb_name)
        return kwargs
class RunSCCollectResource(ComponentResource):
    """Run an SC collect immediately (not queued) on the "sccollect" actor."""

    @require_control
    @actor("sccollect", enqueue=False, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the mixed sample/buffer list and parent callbacks to kwargs."""
        kwargs["collect_parameters"] = mix_samples_and_buffers(
            kwargs["samples"], kwargs["buffers"]
        )
        for cb_name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_scan_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "update_client_logger_cb",
        ):
            kwargs[cb_name] = getattr(self._parent, cb_name)
        return kwargs
class AddHPLCCollectResource(ComponentResource):
    """Queue an HPLC collect on the "hplccollect" actor."""

    @require_control
    @actor("hplccollect", enqueue=True, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the prepared HPLC sample list and parent callbacks to kwargs."""
        kwargs["collect_parameters"] = add_params_to_hplc_sample(kwargs["samples"])
        for cb_name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_scan_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "update_client_logger_cb",
        ):
            kwargs[cb_name] = getattr(self._parent, cb_name)
        return kwargs
class RunHPLCCollectResource(ComponentResource):
    """Run an HPLC collect immediately (not queued) on the "hplccollect" actor."""

    @require_control
    @actor("hplccollect", enqueue=False, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the prepared HPLC sample list and parent callbacks to kwargs."""
        kwargs["collect_parameters"] = add_params_to_hplc_sample(kwargs["samples"])
        for cb_name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_scan_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "update_client_logger_cb",
        ):
            kwargs[cb_name] = getattr(self._parent, cb_name)
        return kwargs
class AddEquilibrateResource(ComponentResource):
    """Queue an HPLC column equilibration on the "hplcequilibrate" actor."""

    @require_control
    @actor("hplcequilibrate", enqueue=True, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the column parameters, parent callbacks and the ascii
        metadata writer to kwargs."""
        kwargs["collect_parameters"] = add_params_to_hplc_equilibrate(kwargs["column"])
        for name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "save_metadata_to_ascii",
            "update_client_logger_cb",
        ):
            kwargs[name] = getattr(self._parent, name)
        return kwargs
class RunEquilibrateResource(ComponentResource):
    """Run an HPLC column equilibration immediately (not queued)."""

    @require_control
    @actor("hplcequilibrate", enqueue=False, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Attach the column parameters, parent callbacks and the ascii
        metadata writer to kwargs.

        BUGFIX(consistency): AddEquilibrateResource passes
        update_current_sample_cb but it was missing here, so progress
        updates were lost when the equilibrate was run directly.
        """
        kwargs["collect_parameters"] = add_params_to_hplc_equilibrate(kwargs["column"])
        kwargs["collect_started_cb"] = self._parent.collect_started_cb
        kwargs["update_sample_done_cb"] = self._parent.update_sample_done_cb
        kwargs["update_sample_failed_cb"] = self._parent.update_sample_failed_cb
        kwargs["update_current_sample_cb"] = self._parent.update_current_sample_cb
        kwargs["save_metadata_to_ascii"] = self._parent.save_metadata_to_ascii
        kwargs["update_client_logger_cb"] = self._parent.update_client_logger_cb
        return kwargs
class AddSingleCollectResource(ComponentResource):
    """Queue a single collect on the "singlecollect" actor."""

    @require_control
    @actor("singlecollect", enqueue=True, preprocess=True)
    def post(self, *args, **kwargs):
        return kwargs

    def preprocess(self, *args, **kwargs):
        """Copy the posted parameters and attach parent callbacks to kwargs."""
        kwargs["collect_parameters"] = kwargs["parameters"]
        for cb_name in (
            "collect_started_cb",
            "update_sample_done_cb",
            "update_sample_scan_done_cb",
            "update_sample_failed_cb",
            "update_current_sample_cb",
            "update_client_logger_cb",
        ):
            kwargs[cb_name] = getattr(self._parent, cb_name)
        return kwargs
class RunSingleCollectResource(ComponentResource):
    """Run a single collect synchronously (no actor queueing)."""

    @require_control
    @marshal(inp=SingleCollectArgumentSchema)
    def post(self, *args, **kwargs):
        """Start a single collect with the posted parameters."""
        parent = self._parent
        callbacks = {
            "collect_started_cb": parent.collect_started_cb,
            "update_sample_done_cb": parent.update_sample_done_cb,
            "update_client_logger_cb": parent.update_client_logger_cb,
        }
        single_collect.collect(kwargs["parameters"], callbacks=callbacks)
        return kwargs
class RunScanCollectResource(ComponentResource):
    """Run a scan collect, with or without the status callbacks."""

    @require_control
    @marshal(inp=ScanCollectArgumentSchema)
    def post(self, *args, **kwargs):
        parameters = kwargs["parameters"]
        if not parameters.collect:
            # Collect flag off: invoke the low-level collect directly,
            # without any status callbacks.
            scan_collect._collect(parameters)
            return kwargs
        scan_collect.collect(
            parameters,
            callbacks={
                "collect_started_cb": self._parent.collect_started_cb,
                "update_client_logger_cb": self._parent.update_client_logger_cb,
            },
        )
        return kwargs
class SamplesDoneResource(ComponentResource):
    """Expose the status of finished, failed and current samples."""

    @require_control
    @marshal(out=[[200, SamplesDoneSchema(), "List of sample IDs"]])
    def get(self):
        """
        Get information about Finished & Failed samples, list of ids of the samples
        that have been collected and the id of the current sample
        """
        status = self._parent.get_samples_done()
        return status, 200
class GetTestDataResource(ComponentResource):
    """Serve the configured test samples and buffers."""

    @require_control
    @marshal(out=[[200, DataTestArgumentSchema(), "Test samples and buffers"]])
    def get(self):
        """
        Get test samples and buffers
        """
        test_data = self._parent.get_test_samples()
        return test_data, 200
class ProcessingParametersResource(ComponentResource):
    """Read and update the processing parameter template."""

    @require_control
    @marshal(out=[[200, ProcessingParametersSchema(), "processing parameters"]])
    def get(self):
        """
        Get processing parameters
        """
        return self._parent.get_processing_parameters(), 200

    @require_control
    @marshal(inp={"processing_parameters": fields.Nested(ProcessingParametersSchema())})
    def post(self, processing_parameters):
        """Set processing_parameters <processing_parameters>"""
        try:
            ok = self._parent.set_processing_parameters(processing_parameters)
        except Exception:
            ok = False
        if not ok:
            return {"error": "Could not Set processing_parameters"}, 404
        return {"Status": "OK"}, 200
# Update the saving proposal after login.
# TODO: this should be moved to the sessions page (to be called after login is resolved).
class SetCollectDataPathProposalResource(ComponentResource):
    """Switch the BLISS scan saving to a new proposal."""

    @require_control
    @marshal(
        inp={
            "proposal": fields.Str(required=True, metadata={"title": "login proposal"})
        }
    )
    def post(self, proposal):
        """Set the active proposal for data saving and echo it back."""
        scan_saving = current_session.scan_saving
        try:
            scan_saving.newproposal(proposal)
        except Exception:
            print("Could not change to new Proposal")
        else:
            print("new Proposal change properly")
        return proposal
# Petra really wanted the possibility to open Flint from the BSX3 GUI
class OpenFlintResource(ComponentResource):
    """Open (or confirm) the BLISS Flint plotting application."""

    @require_control
    def post(self):
        try:
            if plot.get_flint(creation_allowed=False):
                return {"Status": "OK, BLiss FLint already Opened"}, 200
            plot.get_flint()
            return {"Status": "OK, Openning BLiss FLint"}, 200
        except Exception:
            return {"error": "Could not Open Bliss FLint"}, 404
361"""
362BIOSaxs processing
364yml configuration example:
365 name: biosaxs_collect
366 class: Biosaxscollect
367 package: mx.processing
368 uri: "tango://nela:20000/DAU/dahu/3"
369 template_file_directory: Direcotry from where template files are copied
370"""
class Biosaxscollect(Component):
    """BioSAXS collection component.

    Registers the HTTP resources for SC / HPLC / equilibrate / single / scan
    collects and relays collection progress to clients over the
    ``/biosaxscollect`` socketio namespace.  Processing results are fetched
    from memcached and forwarded together with the status events.
    """

    _actors = ["hplccollect", "sccollect", "hplcequilibrate", "scancollect"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Ids of samples already collected / failed in the running experiment.
        self._samples_done = []
        self._samples_failed = []
        self._current_sample = {}
        # Aggregated processing output sent to clients with each status emit.
        self._job_output = {}
        # Per-sample accumulators, flushed into _job_output on the final
        # ("Subtract ...") processing step.
        self.temp_job_output = []
        self.temp_memchache_data = []
        self.collect_progress = 0
        self.current_exp_name = ""
        # NOTE(review): starts True although nothing is running yet, so the
        # first emitted status reports single_collect=True — confirm this is
        # intentional.
        self.single_collect_running = True
        # BUGFIX: used by update_sample_failed_cb but previously never
        # initialised; define it here so the attribute always exists.
        self.read_uv_state = False
        self.memcache_client = Client(self._config.get("memcache_client"))
        self.proc_template_file = self._config.get("proc_template_file")
        self.proc_template_file_directory = self._config.get(
            "proc_template_file_directory"
        )

    def setup(self):
        """Register all HTTP routes exposed by this component."""
        self.register_actor_route(RunSingleCollectResource, "/run-single-collect")

        self.register_actor_route(AddSCCollectResource, "/add-sc-collect")
        self.register_actor_route(RunSCCollectResource, "/run-sc-collect")

        self.register_actor_route(AddHPLCCollectResource, "/add-hplc-collect")
        self.register_actor_route(RunHPLCCollectResource, "/run-hplc-collect")

        self.register_actor_route(AddEquilibrateResource, "/add-hplc-equilibrate")
        self.register_actor_route(RunEquilibrateResource, "/run-hplc-equilibrate")

        self.register_actor_route(SamplesDoneResource, "/samples-done")
        self.register_actor_route(GetTestDataResource, "/get-test-data")

        # TODO: this should be moved to the sessions page (to be called after
        # login is resolved).
        self.register_actor_route(
            SetCollectDataPathProposalResource, "/data-path-proposal"
        )

        self.register_actor_route(ProcessingParametersResource, "/processing-parameters")

        self.register_actor_route(OpenFlintResource, "/open-flint")
        self.register_actor_route(RunScanCollectResource, "/run-scan-collect")

    def _emit_process_status(self):
        """Broadcast the current collection status on the socketio namespace."""
        print("SAMPLES DONE %s" % self._samples_done)
        self.emit(
            "sample_done",
            {
                "finished": self._samples_done,
                "failed": self._samples_failed,
                "current": self._current_sample,
                "collect_progress": self.collect_progress,
                "job_output": self._job_output,
                "single_collect": self.single_collect_running,
            },
            namespace="/biosaxscollect",
        )

    def collect_started_cb(self, parameters):
        """Initialise experiment state when a collect starts.

        Resets the aggregated job output and creates the matching LIMS
        experiment for "sc" / "hplc" collects.
        """
        self.current_exp_name = parameters.exp_name
        self._job_output = {}
        self._job_output["jobs"] = []
        self._job_output["memcached"] = []

        if parameters.experiment_type == "sc":
            self._metadata.create_new_sc_experiment(self.current_exp_name)
        elif parameters.experiment_type == "hplc":
            self._metadata.create_hplc_experiment(self.current_exp_name)
        elif parameters.experiment_type == "single_collect":
            self.single_collect_running = True
            self._emit_process_status()
        print("NEW EXPERIMENT CREATED WITH NAME: %s" % self.current_exp_name)

    def update_sample_scan_done_cb(self, sample, run_number):
        """Store measurement metadata in the LIMS (ISPyB) after a scan.

        :return: the measurement info dict, or {} when storage failed
        """
        measurement_info = {}
        try:
            if isinstance(sample, (SCCollectSample, SCCollectBuffer)):
                measurement_info = self._metadata.store_sc_measurement(
                    sample, run_number
                )
            elif isinstance(sample, HPLCCollectSample):
                measurement_info = self._metadata.store_hplc_frames(run_number)
        except Exception as ex:
            # Best effort: a LIMS storage failure must not abort the collect.
            print(f"Error on storing data to Ispyb {ex}")
        return measurement_info

    def update_sample_done_cb(self, sample, job_output):
        """Record a finished sample, fold its processing output into the
        aggregated job output and broadcast the new status.

        :param sample: the collected sample/buffer object
        :param job_output: processing result dict for this step, or None
        """
        self._samples_done.append(sample.id)
        if job_output is not None:
            # NOTE(review): _job_output["jobs"] / ["memcached"] assume
            # collect_started_cb ran first — confirm the call ordering.
            self._job_output["exp_name"] = self.current_exp_name
            if "SCCollect" in type(sample).__name__:
                memchache_data = {}
                self._job_output["type"] = "sc"
                self.temp_job_output.append(job_output)
                if "memcached" in job_output:
                    for key, value in job_output["memcached"].items():
                        print(key, "->", value)
                        memchache_data[key] = self.get_memcache_data(value)
                memchache_data["frame_name"] = sample.name
                memchache_data["job_title"] = sample.file_path
                memchache_data["job_name"] = job_output["job_name"]
                self.temp_memchache_data.append(memchache_data)

                # The "Subtract <name>" job is the last processing step for a
                # sample: flush the accumulated per-step outputs.
                if job_output["job_name"] == f"Subtract {sample.name}":
                    self._job_output["jobs"].append(self.temp_job_output)
                    self._job_output["memcached"].append(self.temp_memchache_data)
                    self.temp_job_output = []
                    self.temp_memchache_data = []
                    self.collect_progress = self.collect_progress + 5
            elif "HPLCCollect" in type(sample).__name__:
                self._job_output["type"] = "hplc"
                self._job_output["jobs"] = job_output
                self.collect_progress = self.collect_progress + 5

        # NOTE(review): temp_job_output is always a list here, so the second
        # test is always True; presumably truthiness (a non-empty list) was
        # intended — confirm before tightening.
        if job_output is None and self.temp_job_output is not None:
            self._job_output["jobs"].append(self.temp_job_output)
            self._job_output["memcached"].append(self.temp_memchache_data)
            self.temp_job_output = []
            self.temp_memchache_data = []
            self.collect_progress = self.collect_progress + 5

        self.single_collect_running = False
        self._emit_process_status()

    def update_client_logger_cb(self, level, typ, message):
        """Forward a message from the collect layer to the user log."""
        user_log = log.get("user")
        if level == "info":
            user_log.info(message, type=typ)
        elif level == "exception":
            user_log.exception(message, type=typ)
        elif level == "warning":
            user_log.warning(message, type=typ)

    def get_memcache_data(self, key):
        """Fetch and JSON-decode a value from memcached.

        :return: the decoded object, or None when the key is empty/missing
        """
        res = None
        if key and self.memcache_client:
            cached = self.memcache_client.get(key)
            if cached:
                res = json.loads(cached)
        return res

    def update_sample_failed_cb(self, sample, items=None):
        """Record a failed sample id and broadcast the status.

        ``items`` is accepted for callback-signature compatibility and is
        unused.  BUGFIX: the default used to be a shared mutable list ([]).
        """
        self.read_uv_state = False
        self._samples_failed.append(sample.id)
        self._emit_process_status()

    def update_current_sample_cb(self, sample, collect_progress, items=None):
        """Set the sample currently being collected and its progress.

        BUGFIX: ``items`` default used to be a shared mutable list ([]).
        """
        self._current_sample = sample
        # Progress is reported 5 behind; update_sample_done_cb adds 5 back
        # when the step completes.
        self.collect_progress = collect_progress - 5
        self._emit_process_status()

    def actor_success(self, *args, **kwargs):
        """Reset collection state once an actor finishes successfully."""
        self._samples_done = []
        self._samples_failed = []
        self._current_sample = {}
        self.collect_progress = 0
        self._emit_process_status()

    def actor_started(self, *args, **kwargs):
        # TODO: this needs to be checked
        pass

    def get_samples_done(self):
        """Return a snapshot of finished/failed/current sample status."""
        return {
            "samples": self._samples_done,
            "failed": self._samples_failed,
            "current": self._current_sample,
            "collect_progress": self.collect_progress,
            "job_output": self._job_output,
        }

    def save_metadata_to_ascii(self, root_path, fname, metadata, column, parameters):
        """Write the metadata/column/parameters dicts as a fixed-width ascii
        table to ``<root_path>/<fname>.dat``.

        :return: True on success, False otherwise
        """
        try:
            # Robustness: os.makedirs replaces the original two-level mkdir
            # dance (which only handled one missing parent) and is race-free.
            os.makedirs(root_path, exist_ok=True)

            data = Table()

            for key, value in metadata.items():
                data[key] = [value]

            for key, value in column.items():
                data[key] = [value]

            for key, value in parameters.items():
                data[key] = [value]

            ascii.write(
                data,
                f"{root_path}/{fname}.dat",
                format="fixed_width",
                bookend=False,
                overwrite=True,
            )
            return True
        except Exception as ex:
            # BUGFIX: generic exceptions have no .reason attribute; printing
            # ex.reason raised AttributeError and masked the real error.
            print(ex)
            return False

    def get_test_samples(self):
        """Load the test samples/buffers JSON file named in the config."""
        # NOTE(review): on failure this returns the *string* "{}" rather than
        # an empty dict — callers appear to tolerate it; confirm before
        # changing the fallback type.
        data = "{}"
        try:
            with open(
                os.path.join(sys.path[0], self._config.get("data_test")), "r"
            ) as json_file:
                data = json.load(json_file)
        except Exception as ex:
            print(f"Data could not load because of {ex}")
        return data

    def get_processing_parameters(self):
        """Read the processing parameter template file as a dict.

        Adds ``files_root`` pointing at the template directory; returns {}
        when the template is missing or unreadable.
        """
        _data = {}
        log.get("user").info("Getting processing parameters", type="app")
        if os.path.exists(self.proc_template_file):
            try:
                with open(self.proc_template_file) as f:
                    _data = json.load(f)
                _data["files_root"] = self.proc_template_file_directory
            except Exception as ex:
                print(f"processing parameters could not load because of {ex}")
                log.get("user").exception(
                    f"Error while Getting processing parameters : {ex})", type="app"
                )
        else:
            log.get("user").error(
                f"template file does not existe in {self.proc_template_file_directory}",
                type="app",
            )

        return _data

    def set_processing_parameters(self, processing_parameters):
        """Rewrite the processing template file with the posted parameters.

        :return: True when the template was rewritten; False when the poni or
            mask file does not exist or the rewrite failed
        """
        poni_file = (
            self.proc_template_file_directory + processing_parameters["poni_file"]
        )
        mask_file = (
            self.proc_template_file_directory + processing_parameters["mask_file"]
        )

        if not (os.path.exists(poni_file) and os.path.exists(mask_file)):
            print(
                f"poni File or Mask file does not existe in {self.proc_template_file_directory}"
            )
            log.get("user").error(
                f"poni File or Mask file does not existe in {self.proc_template_file_directory}",
                type="app",
            )
            return False

        try:
            with open(self.proc_template_file, "r") as f:
                _data = json.load(f)
            _data["poni_file"] = poni_file
            _data["mask_file"] = mask_file
            _data["fidelity_abs"] = processing_parameters["fidelity_abs"]
            _data["fidelity_rel"] = processing_parameters["fidelity_rel"]
            _data["normalization_factor"] = processing_parameters[
                "normalization_factor"
            ]

            with open(self.proc_template_file, "w") as fi:
                fi.write(json.dumps(_data, indent=4, sort_keys=False))
            log.get("user").info(
                "processing parameters changed successfuly", type="app"
            )
        except Exception as ex:
            print(f"processing parameters could not load because of {ex}")
            log.get("user").exception(
                f"Error while Changing processing parameters : {ex})", type="app"
            )
            # BUGFIX: previously fell through and returned True even when the
            # rewrite failed, so the client saw a spurious success.
            return False
        return True