Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/daiquiri/httpcamera.py: 0%

50 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from io import BytesIO 

4from typing import Optional 

5 

6import requests 

7from requests.auth import HTTPDigestAuth 

8from PIL import Image, UnidentifiedImageError 

9import numpy 

10 

11from daiquiri.core.hardware.abstract import HardwareProperty 

12from daiquiri.core.hardware.abstract.camera import Camera as AbstractCamera 

13from daiquiri.core.hardware.daiquiri.object import DaiquiriObject 

14 

15import logging 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20# Loader only uppercases first letter to create classname, so cant use proper camel casing here 

21# TODO: Make loader more explicit 

22class Httpcamera(DaiquiriObject, AbstractCamera): 

23 """A camera implementation that returns frames from a url 

24 

25 ```yaml 

26 - id: jpegcamera 

27 protocol: daiquiri 

28 type: httpcamera 

29 camera_url: http://mycamera/jpg/image.jpg 

30 # optionally 

31 username: user 

32 password: pass 

33 ``` 

34 

35 """ 

36 

37 PROPERTY_MAP = { 

38 "state": HardwareProperty("acq_status"), 

39 "live": HardwareProperty("video_live"), 

40 "height": HardwareProperty("image_height"), 

41 "width": HardwareProperty("image_width"), 

42 "exposure": HardwareProperty("video_exposure"), 

43 "gain": HardwareProperty("video_gain"), 

44 "mode": HardwareProperty("video_mode"), 

45 } 

46 

47 acq_status = "RUNNING" 

48 video_live = True 

49 video_exposure = 1 

50 video_gain = 1 

51 video_mode = "RGB24" 

52 

53 def __init__(self, *args, **kwargs): 

54 super().__init__(*args, **kwargs) 

55 

56 self._auth = None 

57 if "username" in self._config and "password" in self._config: 

58 self._auth = HTTPDigestAuth( 

59 self._config["username"], self._config["password"] 

60 ) 

61 

62 @property 

63 def image_width(self) -> Optional[int]: 

64 image = self._get_frame() 

65 if image: 

66 return image.size[0] 

67 

68 @property 

69 def image_height(self) -> Optional[int]: 

70 image = self._get_frame() 

71 if image: 

72 return image.size[1] 

73 

74 def _get_frame(self) -> Optional[Image.Image]: 

75 response = requests.get(self._config["camera_url"], auth=self._auth, timeout=10) 

76 if response.status_code == 200: 

77 try: 

78 image = Image.open(BytesIO(response.content)) 

79 if image.mode != "RGB": 

80 image = image.convert("RGB") 

81 

82 if self._config.get("crop"): 

83 image = image.crop(self._config.get("crop")) 

84 

85 return image 

86 except UnidentifiedImageError: 

87 logger.error("Could not parse image with PIL") 

88 else: 

89 logger.warning( 

90 f"{self._config['camera_url']} returned {response.status_code}" 

91 ) 

92 

93 def frame(self) -> Optional[numpy.ndarray]: 

94 image = self._get_frame() 

95 if image: 

96 return numpy.array(image)