from io import StringIO
from urllib.parse import urlencode
import logging
from typing import Any, List
from crispy_forms.bootstrap import FormActions
from crispy_forms.layout import HTML, Layout, Submit
from django import forms
from django.core.exceptions import BadRequest
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.management import call_command
from django_filters import CharFilter, ChoiceFilter, DateTimeFromToRangeFilter, ModelMultipleChoiceFilter
from django_filters import OrderingFilter, MultipleChoiceFilter, rest_framework
from django_filters.views import FilterView
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse, reverse_lazy
from django.utils.safestring import mark_safe
from django.views.generic import View, TemplateView
from django.views.generic.detail import DetailView
from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView
from django.views.generic.list import ListView
from guardian.shortcuts import get_objects_for_user, assign_perm
from guardian.mixins import PermissionListMixin
from tom_common.hints import add_hint
from tom_common.mixins import Raise403PermissionRequiredMixin
from tom_dataproducts.forms import AddProductToGroupForm, DataProductUploadForm
from tom_dataproducts.models import is_fits_image_file
from tom_observations.cadence import CadenceForm, get_cadence_strategy
from tom_observations.facility import get_service_class, get_service_classes
from tom_observations.facility import BaseManualObservationFacility
from tom_observations.forms import AddExistingObservationForm, facility_choices
from tom_observations.models import ObservationRecord, ObservationGroup, ObservationTemplate, DynamicCadence
from tom_targets.models import Target
from tom_targets.permissions import targets_for_user
logger = logging.getLogger(__name__)
[docs]
class ObservationFilter(rest_framework.FilterSet):
    """
    Defines the available fields for filtering the list of ``ObservationRecord`` objects.

    The ``status`` filter is built per instance (see ``__init__``) because its choices are read from the
    ``ObservationRecord`` table, which does not necessarily exist when this class body is evaluated.
    """
    ordering = OrderingFilter(
        fields=['scheduled_start', 'scheduled_end', 'status', 'created', 'modified']
    )
    scheduled_start = DateTimeFromToRangeFilter()
    scheduled_end = DateTimeFromToRangeFilter()
    target_id = ModelMultipleChoiceFilter(
        queryset=Target.objects.filter(observationrecord__isnull=False).distinct().order_by('name')
    )
    observationgroup = ModelMultipleChoiceFilter(
        label='Observation Groups', queryset=ObservationGroup.objects.all()
    )
    facility = MultipleChoiceFilter(choices=facility_choices())

    def __init__(self, *args, **kwargs):
        """
        Build the filterset and install the dynamically populated ``status`` filter.

        The choices are the distinct ``status`` values currently stored on ``ObservationRecord`` rows,
        queried at instantiation time rather than at class-definition time.
        """
        super().__init__(*args, **kwargs)
        status_choices = [
            (status, status) for status in
            ObservationRecord.objects.values_list('status', flat=True).order_by('status').distinct()
        ]
        status_filter = MultipleChoiceFilter(field_name='status', choices=status_choices)
        # Filters added after FilterSet.__init__ miss the parent/model propagation it performs,
        # so set both by hand.
        status_filter.parent = self
        status_filter.model = self._meta.model
        # Only entries in ``self.filters`` take part in filtering and form generation; a plain
        # instance attribute is ignored by django-filter.
        self.filters['status'] = status_filter
        # Keep the attribute for any code that read ``filterset.status`` directly.
        self.status = status_filter

    class Meta:
        model = ObservationRecord
        fields = ['ordering', 'observation_id', 'target_id', 'observationgroup', 'facility', 'status']
