Source code for expert.data.video_reader

from __future__ import annotations

from collections import OrderedDict
from os import PathLike
from typing import Tuple, Optional, Union

import cv2
import numpy as np


[docs]class Cache: """Caching class for decoding videos. If the same video frame is cached and used a second time, there is no need to decode it twice. Args: capacity (int): Buffer size for storing frames. Raises: ValueError: If "capacity" is not a positive integer. """ def __init__(self, capacity: int) -> None: self._cache = OrderedDict() self._capacity = int(capacity) if capacity <= 0: raise ValueError("Capacity must be a positive integer.") @property def capacity(self): return self._capacity @property def size(self): return len(self._cache) def put(self, key: int, val: np.ndarray): if key in self._cache: return if len(self._cache) >= self.capacity: self._cache.popitem(last=False) self._cache[key] = val def get(self, key: int, default: Optional[Union[int, None]] = None): val = self._cache[key] if key in self._cache else default return val
[docs]class VideoReader: """Class for decoding video to a list object. This video wrapper class decodes the video and provides access to frames. Args: filename (str | PathLike): Path to local video file. cache_capacity (int, optional): Buffer size for storing frames. Defaults to 10. Raises: IndexError: If the entered frame index is outside the allowed range of integer values. IndexError: If the entered frame index is out of range. StopIteration: If the end of the video has been reached. """ def __init__( self, filename: Optional[Union[str, PathLike]], cache_capacity: Optional[int] = 10 ) -> None: self._vcap = cv2.VideoCapture(filename) self._cache = Cache(cache_capacity) self._position = 0 self._width = int(self._vcap.get(cv2.CAP_PROP_FRAME_WIDTH)) self._height = int(self._vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self._fps = self._vcap.get(cv2.CAP_PROP_FPS) self._frame_cnt = int(self._vcap.get(cv2.CAP_PROP_FRAME_COUNT)) self._fourcc = self._vcap.get(cv2.CAP_PROP_FOURCC) @property def vcap(self) -> cv2.VideoCapture: """Get VideoCapture object. Returns: cv2.VideoCapture: Raw VideoCapture object. """ return self._vcap @property def opened(self) -> bool: """Check whether the video is opened. Returns: bool: Indicate whether the video is opened. """ return self._vcap.isOpened() @property def width(self) -> int: """Get width of video frames. Returns: int: Width of video frames. """ return self._width @property def height(self) -> int: """Get height of video frames. Returns: int: Height of video frames. """ return self._height @property def resolution(self) -> Tuple[int, int]: """Get Video resolution (width, height). Returns: Tuple: Video resolution (width, height). """ return (self._width, self._height) @property def fps(self) -> float: """Get FPS of the video. Returns: float: FPS of the video. """ return self._fps @property def frame_cnt(self) -> int: """Get total number frames. Returns: int: Total frames of the video. """ return self._frame_cnt @property def fourcc(self) -> str: """Get four character code. Returns: str: "Four character code" of the video. """ return self._fourcc @property def position(self) -> int: """Get current cursor position. Returns: int: Current cursor position, indicating frame decoded. """ return self._position def _get_real_position(self) -> int: return int(round(self._vcap.get(cv2.CAP_PROP_POS_FRAMES))) def _set_real_position(self, frame_id: int) -> None: self._vcap.set(cv2.CAP_PROP_POS_FRAMES, frame_id) pos = self._get_real_position() for _ in range(frame_id - pos): self._vcap.read() self._position = frame_id
[docs] def read(self) -> Union[np.ndarray, None]: """Read the next frame. If the next frame have been decoded before and in the cache, then return it directly, otherwise decode, cache and return it. Returns: ndarray or None: Returns the frame if successful, otherwise returns None. """ if self._cache: image = self._cache.get(self._position) if image is not None: ret = True else: if self._position != self._get_real_position(): self._set_real_position(self._position) ret, image = self._vcap.read() if ret: self._cache.put(self._position, image) else: ret, image = self._vcap.read() if ret: self._position += 1 return image
[docs] def get_frame(self, frame_id: int) -> Union[np.ndarray, None]: """Get frame by index. Args: frame_id (int): Index of the expected frame, 0-based. Returns: ndarray or None: Returns the frame if successful, otherwise returns None. """ if frame_id < 0 or frame_id >= self._frame_cnt: raise IndexError( f'"frame_id" must be between 0 and {self._frame_cnt - 1}' ) if frame_id == self._position: return self.read() if self._cache: image = self._cache.get(frame_id) if image is not None: self._position = frame_id + 1 return image self._set_real_position(frame_id) ret, image = self._vcap.read() if ret: if self._cache: self._cache.put(self._position, image) self._position += 1 return image
[docs] def current_frame(self) -> Union[np.ndarray, None]: """Get the current frame (the frame is just visited). Returns: ndarray or None: If the video is fresh returns None, otherwise returns the frame. """ if self._position == 0: return None return self._cache.get(self._position - 1)
def __len__(self): return self.frame_cnt def __getitem__(self, index: int): if isinstance(index, slice): return [ self.get_frame(i) for i in range(*index.indices(self.frame_cnt)) ] if index < 0: index += self.frame_cnt if index < 0: raise IndexError("Index out of range.") return self.get_frame(index) def __iter__(self): self._set_real_position(0) return self def __next__(self): image = self.read() if image is not None: return image else: raise StopIteration next = __next__ def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self._vcap.release()