mypy....

Created Diff never expires
9 removals
313 lines
19 additions
324 lines
"""
"""
VSUtil. A collection of general-purpose VapourSynth functions to be reused in modules and scripts.
VSUtil. A collection of general-purpose VapourSynth functions to be reused in modules and scripts.
"""
"""
__all__ = ['Dither', 'Range', 'depth', 'disallow_variable_format', 'disallow_variable_resolution', 'fallback',
__all__ = ['Dither', 'Range', 'depth', 'disallow_variable_format', 'disallow_variable_resolution', 'fallback',
'frame2clip', 'get_depth', 'get_plane_size',
'frame2clip', 'get_depth', 'get_plane_size',
'get_subsampling', 'get_w', 'get_y', 'insert_clip', 'is_image', 'iterate', 'join', 'plane',
'get_subsampling', 'get_w', 'get_y', 'insert_clip', 'is_image', 'iterate', 'join', 'plane',
'split']
'split']


from enum import Enum, IntEnum
from enum import Enum, IntEnum
from functools import wraps
from functools import wraps
from mimetypes import types_map
from mimetypes import types_map
from os import path
from os import path
from typing import Any, Callable, cast, List, Literal, Optional, Tuple, Type, TypeVar, Union
from typing import Any, Callable, cast, List, Optional, Tuple, Type, TypeVar, Union


import vapoursynth as vs
import vapoursynth as vs
core = vs.core
core = vs.core


T = TypeVar('T')
T = TypeVar('T')
E = TypeVar('E', bound=Enum)
E = TypeVar('E', bound=Enum)
R = TypeVar('R')
R = TypeVar('R')
F = TypeVar('F', bound=Callable[..., Any])
F = TypeVar('F', bound=Callable[..., Any])




class _ConstantFormatVideoNode(vs.VideoNode):
format: vs.Format


class Range(IntEnum):
class Range(IntEnum):
"""
"""
enum for zimg_pixel_range_e
enum for zimg_pixel_range_e
"""
"""
LIMITED = 0 # Studio (TV) legal range, 16-235 in 8 bits.
LIMITED = 0 # Studio (TV) legal range, 16-235 in 8 bits.
FULL = 1 # Full (PC) dynamic range, 0-255 in 8 bits.
FULL = 1 # Full (PC) dynamic range, 0-255 in 8 bits.




class Dither(Enum):
class Dither(Enum):
"""
"""
enum for zimg_dither_type_e
enum for zimg_dither_type_e
"""
"""
value: str
NONE = 'none' # Round to nearest.
NONE = 'none' # Round to nearest.
ORDERED = 'ordered' # Bayer patterned dither.
ORDERED = 'ordered' # Bayer patterned dither.
RANDOM = 'random' # Pseudo-random noise of magnitude 0.5.
RANDOM = 'random' # Pseudo-random noise of magnitude 0.5.
ERROR_DIFFUSION = 'error_diffusion' # Floyd-Steinberg error diffusion.
ERROR_DIFFUSION = 'error_diffusion' # Floyd-Steinberg error diffusion.




def disallow_variable_format(function: F) -> F:
def disallow_variable_format(function: F) -> F:
"""
"""
Function decorator that raises an exception if the input clip has a variable format.
Function decorator that raises an exception if the input clip has a variable format.
Decorated function's first parameter must be of type `vapoursynth.VideoNode` and is the only parameter checked.
Decorated function's first parameter must be of type `vapoursynth.VideoNode` and is the only parameter checked.
"""
"""
@wraps(function)
@wraps(function)
def _check(clip: vs.VideoNode, *args, **kwargs) -> Any:
def _check(clip: vs.VideoNode, *args, **kwargs) -> Any:
if clip.format is None:
if clip.format is None:
raise ValueError('Variable-format clips not supported.')
raise ValueError('Variable-format clips not supported.')
return function(clip, *args, **kwargs)
return function(clip, *args, **kwargs)
return cast(F, _check)
return cast(F, _check)




