Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/lima.py: 88%

113 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import struct 

4import numpy 

5import typing 

6import enum 

7from dataclasses import dataclass 

8 

9from daiquiri.core.hardware.abstract import HardwareProperty 

10from daiquiri.core.hardware.abstract.camera import Camera as AbstractCamera 

11from daiquiri.core.hardware.tango.object import TangoObject 

12 

13import logging 

14 

15logger = logging.getLogger(__name__) 

16 

17try: 

18 import cv2 

19except ImportError: 

20 logger.warning("opencv not available, will not be able to bayer decode") 

21 cv2 = None 

22 

23 

# Binary layout of the header that prefixes every Lima video frame, in network
# (big-endian) byte order: magic, header version, image mode, frame number,
# width, height, endian flag, header size and two padding fields.
VIDEO_HEADER_FORMAT = "!IHHqiiHHHH"
# The four ASCII bytes "VDEO" read as one big-endian unsigned 32-bit integer.
VIDEO_MAGIC = int.from_bytes(b"VDEO", byteorder="big")
# Number of bytes occupied by the packed header.
HEADER_SIZE = struct.Struct(VIDEO_HEADER_FORMAT).size

27 

28 

class ImageFormatNotSupported(Exception):
    """Signal that RAW data from a Lima device cannot be decoded.

    Raised when a frame cannot be turned into an RGB numpy array, for example
    because its header or pixel format is not recognised.
    """

31 

32 

class VIDEO_MODES(enum.Enum):
    """Numeric identifiers of the Lima video modes.

    Values mirror the enumeration in Lima's ``Constants.h``:
    https://github.com/esrf-bliss/Lima/blob/master/common/include/lima/Constants.h#L118
    """

    # Y* modes
    Y8 = 0
    Y16 = 1
    Y32 = 2
    Y64 = 3

    # RGB / BGR modes
    RGB555 = 4
    RGB565 = 5
    RGB24 = 6
    RGB32 = 7
    BGR24 = 8
    BGR32 = 9

    # Bayer modes
    BAYER_RG8 = 10
    BAYER_RG16 = 11
    BAYER_BG8 = 12
    BAYER_BG16 = 13

    # YUV modes
    I420 = 14
    YUV411 = 15
    YUV422 = 16
    YUV444 = 17
    YUV411PACKED = 18
    YUV422PACKED = 19
    YUV444PACKED = 20

56 

57 

@dataclass
class VideoCodec:
    """Describe how the payload of one Lima video mode is decoded.

    Attributes are documented inline below. ``opencv_code`` may be ``None``
    (and defaults to it) for modes that need no OpenCV colour conversion.
    """

    # Element type handed to ``numpy.frombuffer`` when reading the payload.
    dtype: numpy.dtype
    # Callable ``(width, height) -> shape tuple`` giving the array shape
    # (before the width/height axes are swapped by the frame parser).
    shape: typing.Callable
    # OpenCV ``cv2.COLOR_*`` conversion code, or None when no conversion is
    # required.
    opencv_code: typing.Optional[int] = None

63 

64 

# Registry of the video modes this module can decode. Modes missing from the
# registry are reported as unsupported by the frame parser.
VIDEO_CODECS = {
    # Single-plane modes: the array shape is (w, h), no conversion needed.
    VIDEO_MODES.Y8: VideoCodec(numpy.uint8, lambda w, h: (w, h), None),
    VIDEO_MODES.Y16: VideoCodec(numpy.uint16, lambda w, h: (w, h), None),
    VIDEO_MODES.Y32: VideoCodec(numpy.uint32, lambda w, h: (w, h), None),
    VIDEO_MODES.Y64: VideoCodec(numpy.uint64, lambda w, h: (w, h), None),
    # Byte-per-channel modes with 3 or 4 channels, no conversion needed.
    VIDEO_MODES.RGB24: VideoCodec(numpy.uint8, lambda w, h: (w, h, 3), None),
    VIDEO_MODES.RGB32: VideoCodec(numpy.uint8, lambda w, h: (w, h, 4), None),
}