[docs]
class ObservationListView(FilterView):
    """
    View that displays all ``ObservationRecord`` objects.
    """
    filterset_class = ObservationFilter
    template_name = 'tom_observations/observation_list.html'
    paginate_by = 25
    model = ObservationRecord
    strict = False

    def get_queryset(self, *args, **kwargs):
        """
        Gets the set of ``ObservationRecord`` objects associated with the targets that the user is authorized to view.

        :returns: set of ObservationRecords
        :rtype: QuerySet
        """
        if settings.TARGET_PERMISSIONS_ONLY:
            return ObservationRecord.objects.filter(
                target__in=targets_for_user(self.request.user, Target.objects.all(), 'view_target')
            )
        return get_objects_for_user(self.request.user, 'tom_observations.view_observationrecord')

    def get(self, request, *args, **kwargs):
        """
        Handles the GET requests to this view.

        If ``update_status`` is passed in the query parameters, calls the ``updatestatus`` management command to
        query for new statuses for ``ObservationRecord`` objects and redirects back to the list with the remaining
        query parameters. If ``selected``, ``observationgroup`` and ``action`` are all passed, adds the selected
        records to (``action=add``) or removes them from (``action=remove``) the given observation groups and
        redirects to the list. Otherwise renders the filtered list.

        :param request: request object for this GET request
        :type request: HTTPRequest
        """
        # QueryDict is immutable, and we want to append the remaining parameters to the redirect URL
        query_params = request.GET.copy()
        update_status = query_params.pop('update_status', False)
        if update_status:
            if not request.user.is_authenticated:
                return redirect(reverse('login'))
            out = StringIO()
            call_command('updatestatus', stdout=out)
            messages.info(request, out.getvalue())
            add_hint(request, mark_safe(
                'Did you know updating observation statuses can be automated? Learn how in '
                '<a href=https://tom-toolkit.readthedocs.io/en/stable/customization/automation.html>'
                'the docs.</a>'))
            # QueryDict.urlencode() keeps every value of multi-valued parameters (e.g. several
            # ``facility`` selections); urllib's urlencode() would only keep the last one.
            return redirect(f'{reverse("tom_observations:list")}?{query_params.urlencode()}')

        selected = request.GET.getlist('selected')
        observationgroups = request.GET.getlist('observationgroup')
        action = request.GET.get('action')
        if selected and observationgroups and action:
            # Only act on records this user is allowed to see, not on arbitrary ids from the URL.
            observation_records = self.get_queryset().filter(id__in=selected)
            groups = ObservationGroup.objects.filter(id__in=observationgroups)
            for group in groups:
                if action == 'add':
                    group.observation_records.add(*observation_records)
                elif action == 'remove':
                    group.observation_records.remove(*observation_records)
                group.save()
            return redirect(reverse('tom_observations:list'))

        return super().get(request, *args, **kwargs)