def disallow_variable_resolution(function: F) -> F:
def disallow_variable_resolution(function: F) -> F:
"""
"""
Function decorator that raises an exception if the input clip has a variable resolution.
Function decorator that raises an exception if the input clip has a variable resolution.
Decorated function's first parameter must be of type `vapoursynth.VideoNode` and is the only parameter checked.
Decorated function's first parameter must be of type `vapoursynth.VideoNode` and is the only parameter checked.
"""
"""
@wraps(function)
@wraps(function)
def _check(clip: vs.VideoNode, *args, **kwargs) -> Any:
def _check(clip: vs.VideoNode, *args, **kwargs) -> Any:
if 0 in (clip.width, clip.height):
if 0 in (clip.width, clip.height):
raise ValueError('Variable-resolution clips not supported.')
raise ValueError('Variable-resolution clips not supported.')
return function(clip, *args, **kwargs)
return function(clip, *args, **kwargs)
return cast(F, _check)
return cast(F, _check)




@disallow_variable_format
@disallow_variable_format
def get_subsampling(clip: vs.VideoNode, /) -> Union[None, str]:
def get_subsampling(clip: vs.VideoNode, /) -> Union[None, str]:
"""
"""
Returns the subsampling of a VideoNode in human-readable format.
Returns the subsampling of a VideoNode in human-readable format.
Returns None for formats without subsampling.
Returns None for formats without subsampling.
"""
"""
clip = cast(_ConstantFormatVideoNode, clip)
if clip.format.color_family not in (vs.YUV, vs.YCOCG):
if clip.format.color_family not in (vs.YUV, vs.YCOCG):
return None
return None
if clip.format.subsampling_w == 1 and clip.format.subsampling_h == 1:
if clip.format.subsampling_w == 1 and clip.format.subsampling_h == 1:
return '420'
return '420'
elif clip.format.subsampling_w == 1 and clip.format.subsampling_h == 0:
elif clip.format.subsampling_w == 1 and clip.format.subsampling_h == 0:
return '422'
return '422'
elif clip.format.subsampling_w == 0 and clip.format.subsampling_h == 0:
elif clip.format.subsampling_w == 0 and clip.format.subsampling_h == 0:
return '444'
return '444'
elif clip.format.subsampling_w == 2 and clip.format.subsampling_h == 2:
elif clip.format.subsampling_w == 2 and clip.format.subsampling_h == 2:
return '410'
return '410'
elif clip.format.subsampling_w == 2 and clip.format.subsampling_h == 0:
elif clip.format.subsampling_w == 2 and clip.format.subsampling_h == 0:
return '411'
return '411'
elif clip.format.subsampling_w == 0 and clip.format.subsampling_h == 1:
elif clip.format.subsampling_w == 0 and clip.format.subsampling_h == 1:
return '440'
return '440'
else:
else:
raise ValueError('Unknown subsampling.')
raise ValueError('Unknown subsampling.')




@disallow_variable_format
@disallow_variable_format
def get_depth(clip: vs.VideoNode, /) -> int:
def get_depth(clip: vs.VideoNode, /) -> int:
"""
"""
Returns the bit depth of a VideoNode as an integer.
Returns the bit depth of a VideoNode as an integer.
"""
"""
return clip.format.bits_per_sample
return cast(_ConstantFormatVideoNode, clip).format.bits_per_sample




def get_plane_size(frame: Union[vs.VideoFrame, vs.VideoNode], /, planeno: int) -> Tuple[int, int]:
def get_plane_size(frame: Union[vs.VideoFrame, vs.VideoNode], /, planeno: int) -> Tuple[int, int]:
"""
"""
Calculates the dimensions (w, h) of the desired plane.
Calculates the dimensions (w, h) of the desired plane.


