Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/lima.py: 88%
113 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import struct
4import numpy
5import typing
6import enum
7from dataclasses import dataclass
9from daiquiri.core.hardware.abstract import HardwareProperty
10from daiquiri.core.hardware.abstract.camera import Camera as AbstractCamera
11from daiquiri.core.hardware.tango.object import TangoObject
13import logging
15logger = logging.getLogger(__name__)
17try:
18 import cv2
19except ImportError:
20 logger.warning("opencv not available, will not be able to bayer decode")
21 cv2 = None
24VIDEO_HEADER_FORMAT = "!IHHqiiHHHH"
25VIDEO_MAGIC = struct.unpack(">I", b"VDEO")[0]
26HEADER_SIZE = struct.calcsize(VIDEO_HEADER_FORMAT)
29class ImageFormatNotSupported(Exception):
30 """Raised when the RAW data from a Lima device can't be decoded as an RGB numpy array."""
33class VIDEO_MODES(enum.Enum):
34 # From https://github.com/esrf-bliss/Lima/blob/master/common/include/lima/Constants.h#L118
35 Y8 = 0
36 Y16 = 1
37 Y32 = 2
38 Y64 = 3
39 RGB555 = 4
40 RGB565 = 5
41 RGB24 = 6
42 RGB32 = 7
43 BGR24 = 8
44 BGR32 = 9
45 BAYER_RG8 = 10
46 BAYER_RG16 = 11
47 BAYER_BG8 = 12
48 BAYER_BG16 = 13
49 I420 = 14
50 YUV411 = 15
51 YUV422 = 16
52 YUV444 = 17
53 YUV411PACKED = 18
54 YUV422PACKED = 19
55 YUV444PACKED = 20
58@dataclass
59class VideoCodec:
60 dtype: numpy.dtype
61 shape: typing.Callable
62 opencv_code: int
65VIDEO_CODECS = {}
67VIDEO_CODECS[VIDEO_MODES.Y8] = VideoCodec(numpy.uint8, lambda w, h: (w, h), None)
68VIDEO_CODECS[VIDEO_MODES.Y16] = VideoCodec(numpy.uint16, lambda w, h: (w, h), None)
69VIDEO_CODECS[VIDEO_MODES.Y32] = VideoCodec(numpy.uint32, lambda w, h: (w, h), None)
70VIDEO_CODECS[VIDEO_MODES.Y64] = VideoCodec(numpy.uint64, lambda w, h: (w, h), None)
72VIDEO_CODECS[VIDEO_MODES.RGB24] = VideoCodec(numpy.uint8, lambda w, h: (w, h, 3), None)
73VIDEO_CODECS[VIDEO_MODES.RGB32] = VideoCodec(numpy.uint8, lambda w, h: (w, h, 4), None)
75if cv2:
76 VIDEO_CODECS[VIDEO_MODES.BAYER_RG16] = VideoCodec(
77 numpy.uint16, lambda w, h: (w, h), cv2.COLOR_BayerRG2BGR
78 )
79 VIDEO_CODECS[VIDEO_MODES.BAYER_BG16] = VideoCodec(
80 numpy.uint16, lambda w, h: (w, h), cv2.COLOR_BayerBG2BGR
81 )
82 VIDEO_CODECS[VIDEO_MODES.YUV422PACKED] = VideoCodec(
83 numpy.uint8, lambda w, h: (w, h, 2), cv2.COLOR_YUV2RGB_Y422
84 )
85 VIDEO_CODECS[VIDEO_MODES.I420] = VideoCodec(
86 numpy.uint8, lambda w, h: (w, h + h // 2), cv2.COLOR_YUV2RGB_I420
87 )
88 VIDEO_CODECS[VIDEO_MODES.BAYER_BG8] = VideoCodec(
89 numpy.uint8, lambda w, h: (w, h), cv2.COLOR_BayerRG2RGB
90 )
91 VIDEO_CODECS[VIDEO_MODES.BAYER_RG8] = VideoCodec(
92 numpy.uint8, lambda w, h: (w, h), cv2.COLOR_BayerRG2BGR
93 )
96def parse_video_frame(
97 raw_data: bytes, bpp: int = None, rotation: int = 0
98) -> numpy.ndarray:
99 """Parse a lima video frame and convert to rgb
101 Lima frame decoding in bliss
102 https://gitlab.esrf.fr/bliss/bliss/-/blob/master/bliss/data/lima_image.py
104 Available lima pixel formats:
105 https://gitlab.esrf.fr/limagroup/lima/-/blob/master/common/include/lima/VideoUtils.h
107 OpenCV formats:
108 https://docs.opencv.org/master/d1/d4f/imgproc_2include_2opencv2_2imgproc_8hpp.html
109 """
110 (
111 magic,
112 header_version,
113 image_mode,
114 image_frame_number,
115 image_width,
116 image_height,
117 endian,
118 header_size,
119 pad0,
120 pad1,
121 ) = struct.unpack(VIDEO_HEADER_FORMAT, raw_data[:HEADER_SIZE])
123 if magic != VIDEO_MAGIC:
124 raise ImageFormatNotSupported(f"Magic header not supported (found {magic:0x})")
126 if header_version != 1:
127 raise ImageFormatNotSupported(
128 f"Image header version not supported (found {header_version})"
129 )
130 if image_frame_number < 0:
131 raise IndexError("Image from lima video_live interface not available yet.")
133 try:
134 mode = VIDEO_MODES(image_mode)
135 except KeyError:
136 raise ImageFormatNotSupported(f"Video format unsupported (found {image_mode})")
138 try:
139 codec = VIDEO_CODECS[mode]
140 except KeyError:
141 raise ImageFormatNotSupported(f"Video codec unsupported (found {mode})")
143 data = numpy.frombuffer(raw_data[HEADER_SIZE:], dtype=codec.dtype)
145 # flip w and h as this is how images are saved
146 shape = list(codec.shape(image_width, image_height))
147 shape[0], shape[1] = shape[1], shape[0]
148 data.shape = shape
150 if bpp is not None:
151 if codec.dtype == numpy.uint16:
152 if bpp < 16:
153 logger.debug(f"Scaling to 16bit from {bpp}")
154 data = data << (16 - bpp)
156 if codec.dtype == numpy.uint8:
157 if bpp > 8:
158 logger.warning(f"Scaling to 8bit from {bpp}, trimming {16 - bpp} bits")
159 data = data >> (bpp - 8)
161 if cv2:
162 if codec.opencv_code is not None:
163 logger.debug(
164 f"Decoding frame with mode {image_mode} using {codec.opencv_code}"
165 )
166 data = cv2.cvtColor(data, codec.opencv_code)
168 if rotation != 0:
169 ninties = rotation / 90
170 data = numpy.rot90(data, -ninties)
172 return data
175class LimaStateProperty(HardwareProperty):
176 def translate_from(self, value):
177 vals = {"Ready": "READY", "Running": "ACQUIRING"}
179 if value in vals:
180 return vals[value]
182 return "UNKNOWN"
185class Lima(TangoObject, AbstractCamera):
186 PROPERTY_MAP = {
187 "state": LimaStateProperty("acq_status"),
188 "exposure": HardwareProperty("video_exposure"),
189 "gain": HardwareProperty("video_gain"),
190 "mode": HardwareProperty("video_mode"),
191 "live": HardwareProperty("video_live"),
192 "height": HardwareProperty("image_height"),
193 "width": HardwareProperty("image_width"),
194 }
196 def frame(self) -> numpy.ndarray:
197 _, raw_data = self._object.video_last_image
198 if len(raw_data) > HEADER_SIZE:
199 try:
200 rotation = int(self._object.image_rotation)
201 except ValueError:
202 rotation = 0
203 return parse_video_frame(
204 raw_data, int(self._object.image_type.replace("Bpp", "")), rotation
205 )