# TODO: Ensure this template includes the ApplyObservationTemplate form at the top
[docs]
class ObservationCreateView(LoginRequiredMixin, FormView):
    """
    View for creation/submission of an observation. Requires authentication.
    """
    template_name = 'tom_observations/observation_form.html'

    def dispatch(self, request, *args, **kwargs):
        """Figure out what HTTP method (GET, POST, etc) should be called
        to handle this request.

        Here, we extend the method to attach the Facility class to the View so
        we don't have to create more than one instance of it. So,
        instantiate the facility class once and store it on the view instance
        for the duration of the request-response cycle.
        """
        self.facility_instance = self.get_facility_class()()
        # attach the user to the Facility after instantiation -- not in __init__()
        self.facility_instance.set_user(request.user)
        # now go find the HTTP method to use...
        return super().dispatch(request, *args, **kwargs)

    def get_template_names(self) -> List[str]:
        """Override the base class method to ask the Facility if it has
        specified a Facility-specific template to use. If so, put it at the
        front of the returned list of template_names.

        :returns: candidate template names, most specific first
        :rtype: list
        """
        template_names = super().get_template_names()
        try:
            if self.facility_instance.template_name:
                # add to front of list b/c first template will be tried first
                template_names.insert(0, self.facility_instance.template_name)
        except AttributeError:
            # some Facilities won't have a custom template_name defined and so
            # we will just use the one defined above.
            pass
        logger.debug(f'ObservationCreateView.get_template_name template_names: {template_names}')
        return template_names

    def get_target_id(self):
        """
        Parses the target id for the given observation from the request parameters.

        :returns: id of the target for observing as submitted in the GET or POST parameters, or ``None`` if it is
            absent or the request uses another HTTP method
        :rtype: str
        """
        if self.request.method == 'GET':
            return self.request.GET.get('target_id')
        if self.request.method == 'POST':
            return self.request.POST.get('target_id')
        return None

    def get_target(self):
        """
        Gets the target for observing from the database

        :returns: target for observing
        :rtype: Target
        """
        return Target.objects.get(pk=self.get_target_id())

    def get_facility(self):
        """
        Gets the facility from which the target is being observed from the URL keyword arguments

        :returns: facility name
        :rtype: str
        """
        return self.kwargs['facility']

    def get_facility_class(self):
        """
        Gets the facility interface class

        :returns: facility class looked up by name via ``get_service_class``
        :rtype: class
        """
        return get_service_class(self.get_facility())

    def get_cadence_strategy_form(self):
        """
        Gets the cadence form class to mix into the observation forms.

        :returns: the form of the cadence strategy named by the ``cadence_strategy`` query parameter, or
            ``CadenceForm`` when that parameter is absent or empty
        :rtype: class
        """
        cadence_strategy = self.request.GET.get('cadence_strategy')
        if not cadence_strategy:
            return CadenceForm
        return get_cadence_strategy(cadence_strategy).form

    def get_context_data(self, **kwargs):
        """
        Adds the available observation types for the observing facility to the context object, along with the
        target, facility-provided context, the facility link and any missing facility configuration.

        :returns: context dictionary
        :rtype: dict
        """
        context = super().get_context_data(**kwargs)

        # Populate initial values for each form and add them to the context. If the page
        # reloaded due to form errors, only repopulate the form that was submitted.
        initial = self.get_initial()
        submitted_type = self.request.POST.get('observation_type')
        # Neither of these depends on the observation type, so compute them once.
        cadence_form_class = self.get_cadence_strategy_form()
        observation_form_classes = self.facility_instance.get_form_classes_for_display(**kwargs)

        observation_type_choices = []
        for observation_type, observation_form_class in observation_form_classes.items():
            form_data = {**initial, 'observation_type': observation_type}
            # Repopulate the appropriate form with form data if the original submission was invalid
            if observation_type == submitted_type:
                form_data.update(**self.request.POST.dict())
            composite_form_class = type(f'Composite{observation_type}Form',
                                        (cadence_form_class, observation_form_class), {})
            # Pass facility parameter to form instantiation for user context
            form_instance = composite_form_class(initial=form_data, facility=self.facility_instance)
            observation_type_choices.append((observation_type, form_instance))
        context['observation_type_choices'] = observation_type_choices

        # Ensure correct tab is active if submission is unsuccessful
        context['active'] = submitted_type

        target = self.get_target()
        context['target'] = target

        # allow the Facility class to add data to the context
        context.update(self.facility_instance.get_facility_context_data(target=target))
        context['facility_link'] = getattr(self.facility_instance, 'link', '')

        try:
            missing_settings = self.facility_instance.facility_settings.get_unconfigured_settings()
            context['missing_configurations'] = ", ".join(missing_settings)
        except AttributeError:
            # facility does not expose facility_settings
            context['missing_configurations'] = ''
        return context

    def get_initial(self):
        """
        Populates the observation form with initial data including the id of the target to be observed, the facility at
        which the observation will take place, the current request, and every GET query parameter.

        :returns: initial form data
        :rtype: dict
        :raises BadRequest: if no ``target_id`` was supplied with the request
        """
        initial = super().get_initial()
        target_id = self.get_target_id()
        if not target_id:
            # A missing parameter is a client error: answer with HTTP 400 instead of a generic 500.
            raise BadRequest('Must provide target_id')
        initial['target_id'] = target_id
        initial['facility'] = self.get_facility()
        initial['request'] = self.request
        initial.update(self.request.GET.dict())
        return initial

    def post(self, request, *args, **kwargs):
        """
        Handles the POST request to the view.

        Instantiates the form returned by ``get_form_class()`` with the keyword arguments from
        ``get_form_kwargs()`` and validates it. An invalid form goes to ``form_invalid()``. A valid form goes to
        ``form_validation_valid()`` when the POST data contains a ``validate`` key (validate-only submission), and
        to ``form_valid()`` otherwise.
        """
        form = self.get_form_class()(**self.get_form_kwargs())
        if not form.is_valid():
            return self.form_invalid(form)
        if 'validate' in request.POST:
            return self.form_validation_valid(form)
        return self.form_valid(form)

    def form_validation_valid(self, form):
        """
        Handles a valid validate-only submission: shows the form's validation message and re-renders the page
        with the submitted form.
        """
        messages.info(self.request, form.get_validation_message())
        return self.render_to_response(self.get_context_data(request=self.request, form=form))