:param frame: Can be a clip or frame.
:param frame: Can be a clip or frame.
:param planeno: The desired plane's index.
:param planeno: The desired plane's index.
:return: (width, height)
:return: (width, height)
"""
"""
# Add additional checks on VideoNodes as their size and format can be variable.
# Add additional checks on VideoNodes as their size and format can be variable.
if isinstance(frame, vs.VideoNode):
if isinstance(frame, vs.VideoNode):
if frame.width == 0:
if frame.width == 0:
raise ValueError('Cannot calculate plane size of variable size clip. Pass a frame instead.')
raise ValueError('Cannot calculate plane size of variable size clip. Pass a frame instead.')
if frame.format is None:
if frame.format is None:
raise ValueError('Cannot calculate plane size of variable format clip. Pass a frame instead.')
raise ValueError('Cannot calculate plane size of variable format clip. Pass a frame instead.')
else:
frame = cast(_ConstantFormatVideoNode, frame)


width, height = frame.width, frame.height
width, height = frame.width, frame.height
if planeno != 0:
if planeno != 0:
width >>= frame.format.subsampling_w
width >>= frame.format.subsampling_w
height >>= frame.format.subsampling_h
height >>= frame.format.subsampling_h
return width, height
return width, height




def iterate(base: T, function: Callable[[Union[T, R]], R], count: int) -> Union[T, R]:
def iterate(base: T, function: Callable[[Union[T, R]], R], count: int) -> Union[T, R]:
"""
"""
Utility function that executes a given function a given number of times.
Utility function that executes a given function a given number of times.
"""
"""
if count < 0:
if count < 0:
raise ValueError('Count cannot be negative.')
raise ValueError('Count cannot be negative.')


v: Union[T, R] = base
v: Union[T, R] = base
for _ in range(count):
for _ in range(count):
v = function(v)
v = function(v)
return v
return v




def insert_clip(clip: vs.VideoNode, /, insert: vs.VideoNode, start_frame: int) -> vs.VideoNode:
def insert_clip(clip: vs.VideoNode, /, insert: vs.VideoNode, start_frame: int) -> vs.VideoNode:
"""
"""
Convenience method to insert a shorter clip into a longer one.
Convenience method to insert a shorter clip into a longer one.
The inserted clip cannot go beyond the last frame of the source clip or an exception is raised.
The inserted clip cannot go beyond the last frame of the source clip or an exception is raised.
"""
"""
if start_frame == 0:
if start_frame == 0:
return insert + clip[insert.num_frames:]
return insert + clip[insert.num_frames:]
pre = clip[:start_frame]
pre = clip[:start_frame]
frame_after_insert = start_frame + insert.num_frames
frame_after_insert = start_frame + insert.num_frames
if frame_after_insert > clip.num_frames:
if frame_after_insert > clip.num_frames:
raise ValueError('Inserted clip is too long.')
raise ValueError('Inserted clip is too long.')
if frame_after_insert == clip.num_frames:
if frame_after_insert == clip.num_frames:
return pre + insert
return pre + insert
post = clip[start_frame + insert.num_frames:]
post = clip[start_frame + insert.num_frames:]
return pre + insert + post
return pre + insert + post




def fallback(value: Optional[T], fallback_value: T) -> T:
def fallback(value: Optional[T], fallback_value: T) -> T:
"""
"""
Utility function that returns a value or a fallback if the value is None.
Utility function that returns a value or a fallback if the value is None.
"""
"""
return fallback_value if value is None else value
return fallback_value if value is None else value




@disallow_variable_format
@disallow_variable_format
def plane(clip: vs.VideoNode, planeno: int, /) -> vs.VideoNode:
def plane(clip: vs.VideoNode, planeno: int, /) -> vs.VideoNode:
"""
"""
Extract the plane with the given index from the clip.
Extract the plane with the given index from the clip.


:param clip: The clip to extract the plane from.
:param clip: The clip to extract the plane from.
:param planeno: The index that specifies which plane to extract.
:param planeno: The index that specifies which plane to extract.
:return: A grayscale clip that only contains the given plane.
:return: A grayscale clip that only contains the given plane.
"""
"""
if clip.format.num_planes == 1 and planeno == 0:
if cast(_ConstantFormatVideoNode, clip).format.num_planes == 1 and planeno == 0:
return clip
return clip
return core.std.ShufflePlanes(clip, planeno, vs.GRAY)
return core.std.ShufflePlanes([clip], [planeno], vs.GRAY)




