Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/metadata/ispyb.py: 0%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1# -*- coding: utf-8 -*- 

2import re 

3import logging 

4import json 

5 

6from flask import g 

7 

8from requests import Session, post, get 

9from requests.auth import HTTPBasicAuth 

10from zeep import Client 

11from zeep.transports import Transport 

12 

13import xmltodict 

14from urllib.parse import urljoin 

15from bliss import current_session 

16 

17from marshmallow import fields, Schema, EXCLUDE 

18from daiquiri.core import CoreResource, marshal 

19from daiquiri.core.metadata.user import User 

20from daiquiri.core.metadata import MetaDataHandler 

21 

22# from daiquiri.core.schema.biosaxscollect.biosaxs import ( 

23# SCCollectSample,37 

24# SCCollectBuffer, 

25# SCIspybMetadataArgumentSchema 

26# ) 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class ISPyBConfigSchema(Schema): 

32 meta_url = fields.Str() 

33 meta_user = fields.Str() 

34 meta_password = fields.Str() 

35 meta_beamline = fields.Str() 

36 meta_staff = fields.Str() 

37 rest_meta_user = fields.Str() 

38 rest_meta_password = fields.Str() 

39 

40 

41class ProposalResource(CoreResource): 

42 @marshal(inp={"code": fields.Str(), "number": fields.Str()}) 

43 def get(self, code, number): 

44 """Get current proposal""" 

45 proposal = self._parent.find_proposal(code, number) 

46 

47 if proposal: 

48 return proposal, 200 

49 else: 

50 return {"error": "No such propsoal"}, 404 

51 

52 

53class ListExperimentsResource(CoreResource): 

54 def get(self): 

55 """Get a list of all experiments for current proposal""" 

56 experiments = json.loads(self._parent.list_experiments()) 

57 

58 if experiments: 

59 return experiments, 200 

60 else: 

61 return {"error": "Could not find any experiment for proposal"}, 404 

62 

63 

64class GetExperimentResource(CoreResource): 

65 @marshal(inp={"experiment_id": fields.Str()}) 

66 def get(self, experiment_id): 

67 """Get experiment with experiment id: <experiment_id>""" 

68 experiment = json.loads(self._parent.get_experiment(experiment_id)) 

69 

70 if experiment: 

71 return experiment, 200 

72 else: 

73 return {"error": "Could not find any experiment for proposal"}, 404 

74 

75 

76# ----------- rest web socket ------------------ 

77# store list of experiment from UI to ISPYB 

78class StoreSCTableRestWSResource(CoreResource): 

79 @marshal( 

80 inp={ 

81 # "data": fields.Nested(SCIspybMetadataArgumentSchema)}) 

82 "data": fields.Raw(required=True) 

83 } 

84 ) 

85 def post(self, data): 

86 """Store experiment with data <data>""" 

87 success = False 

88 try: 

89 self._parent.create_rest_ws_templete_experiment(data) 

90 success = True 

91 except Exception: 

92 return {"error": "Could not store experiment"}, 404 

93 

94 if success: 

95 return {"Succeed": "Successfully "}, 200 

96 

97 return {"error": "Could not store experiment"}, 404 

98 

99 

100# Get List of Experiment and send them to UI 

101class GetSCExperimentRestWSResource(CoreResource): 

102 def get(self): 

103 """Get experiment with experiment id""" 

104 experiment = self._parent.get_rest_ws_experiments() 

105 if experiment: 

106 return {"data": experiment}, 200 

107 else: 

108 return {"error": "Could not find any experiment for proposal"}, 404 

109 

110 

111# ------------------------ END rest web socket --------------------------------- 

112 

113 

114class IspybMetaDataHandler(MetaDataHandler): 

115 def __init__(self, *args, **kwargs): 

116 self._config = ISPyBConfigSchema().load( 

117 kwargs.get("config", {}), unknown=EXCLUDE 

118 ) 

119 super().__init__(*args, **kwargs) 

120 

121 self._current_experiment = None 

122 self._current_measurement = None 

123 self._current_proposal = None 

124 

125 def _get_prop(self): 

126 prop = re.findall(r"[^\W\d_]+|\d+", self._current_proposal) 

127 

128 try: 

129 pcode, pnumber = prop 

130 except ValueError: 

131 pcode, pnumber = ("", "") 

132 

133 return pcode, pnumber 

134 

135 def setup(self, *args, **kwargs): 

136 logger.debug("Loaded: {c}".format(c=self.__class__.__name__)) 

137 

138 self.register_route(GetExperimentResource, "/get-experiment") 

139 

140 # Rest Web Services 

141 self.register_route(GetSCExperimentRestWSResource, "/list-experiments") 

