Source code for als.stack

"""
Provides image stacking features
"""
# ALS - Astro Live Stacker
# Copyright (C) 2019  Sébastien Durand (Dragonlost) - Gilles Le Maréchal (Gehelem)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import logging
from multiprocessing import Process, Manager

import astroalign as al
import numpy as np
from PyQt5.QtCore import pyqtSignal
from skimage.transform import SimilarityTransform

from als.code_utilities import log, Timer
from als.model.base import Image
from als.model.data import STACKING_MODE_SUM, STACKING_MODE_MEAN
from als.processing import QueueConsumer

_LOGGER = logging.getLogger(__name__)

_MINIMUM_MATCHES_FOR_VALID_TRANSFORM = 25


[docs]class StackingError(Exception): """ Base class for stacking errors """
# pylint: disable=R0902
[docs]class Stacker(QueueConsumer): """ Responsible of image stacking : alignment and registration """ stack_size_changed_signal = pyqtSignal(int) """Qt signal emitted when stack size changed""" @log def __init__(self, stack_queue): QueueConsumer.__init__(self, "stack", stack_queue) self._size: int = 0 self._last_stacking_result: Image = None self._align_reference: Image = None self._stacking_mode = STACKING_MODE_MEAN self._align_before_stack = True @property @log def align_before_stack(self) -> bool: """ Gets "align before stack" switch :return: Do we align before stacking ? :rtype: bool """ return self._align_before_stack @align_before_stack.setter @log def align_before_stack(self, align: bool): """ Sets "align before stack" switch :param align: Do we align before stacking ? :type align: bool """ self._align_before_stack = align @property @log def stacking_mode(self) -> str: """ Gets current stacking mode :return: the stacking mode :rtype: str """ return self._stacking_mode @stacking_mode.setter @log def stacking_mode(self, mode: str): """ Sets current stacking mode :param mode: stacking mode :type mode: str """ self._stacking_mode = mode
[docs] @log def reset(self): """ Reset stacker to its starting state : No reference, no result and counter = 0. """ self._size = 0 self._last_stacking_result = None self._align_reference = None self.stack_size_changed_signal.emit(self.size)
@log def _publish_stacking_result(self, image: Image): """ Record a new stacking result :param image: new stacking result :type image: Image """ self._last_stacking_result = image self.size += 1 self.new_result_signal.emit(image) @property @log def size(self): """ Retrieves the number of stacked images since last reset :return: how many images did we stack :rtype: int """ return self._size @size.setter @log def size(self, size): """ Sets stack size :param size: the size :type size: int """ self._size = size self.stack_size_changed_signal.emit(self.size) @log def _handle_image(self, image: Image): if self.size == 0: _LOGGER.debug("This is the first image for this stack. Publishing right away") self._publish_stacking_result(image) self._align_reference = image else: try: if not image.is_same_shape_as(self._last_stacking_result): raise StackingError( "Image dimensions or color don't match stack content. " f"New image shape : {image.data.shape} <=> " f"Reference shape : {self._last_stacking_result.data.shape}" ) try: if self._align_before_stack: # alignment is a memory greedy process, we take special care of such errors try: self._align_image(image) except OSError as os_error: raise StackingError(os_error) self._stack_image(image) except AttributeError: raise StackingError("Our reference images are gone.") self._publish_stacking_result(image) except StackingError as stacking_error: _LOGGER.warning(f"Could not stack image {image.origin} : {stacking_error}. Image is DISCARDED") @log def _align_image(self, image): """ align image with the current align reference The image data is modified in place by this function :param image: the image to be aligned :type image: Image """ with Timer() as find_timer: transformation = self._find_transformation(image) _LOGGER.debug(f"Found transformation for alignment of {image.origin} in " f"{find_timer.elapsed_in_milli_as_str} ms") with Timer() as apply_timer: self._apply_transformation(image, transformation) _LOGGER.debug(f"Applied transformation for alignment of {image.origin} in " f"{apply_timer.elapsed_in_milli_as_str} ms") @log def _apply_transformation(self, image: Image, transformation: SimilarityTransform): """ Apply a transformation to an image. If image is color, channels are processed using multiprocessing, allowing global operation to take less time on a multi core CPU Image is modified in place by this function :param image: the image to apply transformation to :type image: Image :param transformation: the transformation to apply :type transformation: skimage.transform._geometric.SimilarityTransform """ if image.is_color(): _LOGGER.debug(f"Aligning color image...") manager = Manager() results_dict = manager.dict() channel_processors = [] for channel in range(3): processor = Process(target=Stacker._apply_single_channel_transformation, args=[image, self._last_stacking_result, transformation, results_dict, channel]) processor.start() channel_processors.append(processor) for processor in channel_processors: processor.join() _LOGGER.debug("Color channel processes are done. Fetching results and storing results...") for channel, data in results_dict.items(): image.data[channel] = data _LOGGER.debug(f"Aligning color image DONE") else: _LOGGER.debug(f"Aligning b&w image...") result_dict = dict() Stacker._apply_single_channel_transformation( image, self._last_stacking_result, transformation, result_dict ) image.data = result_dict[0] _LOGGER.debug(f"Aligning b&w image : DONE") @staticmethod def _apply_single_channel_transformation(image, reference, transformation, results_dict, channel=None): """ apply a transformation on a specific channel (RGB) of a color image, or whole data of a b&w image. :param image: the image to apply transformation to :type image: Image :param reference: the align reference image :type reference: Image :param transformation: the transformation to apply :type transformation: skimage.transform._geometric.SimilarityTransform :param results_dict: the dict into which transformation result is to be stored. dict key is the channel number for a color image, or 0 for a b&w image :type results_dict: dict :param channel: the 0 indexed number of the color channel to process (0=red, 1=green, 2=blue) :type channel: int """ if channel is not None: target_index = channel source_data = image.data[channel] reference_data = reference.data[channel] else: target_index = 0 source_data = image.data reference_data = reference.data results_dict[target_index] = np.float32(al.apply_transform(transformation, source_data, reference_data)) @log def _find_transformation(self, image: Image): """ Iteratively try and find a valid transformation to align image with stored align reference. We perform 3 tries with growing image sizes of a centered image subset : 10%, 30% and 100% of image size :param image: the image to be aligned :type image: Image :return: the found transformation :raises: StackingError when no transformation is found using the whole image """ for ratio in [.1, .33, 1.]: top, bottom, left, right = self._get_image_subset_boundaries(ratio) # pick green channel if image has color if image.is_color(): new_subset = image.data[1][top:bottom, left:right] ref_subset = self._align_reference.data[1][top:bottom, left:right] else: new_subset = image.data[top:bottom, left:right] ref_subset = self._align_reference.data[top:bottom, left:right] try: _LOGGER.debug(f"Searching valid transformation on subset " f"with ratio:{ratio} and shape: {new_subset.shape}") transformation, matches = al.find_transform(new_subset, ref_subset) _LOGGER.debug(f"Found transformation with subset ratio = {ratio}") _LOGGER.debug(f"rotation : {transformation.rotation}") _LOGGER.debug(f"translation : {transformation.translation}") _LOGGER.debug(f"scale : {transformation.scale}") matches_count = len(matches[0]) _LOGGER.debug(f"image matched features count : {matches_count}") if matches_count < _MINIMUM_MATCHES_FOR_VALID_TRANSFORM: _LOGGER.debug(f"Found transformation but matches count is too low : " f"{matches_count} < {_MINIMUM_MATCHES_FOR_VALID_TRANSFORM}. " "Discarding transformation") raise StackingError("Too few matches") return transformation # pylint: disable=W0703 except Exception as alignment_error: # we have no choice but catching Exception, here. That's what AstroAlign raises in some cases # this will catch MaxIterError as well... if ratio == 1.: raise StackingError(alignment_error) _LOGGER.debug(f"Could not find valid transformation on subset with ratio = {ratio}.") continue @log def _get_image_subset_boundaries(self, ratio: float): """ Retrieves a tuple of 4 int values representing the limits of a centered box (a.k.a. subset) as big as ratio * stored stacking result's size :param ratio: size ratio of subset vs stacking result :type ratio: float :return: a tuple of 4 int for top, bottom, left, right :rtype: tuple """ width = self._last_stacking_result.width height = self._last_stacking_result.height horizontal_margin = int((width - (width * ratio)) / 2) vertical_margin = int((height - (height * ratio)) / 2) left = 0 + horizontal_margin right = width - horizontal_margin - 1 top = 0 + vertical_margin bottom = height - vertical_margin - 1 return top, bottom, left, right @log def _stack_image(self, image: Image): """ Compute stacking according to user defined stacking mode the image data is modified in place by this function :param image: the image to be stacked :type image: Image """ _LOGGER.debug(f"Stacking in {self._stacking_mode} mode...") if self._stacking_mode == STACKING_MODE_SUM: image.data = image.data + self._last_stacking_result.data elif self._stacking_mode == STACKING_MODE_MEAN: image.data = (self.size * self._last_stacking_result.data + image.data) / (self.size + 1) else: raise StackingError(f"Unsupported stacking mode : {self._stacking_mode}") _LOGGER.debug(f"Stacking in {self._stacking_mode} done.")