Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/saving/bliss_basic.py: 65%

43 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import pathlib 

5import time 

6import logging 

7from bliss.config.static import get_config 

8from daiquiri.core.saving.base import SavingHandler 

9 

10logger = logging.getLogger(__name__) 

11 

12 

def find_existing(path):
    """Return `path` if it exists, otherwise its nearest existing ancestor.

    Args:
        path: Path (string or `pathlib.Path`) to start searching from.

    Returns:
        `path` itself, unchanged, when it already exists; otherwise the
        closest existing ancestor as an absolute `pathlib.Path`.

    Raises:
        FileNotFoundError: If neither `path` nor any of its ancestors exists
            (for example a drive root that is not available).
    """
    candidate = pathlib.Path(path)
    if candidate.exists():
        logger.info(f"Found existing path: `{path}`")
        return path

    # Walk upwards iteratively so a deep or unresolvable path can neither hit
    # the recursion limit nor loop forever.
    candidate = candidate.absolute()
    while not candidate.exists():
        parent = candidate.parent
        if parent == candidate:
            # Reached the filesystem root and it does not exist either
            raise FileNotFoundError(f"No existing ancestor found for `{path}`")
        candidate = parent

    logger.info(f"Found existing path: `{candidate}`")
    return candidate

20 

21 

class Bliss_BasicSavingHandler(SavingHandler):
    """Saving handler backed by the `scan_saving` object of a BLISS session.

    The session is fetched from the BLISS static configuration using the name
    stored under the `saving_session` key of this handler's config.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = get_config().get(self._config["saving_session"])

    @property
    def scan_saving(self):
        """The `scan_saving` object of the configured session."""
        return self.session.scan_saving

    def _set_filename(self, base_path=None, template=None, data_filename=None):
        """Apply the base path, template and data filename to `scan_saving`.

        The template written to `scan_saving` is `template` joined with
        `data_filename`, so despite their `None` defaults both must be
        path-like strings (`os.path.join` rejects `None`).
        """
        self.scan_saving.base_path = base_path
        self.scan_saving.template = os.path.join(template, data_filename)
        self.scan_saving.data_filename = data_filename

    @property
    def filename(self):
        """The filename reported by `scan_saving`."""
        return self.scan_saving.filename

    def create_root_path(self, wait_exists=False, wait_timeout=360):
        """Create the root path and optionally wait until it is visible.

        Args:
            wait_exists: When true, poll once per second until the root path
                exists on this machine.
            wait_timeout: Maximum time to wait in seconds. `None` waits
                indefinitely.
        """
        self.scan_saving.create_root_path()

        if not wait_exists:
            return

        # If the writer is running on another machine there can be an nfs delay
        # syncing the newly created directory, here we can wait for this
        root_path = self.scan_saving.root_path
        if os.path.exists(root_path):
            return

        existing_parent = find_existing(root_path)
        waited = 0
        while not os.path.exists(root_path):
            # Touching the existing parent can cause nfs to refresh more quickly
            # -> caveats unknown
            pathlib.Path(existing_parent).touch()

            # `>=` rather than `==` so fractional or negative timeouts
            # still terminate the loop
            if wait_timeout is not None and waited >= wait_timeout:
                logger.warning(
                    f"Timed out waiting for root path: {root_path} to be created"
                )
                break

            time.sleep(1)
            waited += 1
        else:
            # Only reached when the loop ended because the path appeared
            logger.info(f"Waited {waited}s for `{root_path}` to appear")

    def create_path(self, path):
        """Create `path` by delegating to `scan_saving.create_path`."""
        self.scan_saving.create_path(path)