142 self.register_route(StoreSCTableRestWSResource, "/store_sc_experiment") 

143 

144 self.rest_ws_host = self._config.get("rest_meta_host") 

145 

146 host = self._config.get("meta_host") 

147 

148 session = Session() 

149 session.auth = HTTPBasicAuth( 

150 self._config.get("meta_user"), self._config.get("meta_password") 

151 ) 

152 

153 self._shipping_client = Client( 

154 host + "ToolsForShippingWebService?wsdl", 

155 transport=Transport(session=session), 

156 ) 

157 

158 self._collection_client = Client( 

159 host + "ToolsForCollectionWebService?wsdl", 

160 transport=Transport(session=session), 

161 ) 

162 

163 self._biosaxs_client = Client( 

164 host + "ToolsForBiosaxsWebService?wsdl", 

165 transport=Transport(session=session), 

166 ) 

167 

168 self._generic_biosaxs_client = Client( 

169 host + "GenericSampleChangerBiosaxsWebService?wsdl", 

170 transport=Transport(session=session), 

171 ) 

172 

173 super().setup() 

174 

175 def find_sessions(self, code, number): 

176 return self._collection_client.service.findSessionsByProposalAndBeamLine( 

177 code + number, self._config.get("beamline") 

178 ) 

179 

180 def find_proposal(self, code, number): 

181 return self._shipping_client.service.findProposal(code, number) 

182 

183 def get_user(self, **kwargs): 

184 self._current_proposal = g.login 

185 

186 u = { 

187 "givenname": "", 

188 "familyname": "", 

189 "fullname": "", 

190 "personid": 1, 

191 "login": self._current_proposal, 

192 "groups": [], 

193 "permissions": ["staff"], 

194 } 

195 

196 user = User(**u) 

197 user["is_staff"] = user.permission(self._config["meta_staff"]) 

198 

199 return user 

200 

201 def set_session(self, session): 

202 if self.verify_session(session): 

203 self._session.update({"blsession": session}) 

204 return True 

205 

206 def verify_session(self, session): 

207 return self.get_sessions(session=session) 

208 

209 def get_proposals(self, proposal=None, **kwargs): 

210 code, number = re.findall(r"[^\W\d_]+|\d+", proposal) 

211 p = self.find_proposal(code, number) 

212 proposal = {} 

213 

214 if p: 

215 proposal = { 

216 "proposalid": p.proposalId, 

217 "proposalcode": p.code, 

218 "proposalnumber": p.number, 

219 "proposal": p.code + p.number, 

220 } 

221 

222 self.current_proposal = code, number 

223 

224 return proposal 

225 

226 def get_sessions(self, session=None, **kwargs): 

227 session = { 

228 "sessionid": 1, 

229 "proposalid": 1, 

230 "proposal": g.login, 

231 "visit_number": 1, 

232 "session": "s1", 

233 } 

234 

235 return session 

236 

237 # --------------------------------------------------------------------------------------------------- 

238 def get_rest_ws_token(self): 

239 auth_url = urljoin(self.rest_ws_host, "authenticate?site=" + "ESRF") 

240 # self._config.get("meta_user"), self._config.get("meta_password") 

241 try: 

242 data = { 

243 "login": self._config.get("rest_meta_user"), 

244 "password": self._config.get("rest_meta_password"), 

245 } 

246 response = post(auth_url, data=data, timeout=900) 

247 rest_token = response.json().get("token") 

248 except Exception as ex: 

249 msg = "POST to %s failed reason %s" % (auth_url, str(ex)) 

250 logging.getLogger("ispyb_client").exception(msg) 

251 return "" 

252 

253 return rest_token 

254 

255 def get_rest_ws_sample_info(self): 

256 """Get all sample information""" 

257 rest_token = self.get_rest_ws_token() 

258 current_user = current_session.scan_saving.proposal.name 

259 sample_info = get( 

260 self.rest_ws_host + rest_token + "/proposal/" + current_user + "/info/get", 

261 timeout=900, 

262 ) 

263 return sample_info.json() 

264 

265 def get_rest_ws_list_experiment(self): 

266 rest_token = self.get_rest_ws_token() 

267 current_user = current_session.scan_saving.proposal.name 

268 experiment_list = get( 

269 self.rest_ws_host 

270 + rest_token 

271 + "/proposal/" 

272 + current_user 

273 + "/saxs/experiment/list", 

274 timeout=900, 

275 ) 

276 

277 return experiment_list.json() 

278 

279 def get_rest_ws_experiments(self): 

280 """Get all TEMPLATE experiments by experiment id""" 

281 experiments = [] 

282 rest_token = self.get_rest_ws_token() 

