Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/implementors/imageviewer/mosaic.py: 28%

65 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4 

5from PIL import Image, ImageOps 

6from marshmallow import fields 

7 

8from daiquiri.core.components import ComponentActor, ComponentActorSchema 

9 

10 

11class MosaicSchema(ComponentActorSchema): 

12 x1 = fields.Int(required=True, metadata={"title": "X Start"}) 

13 y1 = fields.Int(required=True, metadata={"title": "Y Start"}) 

14 x2 = fields.Int(required=True, metadata={"title": "X End"}) 

15 y2 = fields.Int(required=True, metadata={"title": "Y End"}) 

16 sampleid = fields.Int(required=True, metadata={"title": "Sampleid"}) 

17 steps_x = fields.Int(required=True) 

18 steps_y = fields.Int(required=True) 

19 

20 def time_estimate(self, data): 

21 return data["steps_x"] * data["steps_y"] * 5 

22 

23 

24class MosaicActor(ComponentActor): 

25 schema = MosaicSchema 

26 name = "mosaic" 

27 saving_args = {"dataset": "{sampleid.name}_mosaic{sampleactionid}"} 

28 

29 def method(self, **kwargs): 

30 config = self.get_config() 

31 mirror_parts = config.get("mirror_parts") 

32 wait_factor = config.get("wait_factor", 1.2) 

33 sequential = config.get("sequential") 

34 

35 x = kwargs["absol"]["axes"].get("x") 

36 if x: 

37 if kwargs["steps_x"] > 1: 

38 sizex = (x["destination"][1] - x["destination"][0]) / ( 

39 kwargs["steps_x"] - 1 

40 ) 

41 else: 

42 sizex = x["destination"][1] - x["destination"][0] 

43 

44 y = kwargs["absol"]["axes"].get("y") 

45 if y: 

46 if kwargs["steps_y"] > 1: 

47 sizey = (y["destination"][1] - y["destination"][0]) / ( 

48 kwargs["steps_y"] - 1 

49 ) 

50 else: 

51 sizey = y["destination"][1] - y["destination"][0] 

52 

53 kwargs["absol"]["move_to"](kwargs["absol"]) 

54 

55 im_w = None 

56 im_h = None 

57 full = None 

58 for sy in range(kwargs["steps_y"]): 

59 for sx in range(kwargs["steps_x"]): 

60 if (sy + 1) % 2 == 0: 

61 ssx = kwargs["steps_x"] - sx - 1 

62 else: 

63 ssx = sx 

64 

65 if x: 

66 x["motor"].move(x["destination"][0] + (sizex * ssx)) 

67 if sequential: 

68 x["motor"].wait() 

69 

70 if y: 

71 y["motor"].move(y["destination"][0] + (sizey * sy)) 

72 if sequential: 

73 y["motor"].wait() 

74 

75 if not sequential: 

76 if x: 

77 x["motor"].wait() 

78 

79 if y: 

80 y["motor"].wait() 

81 

82 time.sleep(kwargs["camera"].get("exposure") * wait_factor) 

83 

84 resp = kwargs["save"](ssx, sy) 

85 

86 if sy == 0 and ssx == 0: 

87 first = Image.open(resp["path"]) 

88 im_w = first.size[0] 

89 im_h = first.size[1] 

90 

91 full = Image.new( 

92 "RGB", (im_w * kwargs["steps_x"], im_h * kwargs["steps_y"]) 

93 ) 

94 

95 part = Image.open(resp["path"]) 

96 full.paste( 

97 im=ImageOps.mirror(part) if mirror_parts else part, 

98 box=(ssx * im_w, sy * im_h), 

99 ) 

100 self.update(full=full)