Source code for als.logic

# !/usr/bin/python3
# -*- coding: utf-8 -*-

# ALS - Astro Live Stacker
# Copyright (C) 2019  Sébastien Durand (Dragonlost) - Gilles Le Maréchal (Gehelem)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Module holding all application logic
"""
import gettext
import logging
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import List

import als.model.data
from als import config
from als.code_utilities import log, AlsException, SignalingQueue
from als.crunching import compute_histograms_for_display
from als.io.input import InputScanner, ScannerStartError
from als.io.network import get_ip, WebServer
from als.io.output import ImageSaver
from als.model.base import Image, Session
from als.model.data import STACKING_MODE_MEAN, DYNAMIC_DATA, WORKER_STATUS_BUSY, WORKER_STATUS_IDLE
from als.model.params import ProcessingParameter
from als.processing import Pipeline, Debayer, Standardize, ConvertForOutput, Levels, ColorBalance, AutoStretch
from als.stack import Stacker

gettext.install('als', 'locale')

_LOGGER = logging.getLogger(__name__)


[docs]class SessionError(AlsException): """ Class for all errors related to session management """
[docs]class CriticalFolderMissing(SessionError): """Raised when a critical folder is missing"""
[docs]class WebServerStartFailure(AlsException): """Raised when web server fails"""
# pylint: disable=R0902, R0904
[docs]class Controller: """ The application controller, in charge of implementing application logic """ _BIN_COUNT = 512 @log def __init__(self): DYNAMIC_DATA.session.set_status(Session.stopped) DYNAMIC_DATA.web_server_is_running = False self._save_every_image = False DYNAMIC_DATA.pre_processor_status = WORKER_STATUS_IDLE DYNAMIC_DATA.stacker_status = WORKER_STATUS_IDLE DYNAMIC_DATA.post_processor_status = WORKER_STATUS_IDLE DYNAMIC_DATA.saver_status = WORKER_STATUS_IDLE self._input_scanner: InputScanner = InputScanner.create_scanner() self._pre_process_queue: SignalingQueue = DYNAMIC_DATA.pre_process_queue self._pre_process_pipeline: Pipeline = Pipeline( 'pre-process', self._pre_process_queue, [Debayer(), Standardize()]) self._pre_process_pipeline.start() self._stacker_queue: SignalingQueue = DYNAMIC_DATA.stacker_queue self._stacker: Stacker = Stacker(self._stacker_queue) self._stacker.stacking_mode = STACKING_MODE_MEAN self._stacker.align_before_stack = True self._stacker.start() self._post_process_queue = DYNAMIC_DATA.process_queue self._post_process_pipeline: Pipeline = Pipeline('post-process', self._post_process_queue, [ConvertForOutput()]) self._rgb_processor = ColorBalance() self._autostretch_processor = AutoStretch() self._levels_processor = Levels() self._post_process_pipeline.add_process(self._autostretch_processor) self._post_process_pipeline.add_process(self._levels_processor) self._post_process_pipeline.add_process(self._rgb_processor) self._post_process_pipeline.start() self._saver_queue = DYNAMIC_DATA.save_queue self._saver = ImageSaver(self._saver_queue) self._saver.start() self._last_stacking_result = None self._web_server = None self._model_observers = list() self._input_scanner.new_image_signal[Image].connect(self.on_new_image_read) self._pre_process_pipeline.new_result_signal[Image].connect(self.on_new_pre_processed_image) self._stacker.stack_size_changed_signal[int].connect(self.on_stack_size_changed) self._stacker.new_result_signal[Image].connect(self.on_new_stack_result) self._post_process_pipeline.new_result_signal[Image].connect(self.on_new_post_processor_result) self._pre_process_queue.size_changed_signal[int].connect(self.on_pre_process_queue_size_changed) self._stacker_queue.size_changed_signal[int].connect(self.on_stacker_queue_size_changed) self._post_process_queue.size_changed_signal[int].connect(self.on_post_processor_queue_size_changed) self._saver_queue.size_changed_signal[int].connect(self.on_saver_queue_size_changed) self._pre_process_pipeline.busy_signal.connect(self.on_pre_processor_busy) self._pre_process_pipeline.waiting_signal.connect(self.on_pre_processor_waiting) self._stacker.busy_signal.connect(self.on_stacker_busy) self._stacker.waiting_signal.connect(self.on_stacker_waiting) self._post_process_pipeline.busy_signal.connect(self.on_post_processor_busy) self._post_process_pipeline.waiting_signal.connect(self.on_post_processor_waiting) self._saver.busy_signal.connect(self.on_saver_busy) self._saver.waiting_signal.connect(self.on_saver_waiting) DYNAMIC_DATA.session.status_changed_signal.connect(self._notify_model_observers)
[docs] @log def get_autostretch_parameters(self) -> List[ProcessingParameter]: """ Retrieves autostretch parameters :return: autostretch parameters """ return self._autostretch_processor.get_parameters()
[docs] @log def get_rgb_parameters(self) -> List[ProcessingParameter]: """ Retrieves rgb parameters :return: rgb parameters """ return self._rgb_processor.get_parameters()
[docs] @log def get_levels_parameters(self) -> List[ProcessingParameter]: """ Retrieves Levels processor parameters :return: Levels processor parameters """ return self._levels_processor.get_parameters()
[docs] @log def remove_model_observer(self, observer): """ Removes observer from our observers list. :param observer: the observer to remove :type observer: any """ if observer in self._model_observers: self._model_observers.remove(observer)
@log def _notify_model_observers(self, image_only=False): """ Tells all registered observers to update their display """ for observer in self._model_observers: observer.update_display(image_only)
[docs] @log def add_model_observer(self, observer): """ Adds an observer to our observers list. :param observer: the new observer :type observer: any """ self._model_observers.append(observer)
[docs] @log def apply_processing(self): """ Apply processing on last stacking result """ if self._stacker.size > 0 and DYNAMIC_DATA.process_queue.qsize() == 0: DYNAMIC_DATA.process_queue.put(self._last_stacking_result.clone())
[docs] @log def get_save_every_image(self) -> bool: """ Retrieves the flag that tells if we need to save every process result image :return: the flag that tells if we need to save every process result image :rtype: bool """ return self._save_every_image
[docs] @log def set_save_every_image(self, save_every_image: bool): """ Sets the flag that tells if we need to save every process result image :param save_every_image: flag that tells if we need to save every process result image :type save_every_image: bool """ self._save_every_image = save_every_image
[docs] @log def get_align_before_stack(self) -> bool: """ Gets "align before stack" switch :return: Do we align before stacking ? :rtype: bool """ return self._stacker.align_before_stack
[docs] @log def set_align_before_stack(self, align: bool): """ Sets "align before stack" switch :param align: Do we align before stacking ? :type align: bool """ self._stacker.align_before_stack = align
[docs] @log def get_stacking_mode(self): """ Gets current stacking mode :return: the stacking mode :rtype: str """ return self._stacker.stacking_mode
[docs] @log def set_stacking_mode(self, mode): """ Sets current stacking mode :param mode: stacking mode :type mode: str """ self._stacker.stacking_mode = mode
[docs] @log def on_stack_size_changed(self, size): """ Stack size just changed :param size: the stack size :type size: int """ DYNAMIC_DATA.stack_size = size self._notify_model_observers()
[docs] @log def on_new_post_processor_result(self, image: Image): """ A new image processing result is here :param image: the new processing result :type image: Image """ image.origin = "Process result" DYNAMIC_DATA.histogram_container = compute_histograms_for_display(image, Controller._BIN_COUNT) DYNAMIC_DATA.post_processor_result = image self._notify_model_observers(image_only=True) self.save_post_process_result()
[docs] @log def on_new_stack_result(self, image: Image): """ A new image has been stacked :param image: the result of the stack :type image: Image """ image.origin = "Stacking result" self._last_stacking_result = image.clone() self.purge_queue(self._post_process_queue) self._post_process_queue.put(image.clone())
[docs] @log def on_new_image_read(self, image: Image): """ A new image as been read by input scanner :param image: the new image :type image: Image """ self._pre_process_queue.put(image)
[docs] @log def on_new_pre_processed_image(self, image: Image): """ A new image as been pre-processed :param image: the image :type image: Image """ self._stacker_queue.put(image)
[docs] @log def on_pre_process_queue_size_changed(self, new_size): """ Qt slot executed when an item has just been pushed to the pre-processor queue :param new_size: new queue size :type new_size: int """ _LOGGER.debug(f"New pre-processor queue size : {new_size}") self._notify_model_observers()
[docs] @log def on_stacker_queue_size_changed(self, new_size): """ Qt slot executed when an item has just been pushed to the stacker queue :param new_size: new queue size :type new_size: int """ _LOGGER.debug(f"New stacker queue size : {new_size}") self._notify_model_observers()
[docs] @log def on_post_processor_queue_size_changed(self, new_size): """ Qt slot executed when an item has just been pushed to the process queue :param new_size: new queue size :type new_size: int """ _LOGGER.debug(f"New post-processor queue size : {new_size}") self._notify_model_observers()
[docs] @log def on_saver_queue_size_changed(self, new_size): """ Qt slot executed when an item has just been pushed to the save queue :param new_size: new queue size :type new_size: int """ _LOGGER.debug(f"New saver queue size : {new_size}") self._notify_model_observers()
[docs] @log def on_pre_processor_busy(self): """ pre-processor just started working on new image """ DYNAMIC_DATA.pre_processor_status = WORKER_STATUS_BUSY self._notify_model_observers()
[docs] @log def on_pre_processor_waiting(self): """ pre-processor just finished working on new image """ DYNAMIC_DATA.pre_processor_status = WORKER_STATUS_IDLE self._notify_model_observers()
[docs] @log def on_stacker_busy(self): """ stacker just started working on new image """ DYNAMIC_DATA.stacker_status = WORKER_STATUS_BUSY self._notify_model_observers()
[docs] @log def on_stacker_waiting(self): """ stacker just finished working on new image """ DYNAMIC_DATA.stacker_status = WORKER_STATUS_IDLE self._notify_model_observers()
[docs] @log def on_post_processor_busy(self): """ post-processor just started working on new image """ DYNAMIC_DATA.post_processor_status = WORKER_STATUS_BUSY self._notify_model_observers()
[docs] @log def on_post_processor_waiting(self): """ post-processor just finished working on new image """ DYNAMIC_DATA.post_processor_status = WORKER_STATUS_IDLE self._notify_model_observers()
[docs] @log def on_saver_busy(self): """ saver just started working on new image """ DYNAMIC_DATA.saver_status = WORKER_STATUS_BUSY self._notify_model_observers()
[docs] @log def on_saver_waiting(self): """ saver just finished working on new image """ DYNAMIC_DATA.saver_status = WORKER_STATUS_IDLE self._notify_model_observers()
[docs] @log def start_session(self): """ Starts session """ try: if DYNAMIC_DATA.session.is_stopped: _LOGGER.info("Starting new session...") self._stacker.reset() folders_dict = { "scan": config.get_scan_folder_path(), "work": config.get_work_folder_path() } # checking presence of both scan & work folders for role, path in folders_dict.items(): if not Path(path).is_dir(): title = "Missing critical folder" message = f"Your currently configured {role} folder '{path}' is missing." raise CriticalFolderMissing(title, message) else: # session was paused when this start was ordered. No need for checks & setup _LOGGER.info("Restarting input scanner ...") # start input scanner try: self._input_scanner.start() _LOGGER.info("Input scanner started") except ScannerStartError as scanner_start_error: raise SessionError("Input scanner could not start", scanner_start_error) running_mode = f"{self._stacker.stacking_mode}" running_mode += " with alignment" if self._stacker.align_before_stack else " without alignment" _LOGGER.info(f"Session running in mode {running_mode}") DYNAMIC_DATA.session.set_status(Session.running) except SessionError as session_error: _LOGGER.error(f"Session error. {session_error.message} : {session_error.details}") raise
[docs] @log def stop_session(self): """ Stops session : stop input scanner and purge input queue """ if not DYNAMIC_DATA.session.is_stopped: self._stop_input_scanner() Controller.purge_queue(self._pre_process_queue) Controller.purge_queue(self._stacker_queue) Controller.purge_queue(self._post_process_queue) _LOGGER.info("Session stopped") DYNAMIC_DATA.session.set_status(Session.stopped)
[docs] @log def pause_session(self): """ Pauses session : just stop input scanner """ if DYNAMIC_DATA.session.is_running: self._stop_input_scanner() _LOGGER.info("Session paused") DYNAMIC_DATA.session.set_status(Session.paused)
[docs] @log def start_www(self): """Starts web server""" work_folder = config.get_work_folder_path() ip_address = get_ip() port_number = config.get_www_server_port_number() # setup work folder try: Controller._setup_web_content() except OSError as os_error: raise WebServerStartFailure("Work folder could not be prepared", str(os_error)) try: self._web_server = WebServer(work_folder) self._web_server.start() if ip_address == "127.0.0.1": log_function = _LOGGER.warning else: log_function = _LOGGER.info url = f"http://{ip_address}:{port_number}" log_function(f"Web server started. Reachable at {url}") DYNAMIC_DATA.web_server_ip = ip_address DYNAMIC_DATA.web_server_is_running = True self._notify_model_observers() except OSError as os_error: title = "Could not start web server" _LOGGER.error(f"{title} : {os_error}") raise WebServerStartFailure(title, str(os_error))
[docs] @log def stop_www(self): """Stops web server""" if self._web_server and DYNAMIC_DATA.web_server_is_running: self._web_server.stop() self._web_server.join() self._web_server = None _LOGGER.info("Web server stopped") DYNAMIC_DATA.web_server_is_running = False self._notify_model_observers()
[docs] @staticmethod @log def purge_queue(queue: SignalingQueue): """ Purge a queue :param queue: the queue to purge :type queue: SignalingQueue """ while not queue.empty(): queue.get()
@staticmethod @log def _setup_web_content(): """Prepares the work folder.""" work_dir_path = config.get_work_folder_path() resources_dir_path = os.path.dirname(os.path.realpath(__file__)) + "/../resources" with open(resources_dir_path + "/index.html", 'r') as file: index_content = file.read() index_content = index_content.replace('##PERIOD##', str(config.get_www_server_refresh_period())) with open(work_dir_path + "/index.html", 'w') as file: file.write(index_content) standby_image_path = work_dir_path + "/" + als.model.data.WEB_SERVED_IMAGE_FILE_NAME_BASE standby_image_path += '.' + als.model.data.IMAGE_SAVE_TYPE_JPEG shutil.copy(resources_dir_path + "/waiting.jpg", standby_image_path)
[docs] @log def save_post_process_result(self): """ Saves stacking result image to disk """ # we save the image no matter what, then save a jpg for the web server if it is running image = DYNAMIC_DATA.post_processor_result self.save_image(image, config.get_image_save_format(), config.get_work_folder_path(), als.model.data.STACKED_IMAGE_FILE_NAME_BASE) if DYNAMIC_DATA.web_server_is_running: self.save_image(image, als.model.data.IMAGE_SAVE_TYPE_JPEG, config.get_work_folder_path(), als.model.data.WEB_SERVED_IMAGE_FILE_NAME_BASE) # if user want to save every image, we save a timestamped version if self._save_every_image: self.save_image(image, config.get_image_save_format(), config.get_work_folder_path(), als.model.data.STACKED_IMAGE_FILE_NAME_BASE, add_timestamp=True)
# pylint: disable=R0913
[docs] @log def save_image(self, image: Image, file_extension: str, dest_folder_path: str, filename_base: str, add_timestamp: bool = False): """ Save an image to disk. :param image: the image to save :type image: Image :param file_extension: The image save file format extension :type file_extension: str :param dest_folder_path: The path of the folder image will be saved to :type dest_folder_path: str :param filename_base: The name of the file to save to (without extension) :type filename_base: str :param add_timestamp: Do we add a timestamp to image name :type add_timestamp: bool """ filename_base = filename_base if add_timestamp: filename_base += '-' + Controller.get_timestamp() image_to_save = image.clone() image_to_save.destination = dest_folder_path + "/" + filename_base + '.' + file_extension self._saver_queue.put(image_to_save)
[docs] @log def shutdown(self): """ Proper shutdown of all app components """ if not DYNAMIC_DATA.session.is_stopped: self.stop_session() if DYNAMIC_DATA.web_server_is_running: self.stop_www() self._pre_process_pipeline.stop() self._stacker.stop() self._post_process_pipeline.stop() self._saver.stop() self._saver.wait()
[docs] @staticmethod @log def get_timestamp(): """ Return a timestamp build from current date and time :return: the timestamp :rtype: str """ timestamp = str(datetime.fromtimestamp(datetime.timestamp(datetime.now()))) timestamp = timestamp.replace(' ', "-").replace(":", '-').replace('.', '-') return timestamp
@log def _stop_input_scanner(self): self._input_scanner.stop() _LOGGER.info("Input scanner stopped")