@disallow_variable_format
def get_y(clip: vs.VideoNode, /) -> vs.VideoNode:
def get_y(clip: vs.VideoNode, /) -> vs.VideoNode:
"""
"""
Helper to get the luma of a VideoNode.
Helper to get the luma of a VideoNode.


If passed a single-plane vs.GRAY clip, it is assumed to be the luma and returned (no-op).
If passed a single-plane vs.GRAY clip, it is assumed to be the luma and returned (no-op).
"""
"""
if clip.format is None or clip.format.color_family not in (vs.YUV, vs.YCOCG, vs.GRAY):
if cast(_ConstantFormatVideoNode, clip).format.color_family not in (vs.YUV, vs.YCOCG, vs.GRAY):
raise ValueError('The clip must have a luma plane.')
raise ValueError('The clip must have a luma plane.')
return plane(clip, 0)
return plane(clip, 0)




@disallow_variable_format
@disallow_variable_format
def split(clip: vs.VideoNode, /) -> List[vs.VideoNode]:
def split(clip: vs.VideoNode, /) -> List[vs.VideoNode]:
"""
"""
Returns a list of planes for the given input clip.
Returns a list of planes for the given input clip.
"""
"""
return [plane(clip, x) for x in range(clip.format.num_planes)]
return [plane(clip, x) for x in range(cast(_ConstantFormatVideoNode, clip).format.num_planes)]




def join(planes: List[vs.VideoNode],
def join(planes: List[vs.VideoNode],
family: Literal[vs.ColorFamily.RGB, vs.ColorFamily.YUV, vs.ColorFamily.YCOCG] = vs.YUV) -> vs.VideoNode:
family: vs.ColorFamily = vs.YUV) -> vs.VideoNode:
"""
"""
Joins the supplied list of planes into a three-plane VideoNode (defaults to YUV).
Joins the supplied list of planes into a three-plane VideoNode (defaults to YUV).
"""
"""
if family not in (vs.RGB, vs.YUV, vs.YCOCG):
raise ValueError('Color family must have three planes.')
return core.std.ShufflePlanes(clips=planes, planes=[0, 0, 0], colorfamily=family)
return core.std.ShufflePlanes(clips=planes, planes=[0, 0, 0], colorfamily=family)




def frame2clip(frame: vs.VideoFrame, /, *, enforce_cache=True) -> vs.VideoNode:
def frame2clip(frame: vs.VideoFrame, /, *, enforce_cache=True) -> vs.VideoNode:
"""
"""
Converts a VapourSynth frame to a clip.
Converts a VapourSynth frame to a clip.


:param frame: The frame to wrap.
:param frame: The frame to wrap.
:param enforce_cache: Always add a cache. (Even if the vapoursynth module has this feature disabled)
:param enforce_cache: Always add a cache. (Even if the vapoursynth module has this feature disabled)
:return: A one-frame VideoNode that yields the frame passed to the function.
:return: A one-frame VideoNode that yields the frame passed to the function.
"""
"""
bc = core.std.BlankClip(
bc = core.std.BlankClip(
width=frame.width,
width=frame.width,
height=frame.height,
height=frame.height,
length=1,
length=1,
fpsnum=1,
fpsnum=1,
fpsden=1,
fpsden=1,
format=frame.format.id
format=frame.format.id
)
)
frame = frame.copy()
frame = frame.copy()
result = bc.std.ModifyFrame([bc], lambda n, f: frame.copy())
result = bc.std.ModifyFrame([bc], lambda n, f: frame.copy())


