Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/implementors/imageviewer/mosaic.py: 28%
65 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
5from PIL import Image, ImageOps
6from marshmallow import fields
8from daiquiri.core.components import ComponentActor, ComponentActorSchema
class MosaicSchema(ComponentActorSchema):
    """Input schema for the mosaic actor.

    Declares the two corner coordinates of the region, the sample id and
    the number of grid steps along each axis. All fields are required
    integers.
    """

    # Region corners (start / end along each axis).
    x1 = fields.Int(required=True, metadata={"title": "X Start"})
    y1 = fields.Int(required=True, metadata={"title": "Y Start"})
    x2 = fields.Int(required=True, metadata={"title": "X End"})
    y2 = fields.Int(required=True, metadata={"title": "Y End"})
    sampleid = fields.Int(required=True, metadata={"title": "Sampleid"})
    # Number of tiles to acquire along each axis.
    steps_x = fields.Int(required=True)
    steps_y = fields.Int(required=True)

    def time_estimate(self, data):
        """Return a duration estimate proportional to the number of tiles.

        Args:
            data: Mapping holding at least ``steps_x`` and ``steps_y``.

        Returns:
            ``steps_x * steps_y * 5`` -- a fixed cost of 5 per tile
            (presumably seconds; the unit is not stated in this file).
        """
        tile_count = data["steps_x"] * data["steps_y"]
        return tile_count * 5
class MosaicActor(ComponentActor):
    """Actor that acquires a grid of images and stitches them into a mosaic."""

    schema = MosaicSchema
    name = "mosaic"
    saving_args = {"dataset": "{sampleid.name}_mosaic{sampleactionid}"}

    def method(self, **kwargs):
        """Step over a ``steps_x`` x ``steps_y`` grid, saving and pasting each tile.

        Configuration keys read from ``self.get_config()``:
            mirror_parts: When truthy, each tile is mirrored before pasting.
            wait_factor: Multiplier applied to the camera exposure to get
                the settle time before saving (defaults to 1.2).
            sequential: When truthy, wait for each motor right after its
                move; otherwise start both moves and then wait for both.

        Keyword arguments used:
            absol: Mapping with an ``axes`` mapping (optional ``x`` / ``y``
                entries, each with ``destination`` and ``motor``) and a
                ``move_to`` callable that is invoked with ``absol`` itself.
            steps_x, steps_y: Number of grid positions along each axis.
            camera: Object whose ``get("exposure")`` gives the exposure.
            save: Callable ``save(column, row)`` returning a mapping with
                the ``path`` of the saved image.
        """
        config = self.get_config()
        mirror_parts = config.get("mirror_parts")
        wait_factor = config.get("wait_factor", 1.2)
        sequential = config.get("sequential")

        steps_x = kwargs["steps_x"]
        steps_y = kwargs["steps_y"]

        # Step size per axis: the destination span divided into
        # (steps - 1) intervals so first and last tiles sit on the bounds.
        x = kwargs["absol"]["axes"].get("x")
        if x:
            span_x = x["destination"][1] - x["destination"][0]
            sizex = span_x / (steps_x - 1) if steps_x > 1 else span_x

        y = kwargs["absol"]["axes"].get("y")
        if y:
            span_y = y["destination"][1] - y["destination"][0]
            sizey = span_y / (steps_y - 1) if steps_y > 1 else span_y

        kwargs["absol"]["move_to"](kwargs["absol"])

        im_w = None
        im_h = None
        full = None
        for sy in range(steps_y):
            for sx in range(steps_x):
                # Serpentine scan: every second row runs in reverse so the
                # x axis never has to travel back across the whole span.
                ssx = steps_x - sx - 1 if sy % 2 else sx

                if x:
                    x["motor"].move(x["destination"][0] + (sizex * ssx))
                    if sequential:
                        x["motor"].wait()

                if y:
                    y["motor"].move(y["destination"][0] + (sizey * sy))
                    if sequential:
                        y["motor"].wait()

                if not sequential:
                    if x:
                        x["motor"].wait()

                    if y:
                        y["motor"].wait()

                # Give the camera time to expose at the new position.
                time.sleep(kwargs["camera"].get("exposure") * wait_factor)

                resp = kwargs["save"](ssx, sy)

                # Open each tile exactly once and close it deterministically;
                # the first tile also fixes the tile size and canvas size.
                with Image.open(resp["path"]) as part:
                    if full is None:
                        im_w, im_h = part.size
                        full = Image.new("RGB", (im_w * steps_x, im_h * steps_y))

                    full.paste(
                        im=ImageOps.mirror(part) if mirror_parts else part,
                        box=(ssx * im_w, sy * im_h),
                    )
                # NOTE(review): the extracted source lost its indentation;
                # this call is reconstructed as running after every tile
                # (progressive update) -- confirm against the original file.
                self.update(full=full)