[docs]
class ObservationRedirectView(LoginRequiredMixin, View):
    """
    This view redirects the user to an outside facility using the URL
    provided by the facility's redirect_url method (must be a RedirectFacility)
    """

    def get(self, request, *args, **kwargs):
        """
        Builds the absolute callback URL for this TOM and redirects to the facility.

        The callback URL carries ``target_id`` (from the query parameters) and ``facility`` (from the URL
        keyword arguments) as its query string.
        """
        facility_name = self.kwargs['facility']
        facility_instance = get_service_class(facility_name)()
        target_id = request.GET.get("target_id")
        # URL-encode the values so characters such as '&', '=' or spaces cannot corrupt the query string.
        callback_query = urlencode({'target_id': target_id, 'facility': facility_name})
        callback_url = f'{request.build_absolute_uri(reverse("tom_observations:callback"))}?{callback_query}'
        return redirect(facility_instance.redirect_url(target_id, callback_url))
[docs]
class ObservationRecordUpdateView(LoginRequiredMixin, UpdateView):
    """
    This view allows for the updating of the observation id, which will eventually be expanded to more fields.
    """
    model = ObservationRecord
    fields = ['observation_id']
    template_name = 'tom_observations/observationupdate_form.html'

    def get_success_url(self):
        """
        Gets the URL to redirect to after a successful update.

        :returns: URL of the detail page of the updated ``ObservationRecord``
        :rtype: str
        """
        # UpdateView has already loaded the record into self.object; reuse it rather than
        # issuing another query through get_object().
        return reverse('tom_observations:detail', kwargs={'pk': self.object.id})
[docs]
class ObservationRecordCancelView(LoginRequiredMixin, View):
    """
    View that asks the facility to cancel an observation and then redirects to the observation detail page.
    Requires authentication.
    """

    def get(self, request, *args, **kwargs):
        """
        Cancels the ``ObservationRecord`` identified by the ``pk`` URL keyword argument.

        On success the facility is also asked to update the observation status. The outcome is reported
        through the messages framework.
        """
        # An unknown pk is a client error: answer 404 instead of an unhandled DoesNotExist (500).
        obsr = get_object_or_404(ObservationRecord, id=self.kwargs.get('pk'))
        facility = get_service_class(obsr.facility)()
        facility.set_user(request.user)
        try:
            success = facility.cancel_observation(obsr.observation_id)
            if success:
                messages.success(self.request, f'Successfully cancelled observation {obsr}')
                facility.update_observation_status(obsr.observation_id)
            else:
                messages.error(self.request, 'Unable to cancel observation.')
        except forms.ValidationError as ve:
            messages.error(self.request, f'Unable to cancel observation: {ve}')
        return redirect(reverse('tom_observations:detail', kwargs={'pk': obsr.id}))