# Forcefully add a cache to Modify-Frame if caching is disabled on the core.
# Forcefully add a cache to Modify-Frame if caching is disabled on the core.
# This will ensure that the filter will not include GIL characteristics.
# This will ensure that the filter will not include GIL characteristics.
if not core.add_cache and enforce_cache:
if not core.add_cache and enforce_cache:
result = result.std.Cache(size=1, fixed=True)
result = result.std.Cache(size=1, fixed=True)
return result
return result




def get_w(height: int, aspect_ratio: float = 16 / 9, *, only_even: bool = True) -> int:
def get_w(height: int, aspect_ratio: float = 16 / 9, *, only_even: bool = True) -> int:
"""
"""
Calculates the width for a clip with the given height and aspect ratio.
Calculates the width for a clip with the given height and aspect ratio.
only_even is True by default because it imitates the math behind most standard resolutions (e.g. 854x480).
only_even is True by default because it imitates the math behind most standard resolutions (e.g. 854x480).
"""
"""
width = height * aspect_ratio
width = height * aspect_ratio
if only_even:
if only_even:
return round(width / 2) * 2
return round(width / 2) * 2
return round(width)
return round(width)




def is_image(filename: str, /) -> bool:
def is_image(filename: str, /) -> bool:
"""
"""
Returns true if a filename refers to an image.
Returns true if a filename refers to an image.
"""
"""
return types_map.get(path.splitext(filename)[-1], '').startswith('image/')
return types_map.get(path.splitext(filename)[-1], '').startswith('image/')




@disallow_variable_format
@disallow_variable_format
def depth(clip: vs.VideoNode,
def depth(clip: vs.VideoNode,
bitdepth: int,
bitdepth: int,
/,
/,
sample_type: Optional[Union[int, vs.SampleType]] = None,
sample_type: Optional[Union[int, vs.SampleType]] = None,
*,
*,
range: Optional[Union[int, Range]] = None,
range: Optional[Union[int, Range]] = None,
range_in: Optional[Union[int, Range]] = None,
range_in: Optional[Union[int, Range]] = None,
dither_type: Optional[Union[Dither, str]] = None) \
dither_type: Optional[Union[Dither, str]] = None) \
-> vs.VideoNode:
-> vs.VideoNode:
"""
"""
A bit depth converter only using core.resize and Format.replace.
A bit depth converter only using core.resize and Format.replace.
By default, outputs FLOAT sample type for 32 bit and INTEGER for anything else.
By default, outputs FLOAT sample type for 32 bit and INTEGER for anything else.