283 experiment_list = self.get_rest_ws_list_experiment() 

284 current_user = current_session.scan_saving.proposal.name 

285 for experiment in experiment_list: 

286 experimentid = experiment["experimentId"] 

287 if experiment["experimentType"] == "TEMPLATE": 

288 experimentbyid = get( 

289 self.rest_ws_host 

290 + rest_token 

291 + "/proposal/" 

292 + current_user 

293 + "/saxs/experiment/" 

294 + str(experimentid) 

295 + "/samplechanger" 

296 + "/type/Sample" 

297 "/template", 

298 timeout=900, 

299 ) 

300 experimentbyid = experimentbyid.text.replace("\n", "") 

301 if experimentbyid != "": 

302 data = json.loads(json.dumps(xmltodict.parse(experimentbyid))) 

303 data["bsxcube"].update( 

304 experiment 

305 ) # this is not elegant, but necessary 

306 experiments.append(data) 

307 print(current_user) 

308 print(len(experiments)) 

309 return experiments 

310 

311 def create_rest_ws_templete_experiment(self, data): 

312 rest_token = self.get_rest_ws_token() 

313 current_user = current_session.scan_saving.proposal.name 

314 try: 

315 url = ( 

316 self.rest_ws_host 

317 + rest_token 

318 + "/proposal/" 

319 + current_user 

320 + "/saxs/experiment/save" 

321 ) 

322 data = { 

323 "name": data["name"], 

324 "comments": data["comments"], 

325 "measurements": str(data["measurements"]), 

326 } 

327 response = post(url, data, timeout=900) 

328 print(current_user) 

329 print(response) 

330 return response 

331 except Exception as ex: 

332 msg = "POST to %s failed reason %s" % (url, str(ex)) 

333 logging.getLogger("ispyb_client").exception(msg) 

334 

335 # ------------------------------------------------------------------------------------------------------------------ 

336 

337 def create_new_sc_experiment(self, exp_name): 

338 """On Collect started Created a new empty Experiment""" 

339 pcode, pnumber = self._get_prop() 

340 experiment_name = exp_name 

341 experiment = self._generic_biosaxs_client.service.createEmptyExperiment( 

342 pcode, pnumber, experiment_name 

343 ) 

344 self._current_experiment = json.loads(experiment) 

345 

346 def store_sc_measurement(self, item, run_number): 

347 experiment_id = self._current_experiment.get("experimentId", None) 

348 _type = "SAMPLE" if "Sample" in type(item).__name__ else "BUFFER" 

349 name = item.name 

350 buffer_name = item.buffer_name if "Sample" in type(item).__name__ else item.name 

351 measurement = ( 

352 self._generic_biosaxs_client.service.appendMeasurementToExperiment( 

353 experiment_id, 

354 run_number, 

355 _type, 

356 item.plate, 

357 ord(item.row) - 64, 

358 item.column, 

359 name, 

360 buffer_name, 

361 item.concentration, 

362 item.seu_temperature, 

363 item.viscosity, 

364 item.volume, 

365 item.volume, 

366 item.wait, 

367 item.transmission, 

368 item.comment, 

369 ) 

370 ) 

371 self._current_measurement = measurement 

372 measurementId = measurement.measurementId 

373 measurement_info = { 

374 "measurement_id": measurementId, 

375 "experiment_id": experiment_id, 

376 "run_number": run_number, 

377 } 

378 

379 self._generic_biosaxs_client.service.addRun( 

380 experiment_id, 

381 run_number, 

382 item.seu_temperature, 

383 item.storage_temperature, 

384 item.exposure_time, 

385 "None", 

386 "None", 

387 item.energy, 

388 "detectorDistance", 

389 "snapshotCapillary", 

390 "current", 

391 "bmX", 

392 "bmY", 

393 "rR", 

394 "rA", 

395 "pxX", 

396 "pxY", 

397 "normalization", 

398 item.transmission, 

399 ) 

400 

401 return measurement_info 

402 

403 def create_hplc_experiment(self, experiment_name): 

404 pcode, pnumber = self._get_prop() 

405 

406 self._current_experiment = self._biosaxs_client.service.createHPLC( 

407 pcode, pnumber, experiment_name 

408 ) 

409 

410 def store_hplc_frames(self, run_number): 

411 experiment_id = self._current_experiment 

412 measurement = self._biosaxs_client.service.storeHPLC( 

413 experiment_id, 

414 "", 

415 None, 

416 ) 

417 measurement_info = { 

418 "measurement_id": measurement, 

419 "experiment_id": experiment_id, 

420 "run_number": run_number, 

421 } 

422 self._current_measurement = measurement 

423 return measurement_info