[docs]
class ObservationCallbackView(LoginRequiredMixin, View):
    """
    This is the view that handles the user returning **from** the facility back to the TOM.
    The query parameters must include facility, target_id and observation_id as these are
    the required parameters for creating an ObservationRecord. Once an ObservationRecord object
    is created, permissions are assigned to it for the current user and the user's groups, and the
    user is redirected to the observation list filtered by the new observation group.
    """

    def get(self, request):
        """
        Creates the observation record(s) described by the callback query parameters.

        If the facility implements ``request_id_to_group`` it is used to build the ``ObservationGroup``;
        otherwise a single ``ObservationRecord`` is created and wrapped in a new group. Missing parameters,
        an unknown facility, or a failure while creating records are reported through the messages
        framework followed by a redirect.
        """
        facility_name = request.GET.get('facility')
        target_id = request.GET.get('target_id')
        observation_id = request.GET.get('observation_id')
        parameters = request.GET
        user = request.user

        if not target_id:
            messages.error(self.request, 'Missing required parameter: target_id')
            return redirect(reverse('tom_observations:list'))
        elif not all([facility_name, observation_id]):
            messages.error(self.request, 'Missing required parameters: facility, observation_id')
            return redirect(reverse('targets:detail', kwargs={'pk': target_id}))

        target = get_object_or_404(Target, id=target_id)
        try:
            facility = get_service_class(facility_name)()
        except ImportError:
            messages.error(self.request, "Invalid facility specified in callback URL")
            return redirect(reverse('targets:detail', kwargs={'pk': target_id}))

        try:
            if hasattr(facility, 'request_id_to_group'):
                # The facility knows how to turn its request id into a group of records
                obs_group = facility.request_id_to_group(observation_id, user, target, parameters)
            else:
                # Create an observation group with a single record if not
                observation = ObservationRecord.objects.create(
                    user=user,
                    facility=facility_name,
                    target=target,
                    observation_id=observation_id,
                    parameters=parameters
                )
                obs_group = ObservationGroup.objects.create(
                    name=f"{facility_name} obs #{observation_id}"
                )
                obs_group.observation_records.add(observation)
        except Exception as e:
            # Deliberately broad: facility code is third-party and any failure should be reported to the
            # user rather than surface as a server error.
            messages.error(self.request, f"Error creating observation records: {e}")
            return redirect(reverse('targets:detail', kwargs={'pk': target_id}))

        record_permissions = (
            'tom_observations.view_observationrecord',
            'tom_observations.change_observationrecord',
            'tom_observations.delete_observationrecord',
        )
        # Fetch the user's groups once instead of re-running the query for every observation.
        grantees = [user, *user.groups.all()]
        for observation in obs_group.observation_records.all():
            for grantee in grantees:
                for permission in record_permissions:
                    assign_perm(permission, grantee, observation)

        return redirect(reverse('tom_observations:list') + f'?observationgroup={obs_group.id}')
[docs]
class AddExistingObservationView(LoginRequiredMixin, FormView):
    """
    View for associating a pre-existing observation with a target. Requires authentication.

    The GET view returns a confirmation page for adding duplicate ObservationRecords. Two duplicates are any two
    ObservationRecords with the same target_id, facility, and observation_id.

    The POST view validates the form and redirects to the confirmation page if the confirm flag isn't set.

    This view is intended to be navigated to via the existing_observation_button templatetag, as the
    AddExistingObservationForm has a hidden confirmation checkbox selected by default.
    """
    template_name = 'tom_observations/existing_observation_confirm.html'
    form_class = AddExistingObservationForm

    def get_initial(self):
        """
        Populates the ``AddExistingObservationForm`` with initial data.

        For GET requests the initial data is the query parameters plus ``confirm=True``. For any other method
        the default (empty unless ``initial`` is set on the view) initial data is returned.

        :returns: initial form data
        :rtype: dict
        """
        if self.request.method == 'GET':
            params = self.request.GET.dict()
            params['confirm'] = True
            return params
        # Always hand back a dict; the original fell through and returned None here.
        return super().get_initial()