# The remaining modes need an OpenCV colour conversion, so they are only
# registered when the optional cv2 import succeeded.
if cv2:
    VIDEO_CODECS.update(
        {
            VIDEO_MODES.BAYER_RG16: VideoCodec(
                numpy.uint16, lambda w, h: (w, h), cv2.COLOR_BayerRG2BGR
            ),
            VIDEO_MODES.BAYER_BG16: VideoCodec(
                numpy.uint16, lambda w, h: (w, h), cv2.COLOR_BayerBG2BGR
            ),
            VIDEO_MODES.YUV422PACKED: VideoCodec(
                numpy.uint8, lambda w, h: (w, h, 2), cv2.COLOR_YUV2RGB_Y422
            ),
            # The payload holds h + h // 2 rows of w samples.
            VIDEO_MODES.I420: VideoCodec(
                numpy.uint8, lambda w, h: (w, h + h // 2), cv2.COLOR_YUV2RGB_I420
            ),
            # NOTE(review): BAYER_BG8 uses COLOR_BayerRG2RGB whereas
            # BAYER_BG16 uses COLOR_BayerBG2BGR -- confirm this is intended.
            VIDEO_MODES.BAYER_BG8: VideoCodec(
                numpy.uint8, lambda w, h: (w, h), cv2.COLOR_BayerRG2RGB
            ),
            VIDEO_MODES.BAYER_RG8: VideoCodec(
                numpy.uint8, lambda w, h: (w, h), cv2.COLOR_BayerRG2BGR
            ),
        }
    )

94 

95 

def parse_video_frame(
    raw_data: bytes, bpp: typing.Optional[int] = None, rotation: int = 0
) -> numpy.ndarray:
    """Parse a lima video frame and convert to rgb

    Lima frame decoding in bliss
        https://gitlab.esrf.fr/bliss/bliss/-/blob/master/bliss/data/lima_image.py

    Available lima pixel formats:
        https://gitlab.esrf.fr/limagroup/lima/-/blob/master/common/include/lima/VideoUtils.h

    OpenCV formats:
        https://docs.opencv.org/master/d1/d4f/imgproc_2include_2opencv2_2imgproc_8hpp.html

    Args:
        raw_data: Header (``VIDEO_HEADER_FORMAT``) immediately followed by the
            pixel payload.
        bpp: Bits per pixel of the detector image. When given, 16-bit data
            with fewer than 16 significant bits is shifted up to fill 16
            bits, and 8-bit data declared with more than 8 bits is shifted
            down. ``None`` disables any scaling.
        rotation: Rotation in degrees, expected to be a multiple of 90. It is
            converted to whole quarter turns and passed negated to
            ``numpy.rot90``.

    Returns:
        The decoded frame as a numpy array, with the height axis first.

    Raises:
        ImageFormatNotSupported: If the magic number or header version is not
            recognised, the image mode is unknown, or no codec is registered
            for that mode.
        IndexError: If the header reports a negative frame number, i.e. no
            image is available yet.
        struct.error: If ``raw_data`` is shorter than the header.
    """
    (
        magic,
        header_version,
        image_mode,
        image_frame_number,
        image_width,
        image_height,
        _endian,
        _header_size,
        _pad0,
        _pad1,
    ) = struct.unpack(VIDEO_HEADER_FORMAT, raw_data[:HEADER_SIZE])

    if magic != VIDEO_MAGIC:
        raise ImageFormatNotSupported(f"Magic header not supported (found {magic:0x})")

    if header_version != 1:
        raise ImageFormatNotSupported(
            f"Image header version not supported (found {header_version})"
        )
    if image_frame_number < 0:
        raise IndexError("Image from lima video_live interface not available yet.")

    # Looking an Enum up by value raises ValueError (not KeyError) when the
    # value is unknown.
    try:
        mode = VIDEO_MODES(image_mode)
    except ValueError:
        raise ImageFormatNotSupported(
            f"Video format unsupported (found {image_mode})"
        ) from None

    try:
        codec = VIDEO_CODECS[mode]
    except KeyError:
        raise ImageFormatNotSupported(
            f"Video codec unsupported (found {mode})"
        ) from None

    data = numpy.frombuffer(raw_data[HEADER_SIZE:], dtype=codec.dtype)

    # flip w and h as this is how images are saved
    shape = list(codec.shape(image_width, image_height))
    shape[0], shape[1] = shape[1], shape[0]
    data = data.reshape(shape)

    if bpp is not None:
        if codec.dtype == numpy.uint16 and bpp < 16:
            logger.debug("Scaling to 16bit from %s", bpp)
            data = data << (16 - bpp)

        if codec.dtype == numpy.uint8 and bpp > 8:
            # The shift below drops (bpp - 8) low-order bits.
            logger.warning(
                "Scaling to 8bit from %s, trimming %s bits", bpp, bpp - 8
            )
            data = data >> (bpp - 8)

    if cv2 and codec.opencv_code is not None:
        logger.debug(
            "Decoding frame with mode %s using %s", image_mode, codec.opencv_code
        )
        data = cv2.cvtColor(data, codec.opencv_code)

    if rotation != 0:
        # numpy.rot90 works in whole quarter turns; use an integer count so a
        # value that is not a multiple of 90 cannot select an arbitrary branch.
        quarter_turns, remainder = divmod(int(rotation), 90)
        if remainder:
            logger.warning(
                "Rotation %s is not a multiple of 90, using %s",
                rotation,
                quarter_turns * 90,
            )
        data = numpy.rot90(data, -quarter_turns)

    return data

173 

174 

class LimaStateProperty(HardwareProperty):
    """Hardware property that normalises the state string read from Lima."""

    def translate_from(self, value):
        """Return the normalised state for a raw device value.

        ``"Ready"`` becomes ``"READY"`` and ``"Running"`` becomes
        ``"ACQUIRING"``; every other value yields ``"UNKNOWN"``.
        """
        known_states = {"Ready": "READY", "Running": "ACQUIRING"}
        return known_states.get(value, "UNKNOWN")

183 

184 

class Lima(TangoObject, AbstractCamera):
    """Camera implementation reading its properties and frames from Lima."""

    # Maps each camera property name to the device attribute backing it.
    PROPERTY_MAP = {
        "state": LimaStateProperty("acq_status"),
        "exposure": HardwareProperty("video_exposure"),
        "gain": HardwareProperty("video_gain"),
        "mode": HardwareProperty("video_mode"),
        "live": HardwareProperty("video_live"),
        "height": HardwareProperty("image_height"),
        "width": HardwareProperty("image_width"),
    }

    def frame(self) -> numpy.ndarray:
        """Return the last video image decoded by ``parse_video_frame``.

        Returns ``None`` when the raw data read from the device is not longer
        than the frame header, i.e. it carries no pixel payload.
        """
        _, payload = self._object.video_last_image
        if len(payload) <= HEADER_SIZE:
            return None

        # Fall back to no rotation when the device value is not an integer.
        try:
            rotation = int(self._object.image_rotation)
        except ValueError:
            rotation = 0

        # image_type looks like "Bpp<N>"; strip the prefix to get the bit depth.
        bits_per_pixel = int(self._object.image_type.replace("Bpp", ""))
        return parse_video_frame(payload, bits_per_pixel, rotation)

205 )