Source code for tom_dataservices.dataservices

from abc import ABC, abstractmethod
import logging
from typing import List, Dict, Optional
from urllib.parse import urlencode

from django.conf import settings
from django.apps import apps
from django.utils.module_loading import import_string
from django.db import IntegrityError
from django.core.exceptions import ValidationError
from django.utils.safestring import mark_safe
from django.contrib import messages
from django.urls import reverse

from guardian.shortcuts import assign_perm

from tom_targets.models import TargetName, Target
from tom_targets.base_models import get_target_model_app_label

logger = logging.getLogger(__name__)


[docs] def get_data_service_classes(): """ Imports the Dataservice class from relevant apps and generates a list of data service names. Each dataservice class should be contained in a list of dictionaries in an app's apps.py `dataservices` method. Each dataservice dictionary should contain a 'class' key with the dot separated path to the dataservice class (typically an extension of DataService). FOR EXAMPLE: [{'class': 'path.to.dataservice.class'}] """ data_service_choices = {} for app in apps.get_app_configs(): try: data_services = app.data_services() except AttributeError: continue if data_services: for data_service in data_services: try: clazz = import_string(data_service['class']) except ImportError as e: logger.warning(f'WARNING: Could not import data service class for {app.name} from ' f'{data_service["class"]}.\n' f'{e}') continue data_service_choices[clazz.name] = clazz return data_service_choices
[docs] def get_data_service_class(name): """ Gets the specific dataservice class for a given dataservice name. :returns: Broker class :rtype: class """ available_classes = get_data_service_classes() try: return available_classes[name] except KeyError: raise ImportError( f'''Could not a find a data Service with named {name}. Did you install the app?''' )
[docs] class MissingDataException(Exception): pass
[docs] class NotConfiguredError(Exception): pass
[docs] class QueryServiceError(Exception): """ Represents a higher level error when an underlying service or client library fails. """ pass
[docs] class DataService(ABC): """ Base class for all Data Services. Data Services are classes that are responsible for querying external services and returning data. """ # Recognizable name for the DataService (Gaia, TNS, etc) name = 'BaseDataService' # Full name for the DataService (Hermes Messaging Service, Pan-STARRS1 DR2 Query Service, etc.) verbose_name = None # Url for more info about the DataService info_url = None # Base url for the DataService base_url = None # Notes or limitations on using the DataService service_notes = None # The path to a specialized table partial for displaying query results query_results_table = None # App Version app_version = None # Link to app github repo app_link = None def __init__(self, query_parameters=None, *args, **kwargs): super().__init__(*args, **kwargs) # Instance variable that can store target query results if necessary self.target_results = {} # Instance variable that can store photometry query results if necessary self.photometry_results = {} # Instance variable that can store query results if necessary self.query_results = {} # Instance variable that can store query parameters if necessary self.query_parameters = query_parameters or {}
[docs] @abstractmethod def query_service(self, query_parameters, **kwargs): """Takes in the serialized data from the query form and actually submits the query to the service"""
[docs] def pre_query_validation(self, query_parameters): """Same thing as query_service, but a dry run""" raise NotImplementedError(f'pre_query_validation method has not been implemented for {self.name}')
[docs] def build_query_parameters(self, parameters, **kwargs): """Builds the query parameters from the form data""" raise NotImplementedError(f'build_query_parameters method has not been implemented for {self.name}')
# Include this method if you wish for the TOM to be able to query data for an individual Target. # def build_query_parameters_from_target(self, target, **kwargs): # """ # This is a method that builds query parameters based on an existing target object that will be recognized by # `query_service()`. # This can be done by either by re-creating the form fields set by the Data Service Form and then calling # `self.build_query_parameters()` with the results, or we can reproduce a limited set of parameters uniquely for # a target query. # :param target: A target object to be queried # :return: query_parameters (usually a dict) that can be understood by `query_service()` # """ # raise NotImplementedError('build_query_parameters_from_target method has not been implemented' + # f'for {self.name}.' # )
[docs] def build_headers(self, *args, **kwargs): """Builds the headers for the query""" return {}
[docs] @classmethod def get_form_class(cls): """Returns the full form class for querying this service""" raise NotImplementedError(f"No Form Class for {cls.name} Data Service")
[docs] @classmethod def configuration(cls) -> dict: """Returns the configuration dictionary for this service""" try: return settings.DATA_SERVICES[cls.name] except AttributeError as e: raise NotConfiguredError(e) except KeyError as e: raise NotConfiguredError( f"""The {e} DataService is not configured. </br> Please see the <a href="{cls.info_url}" target="_blank">documentation</a> for more information. """ )
[docs] @classmethod def get_configuration(cls, config_type=None, value=None, **kwargs): """ Get all of the configuration or specific configuration values associated with this dataservice. :Syntax: get_configuration([config_type], [value]) :param config_type: The type of configuration to return. If None, returns all configurations. :param value: The default value to return if configuration not found. :return: A list of available configurations, or a requested configuration, or if not found, the default value. """ try: data_service_config = cls.configuration() if config_type: return data_service_config.get(config_type, value) return [*data_service_config] except NotConfiguredError as e: if value: return value raise NotConfiguredError(e)
[docs] @classmethod def get_credentials(cls, **kwargs): """Returns the credentials for this service. Checks the configuration for an api_key by default.""" return cls.get_configuration('api_key')
[docs] @classmethod def urls(cls, **kwargs) -> dict: """Dictionary of URLS for the DataService""" return {'base_url': cls.base_url, 'info_url': cls.info_url}
[docs] @classmethod def get_urls(cls, url_type=None, value=None, **kwargs): """ Get all urls or a specific url associated with the dataservice. :Syntax: get_urls([url_type], [value]) :param url_type: The type of URL to return. If None, returns all available url types. :param value: The default value to return if the requested url is not found. :return: A list of available uls, or a requested url, or if not found, the default value. """ urls = cls.urls() if url_type: return urls.get(url_type, value) return [*urls]
[docs] def get_additional_context_data(self): """ Called by the View.get_context_data() and adds DataService context to the View’s context dictionary """ return {}
[docs] def get_success_message(self, **kwargs): """Returns a success message to display in the UI after making the query.""" return "Query completed successfully."
[docs] def query_forced_photometry(self, query_parameters, **kwargs): """Set up and run a specialized query for a DataService’s forced photometry service.""" raise NotImplementedError(f'query_forced_photometry method has not been implemented for {self.name}')
[docs] def query_photometry(self, query_parameters, **kwargs): """Set up and run a specialized query for a DataService’s photometry service.""" raise NotImplementedError(f'query_photometry method has not been implemented for {self.name}')
[docs] def query_spectroscopy(self, query_parameters, **kwargs): """Set up and run a specialized query for a DataService’s spectroscopy service.""" raise NotImplementedError(f'query_spectroscopy method has not been implemented for {self.name}')
[docs] def query_reduced_data(self, target, **kwargs): """Set up and run a specialized query to retrieve Reduced Datums from a Data Service""" build_query_parameters_from_target_method = getattr(self, 'build_query_parameters_from_target', None) if build_query_parameters_from_target_method: query_parameters = build_query_parameters_from_target_method(target) else: query_parameters = {} try: phot_results = self.query_photometry(query_parameters, **kwargs) except NotImplementedError: phot_results = [] try: spec_results = self.query_spectroscopy(query_parameters, **kwargs) except NotImplementedError: spec_results = [] # Disable Forced Photometry from here. It's a more expensive process and should be handled differently. # try: # forced_phot_results = self.query_forced_photometry(query_parameters, **kwargs) # except NotImplementedError: # forced_phot_results = [] return {'photometry': phot_results, 'spectroscopy': spec_results, # 'forced_photometry': forced_phot_results }
[docs] def query_aliases(self, query_parameters=None, target=None, **kwargs) -> List: """ Set up and run a specialized query for retrieving target names from a DataService. This method will usually call `query_service()` and translate the results from the dataservice into a list of target names. :param query_parameters: This is the output from build_query_parameters() :return: A list of target names :rtype: List """ return []
[docs] def query_targets(self, query_parameters, **kwargs) -> List[dict]: """ Set up and run a specialized query for retrieving targets from a DataService. This method will usually call `query_service()` and translate the results from the dataservice into a list of dictionaries describing the returned targets. :param query_parameters: This is the output from build_query_parameters() :return: A list of dictionaries describing the resulting targets. Include 'reduced_datums' and/or 'aliases' as keys in this dictionary to add associated data and alternate names without performing additional queries. :rtype: List[dict] """ return [{}]
[docs] def to_data_product(self, query_results=None, **kwargs): """ Upper level function to create a new DataProduct from the query results Can take either new query results, or use stored results form a recent `query_service()` :param query_results: Query results from the DataService :returns: DataProduct object """ query_results = query_results or self.query_results if not query_results: raise MissingDataException('No query results. Did you call query_service()?') else: return self.create_data_product_from_query(query_results, **kwargs)
[docs] def create_data_product_from_query(self, query_results=None, **kwargs): """Create a new DataProduct from the query results""" raise NotImplementedError(f'create_data_product_from_query method has not been implemented for {self.name}')
[docs] def to_reduced_datums(self, target, data_results=None, **kwargs): """ Upper level function to create a new ReducedDatum from the query results This method is not intended to be extended. This method passes the output of query_reduced_data() to create_reduced_datums_from_query() :param target: Target object to associate with the ReducedDatum :param data_results: Query results from the DataService storing observation data. This should be a dictionary with each key being a data_type (i.e. Photometry, Spectroscopy, etc.) """ if not data_results: raise MissingDataException('No Reduced Data dictionary found.') reduced_datum_list = [] for key in data_results.keys(): # If data exists for a given data_type, create ReducedDatums. if data_results[key]: reduced_datum_list += self.create_reduced_datums_from_query(target, data_results[key], key, **kwargs) return reduced_datum_list
[docs] def create_reduced_datums_from_query(self, target, data: Dict, data_type: Optional[str] = None, **kwargs) -> List: """ Create and save new reduced_datums of the appropriate data_type from the query results Be sure to use `XXXXXReducedDatum.objects.get_or_create()` when creating new objects. NOTE: Setting `XXXXXReducedDatum.source` to the the `DataService.name` will allow for automated data updates. :param target: Target Object to be associated with the reduced data :param data: List of data dictionaries of the appropriate `data_type` :param data_type: An appropriate data type as listed in tom_dataproducts.models.DATA_TYPE_CHOICES :return: List of Reduced datums (either retrieved or created) """ raise NotImplementedError(f'create_reduced_datums_from_query method has not been implemented for {self.name}')
[docs] def to_target(self, target_result=None, **kwargs): """ Upper level function to create a new target from the query results This method is not intended to be extended. This method passes a single instance of the output of query_targets() to create_target_from_query(), create_target_extras_from_query() and create_aliases_from_query(). Intended usage: Call to_target on each element of the target_data list of dictionaries from query_target. (see views.py::CreateTargetFromQueryView) :param target_results: Dictionary containing target information. :returns: Target object """ if not target_result: raise MissingDataException('No query results. Did you call query_service()?') else: target = self.create_target_from_query(target_result, **kwargs) extras = self.create_target_extras_from_query(target_result, **kwargs) request = kwargs.get('request') try: target.save(extras=extras) # Give the user access to the target they created if request: target.give_user_access(request.user) target_app_label = get_target_model_app_label() for group in request.user.groups.all(): assign_perm(f'{target_app_label}.view_target', group, target) assign_perm(f'{target_app_label}.change_target', group, target) assign_perm(f'{target_app_label}.delete_target', group, target) except IntegrityError: target = Target.objects.get(name=target.name) if request: message = f"""The target, <a href="{reverse('targets:detail', kwargs={'pk': target.id})}"> {target.name}</a> already exists, any new data has been ingested. You can <a href="{reverse('targets:create') + '?' + urlencode(target.as_dict())}">create</a> a new target anyway. """ messages.warning(request, mark_safe(message)) else: logger.warning(f"The target, {target.name}, already exists. Any new data will be ingested.") # Save Aliases self.to_aliases(target, target_result.get('aliases', [])) return target
[docs] def create_target_from_query(self, target_result, **kwargs): """Create a new target from a single instance of the target results. :param target_result: dictionary describing target details based on query result :returns: target object :rtype: `Target` """ raise NotImplementedError(f'create_target_from_query method has not been implemented for {self.name}')
[docs] def create_target_extras_from_query(self, query_results, **kwargs): """Create a new target from the query results :returns: dict of extras to be added to a new Target :rtype: `dict` """ return {}
[docs] def to_aliases(self, target, alias_results: List, **kwargs) -> List: """ Upper level function to create a new aliases from the query results This method is not intended to be extended. This method passes a list of aliases (either the output of query_aliases() or from target_result['aliases] in to_targets to create_aliases_from_query(). :param target: Target object to associate with the alias :param alias_results: list of aliases from the DataService. This should be a list of names. """ new_aliases = self.create_aliases_from_query(alias_results, **kwargs) for alias in new_aliases: alias.target = target try: alias.full_clean() alias.save() except ValidationError: pass return new_aliases
[docs] def create_aliases_from_query(self, alias_results: List, **kwargs) -> List: """Create a new target name from the query results This method should be over ridden with a method that creates a list of TargetName objects: `TargetName(name=alias)` that will be saved as part of the `Target.save(extras=extras, names=aliases)` call. :param query_result: list of dictionaries describing target details based on query result :returns: list of TargetName objects to be added to a new Target :rtype: `list` """ aliases = [] for alias in alias_results: aliases.append(TargetName(name=alias)) return aliases