:param bitdepth: Desired bits_per_sample of output clip.
:param bitdepth: Desired bits_per_sample of output clip.
:param sample_type: Desired sample_type of output clip. Allows overriding default FLOAT/INTEGER behavior. Accepts
:param sample_type: Desired sample_type of output clip. Allows overriding default FLOAT/INTEGER behavior. Accepts
vapoursynth.SampleType enums INTEGER and FLOAT or their values, [0, 1].
vapoursynth.SampleType enums INTEGER and FLOAT or their values, [0, 1].
:param range: Output pixel range (defaults to input clip's range). See `Range`.
:param range: Output pixel range (defaults to input clip's range). See `Range`.
:param range_in: Input pixel range (defaults to input clip's range). See `Range`.
:param range_in: Input pixel range (defaults to input clip's range). See `Range`.
:param dither_type: Dithering algorithm. Allows overriding default dithering behavior. See `Dither`.
:param dither_type: Dithering algorithm. Allows overriding default dithering behavior. See `Dither`.
Defaults to Floyd-Steinberg error diffusion when downsampling, converting between ranges, or upsampling full
Defaults to Floyd-Steinberg error diffusion when downsampling, converting between ranges, or upsampling full
range input. Defaults to 'none', or round to nearest, otherwise.
range input. Defaults to 'none', or round to nearest, otherwise.


:return: Converted clip with desired bit depth and sample type. ColorFamily will be same as input.
:return: Converted clip with desired bit depth and sample type. ColorFamily will be same as input.
"""
"""
sample_type = _resolve_enum(vs.SampleType, sample_type, 'sample_type', 'vapoursynth')
sample_type = _resolve_enum(vs.SampleType, sample_type, 'sample_type', 'vapoursynth')
range = _resolve_enum(Range, range, 'range')
range = _resolve_enum(Range, range, 'range')
range_in = _resolve_enum(Range, range_in, 'range_in')
range_in = _resolve_enum(Range, range_in, 'range_in')
dither_type = _resolve_enum(Dither, dither_type, 'dither_type')
dither_type = _resolve_enum(Dither, dither_type, 'dither_type')


curr_depth = get_depth(clip)
curr_depth = get_depth(clip)
sample_type = fallback(sample_type, vs.FLOAT if bitdepth == 32 else vs.INTEGER)
sample_type = fallback(sample_type, vs.FLOAT if bitdepth == 32 else vs.INTEGER)


if (curr_depth, clip.format.sample_type, range_in) == (bitdepth, sample_type, range):
if (curr_depth, cast(_ConstantFormatVideoNode, clip).format.sample_type, range_in) == (bitdepth, sample_type, range):
return clip
return clip


# thanks @Frechdachs for explaining this:
# thanks @Frechdachs for explaining this:
# 'you need dithering when raising the bitdepth of full range [or] converting between full and limited'
# 'you need dithering when raising the bitdepth of full range [or] converting between full and limited'
should_dither = (range_in != range
should_dither = (range_in != range
or range_in == Range.FULL
or range_in == Range.FULL
or (curr_depth > bitdepth and sample_type == vs.INTEGER))
or (curr_depth > bitdepth and sample_type == vs.INTEGER))
dither_type = fallback(dither_type, Dither.ERROR_DIFFUSION if should_dither else Dither.NONE)
dither_type = fallback(dither_type, Dither.ERROR_DIFFUSION if should_dither else Dither.NONE)


return clip.resize.Point(format=clip.format.replace(bits_per_sample=bitdepth, sample_type=sample_type),
return clip.resize.Point(format=cast(_ConstantFormatVideoNode, clip).format.replace(bits_per_sample=bitdepth, sample_type=sample_type),
range=range,
range=range,
range_in=range_in,
range_in=range_in,
dither_type=dither_type.value)
dither_type=dither_type.value)




def _readable_enums(enum: Type[Enum], module: Optional[str] = None) -> str:
def _readable_enums(enum: Type[Enum], module: Optional[str] = None) -> str:
"""
"""
Returns a list of all possible values in `module.enum`.
Returns a list of all possible values in `module.enum`.
Extends the default `repr(enum.value)` behavior by prefixing the enum with the name of the module it belongs to.
Extends the default `repr(enum.value)` behavior by prefixing the enum with the name of the module it belongs to.
"""
"""
return ', '.join([f'<{fallback(module, enum.__module__)}.{str(e)}: {e.value}>' for e in enum])
return ', '.join([f'<{fallback(module, enum.__module__)}.{str(e)}: {e.value}>' for e in enum])




def _resolve_enum(enum: Type[E], value: Any, var_name: str, module: Optional[str] = None) -> Union[E, None]:
def _resolve_enum(enum: Type[E], value: Any, var_name: str, module: Optional[str] = None) -> Union[E, None]:
"""
"""
Attempts to evaluate `value` in `enum` if value is not None, otherwise returns None.
Attempts to evaluate `value` in `enum` if value is not None, otherwise returns None.
Basically checks if a supplied enum value is valid and returns a readable error message
Basically checks if a supplied enum value is valid and returns a readable error message
explaining the possible enum values if it isn't.
explaining the possible enum values if it isn't.
"""
"""
if value is None:
if value is None:
return None
return None
try:
try:
return enum(value)
return enum(value)
except ValueError:
except ValueError:
raise ValueError(f'{var_name} must be in {_readable_enums(enum, module)}.') from None
raise ValueError(f'{var_name} must be in {_readable_enums(enum, module)}.') from None