Shortcuts

Note

You are reading the documentation for MMEditing 0.x, which will soon be deprecated by the end of 2022. We recommend you upgrade to MMEditing 1.0 to enjoy fruitful new features and better performance brought by OpenMMLab 2.0. Check out the changelog, code and documentation of MMEditing 1.0 for more details.

Source code for mmedit.datasets.pipelines.random_degradations

# Copyright (c) OpenMMLab. All rights reserved.
import io
import logging
import random

import cv2
import numpy as np

from mmedit.datasets.pipelines import blur_kernels as blur_kernels
from ..registry import PIPELINES

try:
    import av
    has_av = True
except ImportError:
    has_av = False


[docs]@PIPELINES.register_module() class RandomBlur: """Apply random blur to the input. Modified keys are the attributed specified in "keys". Args: params (dict): A dictionary specifying the degradation settings. keys (list[str]): A list specifying the keys whose values are modified. """ def __init__(self, params, keys): self.keys = keys self.params = params def get_kernel(self, num_kernels): kernel_type = np.random.choice( self.params['kernel_list'], p=self.params['kernel_prob']) kernel_size = random.choice(self.params['kernel_size']) sigma_x_range = self.params.get('sigma_x', [0, 0]) sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1]) sigma_x_step = self.params.get('sigma_x_step', 0) sigma_y_range = self.params.get('sigma_y', [0, 0]) sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1]) sigma_y_step = self.params.get('sigma_y_step', 0) rotate_angle_range = self.params.get('rotate_angle', [-np.pi, np.pi]) rotate_angle = np.random.uniform(rotate_angle_range[0], rotate_angle_range[1]) rotate_angle_step = self.params.get('rotate_angle_step', 0) beta_gau_range = self.params.get('beta_gaussian', [0.5, 4]) beta_gau = np.random.uniform(beta_gau_range[0], beta_gau_range[1]) beta_gau_step = self.params.get('beta_gaussian_step', 0) beta_pla_range = self.params.get('beta_plateau', [1, 2]) beta_pla = np.random.uniform(beta_pla_range[0], beta_pla_range[1]) beta_pla_step = self.params.get('beta_plateau_step', 0) omega_range = self.params.get('omega', None) omega_step = self.params.get('omega_step', 0) if omega_range is None: # follow Real-ESRGAN settings if not specified if kernel_size < 13: omega_range = [np.pi / 3., np.pi] else: omega_range = [np.pi / 5., np.pi] omega = np.random.uniform(omega_range[0], omega_range[1]) # determine blurring kernel kernels = [] for _ in range(0, num_kernels): kernel = blur_kernels.random_mixed_kernels( [kernel_type], [1], kernel_size, [sigma_x, sigma_x], [sigma_y, sigma_y], [rotate_angle, rotate_angle], [beta_gau, beta_gau], [beta_pla, beta_pla], [omega, omega], None, ) kernels.append(kernel) # update kernel parameters sigma_x += np.random.uniform(-sigma_x_step, sigma_x_step) sigma_y += np.random.uniform(-sigma_y_step, sigma_y_step) rotate_angle += np.random.uniform(-rotate_angle_step, rotate_angle_step) beta_gau += np.random.uniform(-beta_gau_step, beta_gau_step) beta_pla += np.random.uniform(-beta_pla_step, beta_pla_step) omega += np.random.uniform(-omega_step, omega_step) sigma_x = np.clip(sigma_x, sigma_x_range[0], sigma_x_range[1]) sigma_y = np.clip(sigma_y, sigma_y_range[0], sigma_y_range[1]) rotate_angle = np.clip(rotate_angle, rotate_angle_range[0], rotate_angle_range[1]) beta_gau = np.clip(beta_gau, beta_gau_range[0], beta_gau_range[1]) beta_pla = np.clip(beta_pla, beta_pla_range[0], beta_pla_range[1]) omega = np.clip(omega, omega_range[0], omega_range[1]) return kernels def _apply_random_blur(self, imgs): is_single_image = False if isinstance(imgs, np.ndarray): is_single_image = True imgs = [imgs] # get kernel and blur the input kernels = self.get_kernel(num_kernels=len(imgs)) imgs = [ cv2.filter2D(img, -1, kernel) for img, kernel in zip(imgs, kernels) ] if is_single_image: imgs = imgs[0] return imgs def __call__(self, results): if np.random.uniform() > self.params.get('prob', 1): return results for key in self.keys: results[key] = self._apply_random_blur(results[key]) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(params={self.params}, keys={self.keys})') return repr_str
[docs]@PIPELINES.register_module() class RandomResize: """Randomly resize the input. Modified keys are the attributed specified in "keys". Args: params (dict): A dictionary specifying the degradation settings. keys (list[str]): A list specifying the keys whose values are modified. """ def __init__(self, params, keys): self.keys = keys self.params = params self.resize_dict = dict( bilinear=cv2.INTER_LINEAR, bicubic=cv2.INTER_CUBIC, area=cv2.INTER_AREA, lanczos=cv2.INTER_LANCZOS4) def _random_resize(self, imgs): is_single_image = False if isinstance(imgs, np.ndarray): is_single_image = True imgs = [imgs] h, w = imgs[0].shape[:2] resize_opt = self.params['resize_opt'] resize_prob = self.params['resize_prob'] resize_opt = np.random.choice(resize_opt, p=resize_prob).lower() if resize_opt not in self.resize_dict: raise NotImplementedError(f'resize_opt [{resize_opt}] is not ' 'implemented') resize_opt = self.resize_dict[resize_opt] resize_step = self.params.get('resize_step', 0) # determine the target size, if not provided target_size = self.params.get('target_size', None) if target_size is None: resize_mode = np.random.choice(['up', 'down', 'keep'], p=self.params['resize_mode_prob']) resize_scale = self.params['resize_scale'] if resize_mode == 'up': scale_factor = np.random.uniform(1, resize_scale[1]) elif resize_mode == 'down': scale_factor = np.random.uniform(resize_scale[0], 1) else: scale_factor = 1 # determine output size h_out, w_out = h * scale_factor, w * scale_factor if self.params.get('is_size_even', False): h_out, w_out = 2 * (h_out // 2), 2 * (w_out // 2) target_size = (int(h_out), int(w_out)) else: resize_step = 0 # resize the input if resize_step == 0: # same target_size for all input images outputs = [ cv2.resize(img, target_size[::-1], interpolation=resize_opt) for img in imgs ] else: # different target_size for each input image outputs = [] for img in imgs: img = cv2.resize( img, target_size[::-1], interpolation=resize_opt) outputs.append(img) # update scale scale_factor += np.random.uniform(-resize_step, resize_step) scale_factor = np.clip(scale_factor, resize_scale[0], resize_scale[1]) # determine output size h_out, w_out = h * scale_factor, w * scale_factor if self.params.get('is_size_even', False): h_out, w_out = 2 * (h_out // 2), 2 * (w_out // 2) target_size = (int(h_out), int(w_out)) if is_single_image: outputs = outputs[0] return outputs def __call__(self, results): if np.random.uniform() > self.params.get('prob', 1): return results for key in self.keys: results[key] = self._random_resize(results[key]) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(params={self.params}, keys={self.keys})') return repr_str
[docs]@PIPELINES.register_module() class RandomNoise: """Apply random noise to the input. Currently support Gaussian noise and Poisson noise. Modified keys are the attributed specified in "keys". Args: params (dict): A dictionary specifying the degradation settings. keys (list[str]): A list specifying the keys whose values are modified. """ def __init__(self, params, keys): self.keys = keys self.params = params def _apply_gaussian_noise(self, imgs): sigma_range = self.params['gaussian_sigma'] sigma = np.random.uniform(sigma_range[0], sigma_range[1]) / 255. sigma_step = self.params.get('gaussian_sigma_step', 0) gray_noise_prob = self.params['gaussian_gray_noise_prob'] is_gray_noise = np.random.uniform() < gray_noise_prob outputs = [] for img in imgs: noise = np.float32(np.random.randn(*(img.shape))) * sigma if is_gray_noise: noise = noise[:, :, :1] outputs.append(img + noise) # update noise level sigma += np.random.uniform(-sigma_step, sigma_step) / 255. sigma = np.clip(sigma, sigma_range[0] / 255., sigma_range[1] / 255.) return outputs def _apply_poisson_noise(self, imgs): scale_range = self.params['poisson_scale'] scale = np.random.uniform(scale_range[0], scale_range[1]) scale_step = self.params.get('poisson_scale_step', 0) gray_noise_prob = self.params['poisson_gray_noise_prob'] is_gray_noise = np.random.uniform() < gray_noise_prob outputs = [] for img in imgs: noise = img.copy() if is_gray_noise: noise = cv2.cvtColor(noise[..., [2, 1, 0]], cv2.COLOR_BGR2GRAY) noise = noise[..., np.newaxis] noise = np.clip((noise * 255.0).round(), 0, 255) / 255. unique_val = 2**np.ceil(np.log2(len(np.unique(noise)))) noise = np.random.poisson(noise * unique_val) / unique_val - noise outputs.append(img + noise * scale) # update noise level scale += np.random.uniform(-scale_step, scale_step) scale = np.clip(scale, scale_range[0], scale_range[1]) return outputs def _apply_random_noise(self, imgs): noise_type = np.random.choice( self.params['noise_type'], p=self.params['noise_prob']) is_single_image = False if isinstance(imgs, np.ndarray): is_single_image = True imgs = [imgs] if noise_type.lower() == 'gaussian': imgs = self._apply_gaussian_noise(imgs) elif noise_type.lower() == 'poisson': imgs = self._apply_poisson_noise(imgs) else: raise NotImplementedError(f'"noise_type" [{noise_type}] is ' 'not implemented.') if is_single_image: imgs = imgs[0] return imgs def __call__(self, results): if np.random.uniform() > self.params.get('prob', 1): return results for key in self.keys: results[key] = self._apply_random_noise(results[key]) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(params={self.params}, keys={self.keys})') return repr_str
[docs]@PIPELINES.register_module() class RandomJPEGCompression: """Apply random JPEG compression to the input. Modified keys are the attributed specified in "keys". Args: params (dict): A dictionary specifying the degradation settings. keys (list[str]): A list specifying the keys whose values are modified. """ def __init__(self, params, keys): self.keys = keys self.params = params def _apply_random_compression(self, imgs): is_single_image = False if isinstance(imgs, np.ndarray): is_single_image = True imgs = [imgs] # determine initial compression level and the step size quality = self.params['quality'] quality_step = self.params.get('quality_step', 0) jpeg_param = round(np.random.uniform(quality[0], quality[1])) # apply jpeg compression outputs = [] for img in imgs: encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_param] _, img_encoded = cv2.imencode('.jpg', img * 255., encode_param) outputs.append(np.float32(cv2.imdecode(img_encoded, 1)) / 255.) # update compression level jpeg_param += np.random.uniform(-quality_step, quality_step) jpeg_param = round(np.clip(jpeg_param, quality[0], quality[1])) if is_single_image: outputs = outputs[0] return outputs def __call__(self, results): if np.random.uniform() > self.params.get('prob', 1): return results for key in self.keys: results[key] = self._apply_random_compression(results[key]) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(params={self.params}, keys={self.keys})') return repr_str
[docs]@PIPELINES.register_module() class RandomVideoCompression: """Apply random video compression to the input. Modified keys are the attributed specified in "keys". Args: params (dict): A dictionary specifying the degradation settings. keys (list[str]): A list specifying the keys whose values are modified. """ def __init__(self, params, keys): assert has_av, 'Please install av to use video compression.' self.keys = keys self.params = params logging.getLogger('libav').setLevel(50) def _apply_random_compression(self, imgs): codec = random.choices(self.params['codec'], self.params['codec_prob'])[0] bitrate = self.params['bitrate'] bitrate = np.random.randint(bitrate[0], bitrate[1] + 1) buf = io.BytesIO() with av.open(buf, 'w', 'mp4') as container: stream = container.add_stream(codec, rate=1) stream.height = imgs[0].shape[0] stream.width = imgs[0].shape[1] stream.pix_fmt = 'yuv420p' stream.bit_rate = bitrate for img in imgs: img = (255 * img).astype(np.uint8) frame = av.VideoFrame.from_ndarray(img, format='rgb24') frame.pict_type = 'NONE' for packet in stream.encode(frame): container.mux(packet) # Flush stream for packet in stream.encode(): container.mux(packet) outputs = [] with av.open(buf, 'r', 'mp4') as container: if container.streams.video: for frame in container.decode(**{'video': 0}): outputs.append( frame.to_rgb().to_ndarray().astype(np.float32) / 255.) return outputs def __call__(self, results): if np.random.uniform() > self.params.get('prob', 1): return results for key in self.keys: results[key] = self._apply_random_compression(results[key]) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(params={self.params}, keys={self.keys})') return repr_str
allowed_degradations = { 'RandomBlur': RandomBlur, 'RandomResize': RandomResize, 'RandomNoise': RandomNoise, 'RandomJPEGCompression': RandomJPEGCompression, 'RandomVideoCompression': RandomVideoCompression, }
[docs]@PIPELINES.register_module() class DegradationsWithShuffle: """Apply random degradations to input, with degradations being shuffled. Degradation groups are supported. The order of degradations within the same group is preserved. For example, if we have degradations = [a, b, [c, d]] and shuffle_idx = None, then the possible orders are :: [a, b, [c, d]] [a, [c, d], b] [b, a, [c, d]] [b, [c, d], a] [[c, d], a, b] [[c, d], b, a] Modified keys are the attributed specified in "keys". Args: degradations (list[dict]): The list of degradations. keys (list[str]): A list specifying the keys whose values are modified. shuffle_idx (list | None, optional): The degradations corresponding to these indices are shuffled. If None, all degradations are shuffled. """ def __init__(self, degradations, keys, shuffle_idx=None): self.keys = keys self.degradations = self._build_degradations(degradations) if shuffle_idx is None: self.shuffle_idx = list(range(0, len(degradations))) else: self.shuffle_idx = shuffle_idx def _build_degradations(self, degradations): for i, degradation in enumerate(degradations): if isinstance(degradation, (list, tuple)): degradations[i] = self._build_degradations(degradation) else: degradation_ = allowed_degradations[degradation['type']] degradations[i] = degradation_(degradation['params'], self.keys) return degradations def __call__(self, results): # shuffle degradations if len(self.shuffle_idx) > 0: shuffle_list = [self.degradations[i] for i in self.shuffle_idx] np.random.shuffle(shuffle_list) for i, idx in enumerate(self.shuffle_idx): self.degradations[idx] = shuffle_list[i] # apply degradations to input for degradation in self.degradations: if isinstance(degradation, (tuple, list)): for subdegrdation in degradation: results = subdegrdation(results) else: results = degradation(results) return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(degradations={self.degradations}, ' f'keys={self.keys}, ' f'shuffle_idx={self.shuffle_idx})') return repr_str
Read the Docs v: latest
Versions
latest
stable
1.x
v0.16.0
v0.15.2
v0.15.1
v0.15.0
v0.14.0
v0.13.0
v0.12.0
dev-1.x
Downloads
pdf
html
epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.