[docs]
class ObservationRecordDetailView(DetailView):
    """
    View for displaying the details of an ``ObservationRecord`` object.
    """
    model = ObservationRecord

    def get_queryset(self, *args, **kwargs):
        """
        Gets the set of ``ObservationRecord`` objects associated with targets that the current user is authorized to
        view.

        :returns: set of ObservationRecords
        :rtype: QuerySet
        """
        if settings.TARGET_PERMISSIONS_ONLY:
            return ObservationRecord.objects.filter(
                target__in=targets_for_user(self.request.user, Target.objects.all(), 'view_target')
            )
        return get_objects_for_user(self.request.user, 'tom_observations.view_observationrecord')

    def get_context_data(self, *args, **kwargs):
        """
        Adds a number of items to the context object for this view, including the form for adding resulting
        ``DataProduct`` objects to a ``DataProductGroup``, the ``DataProduct`` objects associated with the
        ``ObservationRecord``, and the most recent image from this ``ObservationRecord``. It also populates the
        ``DataProductUploadForm`` hidden fields with initial data.

        :returns: context dictionary
        :rtype: dict
        """
        context = super().get_context_data(*args, **kwargs)
        record = self.object
        context['form'] = AddProductToGroupForm()

        facility = get_service_class(record.facility)()
        facility.set_user(self.request.user)
        context['editable'] = isinstance(facility, BaseManualObservationFacility)
        context['data_products'] = facility.all_data_products(record)
        context['can_be_cancelled'] = record.status not in facility.get_terminal_observing_states()

        # Pick the most recently modified saved data product that is a FITS image.
        newest_image = None
        for data_product in context['data_products']['saved']:
            is_newer = not newest_image or data_product.modified > newest_image.modified
            if is_newer and is_fits_image_file(data_product.data.file):
                newest_image = data_product
        if newest_image:
            context['image'] = newest_image.get_preview()

        # self.object is already loaded by DetailView.get(); calling get_object() again would
        # repeat the database query for each use.
        context['data_product_form'] = DataProductUploadForm(
            initial={
                'observation_record': record,
                'referrer': reverse('tom_observations:detail', args=(record.id,))
            }
        )
        return context
[docs]
class ObservationGroupCreateView(LoginRequiredMixin, CreateView):
    """
    Creates a new ``ObservationGroup`` from a form exposing only its name. Requires authentication.
    """
    # Model-form configuration
    model = ObservationGroup
    fields = ['name']

    # After a successful save, go back to the list of groups
    success_url = reverse_lazy('tom_observations:group-list')
[docs]
class ObservationGroupListView(PermissionListMixin, ListView):
    """
    Paginated list of ``ObservationGroup`` objects.
    Requires the ``view_observationgroup`` permission.
    """
    model = ObservationGroup
    paginate_by = 25

    # Permission used by PermissionListMixin
    permission_required = 'tom_observations.view_observationgroup'
[docs]
class ObservationGroupDeleteView(Raise403PermissionRequiredMixin, DeleteView):
    """
    Deletes an ``ObservationGroup``. Requires the ``delete_observationgroup`` permission.
    """
    model = ObservationGroup

    # Permission used by Raise403PermissionRequiredMixin
    permission_required = 'tom_observations.delete_observationgroup'

    # After deletion, go back to the list of groups
    success_url = reverse_lazy('tom_observations:group-list')
[docs]
class ObservationTemplateFilter(rest_framework.FilterSet):
    """
    Filterset for ``ObservationTemplate`` objects: exact match on facility and case-insensitive
    substring match on name.
    """
    # One choice per registered facility; the key is used as both value and label
    facility = ChoiceFilter(choices=[(facility_name, facility_name) for facility_name in get_service_classes()])
    name = CharFilter(lookup_expr='icontains')

    class Meta:
        model = ObservationTemplate
        fields = ['name', 'facility']
[docs]
class ObservationTemplateListView(FilterView):
    """
    Lists the ``ObservationTemplate`` objects stored in the TOM, filterable through
    ``ObservationTemplateFilter``.
    """
    template_name = 'tom_observations/observationtemplate_list.html'
    filterset_class = ObservationTemplateFilter
    model = ObservationTemplate

    def get_context_data(self, *args, **kwargs):
        """
        Extends the default context with the registered facility classes.

        :returns: context dictionary including ``installed_facilities``
        :rtype: dict
        """
        template_context = super().get_context_data(*args, **kwargs)
        template_context['installed_facilities'] = get_service_classes()
        return template_context
[docs]
class ObservationTemplateCreateView(FormView):
    """
    Displays the form for creating a new observation template for the facility named in the URL.
    """
    # NOTE(review): unlike the update and delete template views, this view does not use
    # LoginRequiredMixin -- confirm whether anonymous access is intended.
    template_name = 'tom_observations/observationtemplate_form.html'

    def get_facility_name(self):
        """
        :returns: facility name taken from the URL keyword arguments
        :rtype: str
        """
        return self.kwargs['facility']

    def get_context_data(self, *args, **kwargs):
        """
        Adds ``missing_configurations`` to the context: a comma-separated string of the facility's
        unconfigured settings, or an empty string if the facility has no ``facility_settings``.

        :returns: context dictionary
        :rtype: dict
        """
        template_context = super().get_context_data(*args, **kwargs)
        facility_class = get_service_class(self.get_facility_name())
        facility_instance = facility_class()
        try:
            unconfigured = facility_instance.facility_settings.get_unconfigured_settings()
            template_context['missing_configurations'] = ", ".join(unconfigured)
        except AttributeError:
            template_context['missing_configurations'] = ''
        return template_context

    def get_initial(self):
        """
        Seeds the form with the facility name, then overlays every GET query parameter.

        :returns: initial form data
        :rtype: dict
        """
        form_initial = super().get_initial()
        form_initial['facility'] = self.get_facility_name()
        query_values = self.request.GET.dict()
        form_initial.update(query_values)
        return form_initial
[docs]
class ObservationTemplateUpdateView(LoginRequiredMixin, FormView):
    """
    View for updating an existing observation template.
    """
    template_name = 'tom_observations/observationtemplate_form.html'

    def get_object(self):
        """
        Gets the ``ObservationTemplate`` identified by the ``pk`` URL keyword argument.

        :returns: the template being edited
        :rtype: ObservationTemplate
        :raises Http404: if no template with that pk exists
        """
        # An unknown pk is a client error: answer 404 instead of an unhandled DoesNotExist (500).
        return get_object_or_404(ObservationTemplate, pk=self.kwargs['pk'])

    def get_initial(self):
        """
        Populates the form with the stored template parameters and its facility.

        :returns: initial form data
        :rtype: dict
        """
        initial = super().get_initial()
        # FormView does not set self.object on its own; load the template if no handler has done so.
        template = getattr(self, 'object', None)
        if template is None:
            template = self.object = self.get_object()
        initial.update(template.parameters)
        initial['facility'] = template.facility
        return initial
[docs]
class ObservationTemplateDeleteView(LoginRequiredMixin, DeleteView):
    """
    Removes an ``ObservationTemplate``. Requires authentication.
    """
    # After deletion, go back to the list of templates
    success_url = reverse_lazy('tom_observations:template-list')
    model = ObservationTemplate
[docs]
class FacilityStatusView(TemplateView):
    """
    Renders the facility status page template.
    """
    template_name = 'tom_observations/facility_status.html'
[docs]
def render_facility_status_list(request, *args, **kwargs):
    """
    View function for rendering the facility status partial.

    Collects the status of every registered facility, attaches a ``weather_url`` to each site for which
    the facility reports one, and renders the status table. An info message is added unless the
    ``hx_trigger`` query parameter equals ``load``.
    """
    collected = []
    for service_class in get_service_classes().values():
        service = service_class()
        service.set_user(request.user)
        weather_urls = service.get_facility_weather_urls()
        status = service.get_facility_status()

        # add the weather_url to the site dictionary
        for site in status.get('sites', []):
            for weather_site in weather_urls.get('sites', []):
                if weather_site['code'] != site['code']:
                    continue
                # Only the first entry with a matching code is considered
                matched_url = weather_site['weather_url']
                if matched_url is not None:
                    site['weather_url'] = matched_url
                break
        collected.append(status)

    if request.GET.get('hx_trigger') != 'load':
        messages.info(request, "Facility statuses updated.")

    template_context = {'facilities': collected, 'loading': False}
    return render(request, 'tom_observations/partials/facility_status_table.html', context=template_context)