Customizing Data Processing
One of the goals of the TOM Toolkit is to simplify the flow of data from your observations. To that end, some of its built-in functionality can be overridden so that your TOM works for your use case.
To begin, here’s a brief look at part of the structure of the tom_dataproducts app in the TOM Toolkit:
tom_dataproducts
├──data_processor.py
├──data_serializers.py
├──hooks.py
└──models.py
Let’s start with a quick overview of models.py. The file contains the Django models for the dataproducts app, which in our case are DataProduct and ReducedDatum. A DataProduct contains information about an uploaded or saved data product, such as the file name, file path, and what kind of file it is. A ReducedDatum contains an individual science data point taken from a DataProduct file. Examples of ReducedDatum points would be individual photometry points or individual spectra.
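To make the relationship between the two models concrete, here’s a minimal sketch that uses plain dataclasses as stand-ins. These are not the Toolkit’s Django models: the class names ending in Sketch, the field types, the target name, and the example values are all assumptions for illustration. The field names loosely follow the arguments passed to ReducedDatum.objects.create in the default hook.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class DataProductSketch:
    """Stand-in for DataProduct: one uploaded or saved file (hypothetical fields)."""
    target: str
    file_path: str
    tag: str


@dataclass
class ReducedDatumSketch:
    """Stand-in for ReducedDatum: one science data point taken from a file."""
    target: str
    data_product: DataProductSketch
    data_type: str
    timestamp: datetime
    value: str  # the data point, serialized as JSON


# Hypothetical target name and file path, for illustration only.
dp = DataProductSketch(target='SN2019xyz', file_path='data/lightcurve.csv', tag='photometry')

# One file can yield many data points, each pointing back at the file it came from.
points = [
    ReducedDatumSketch(
        target=dp.target,
        data_product=dp,
        data_type=dp.tag,
        timestamp=datetime(2019, 1, 1 + i, tzinfo=timezone.utc),
        value=json.dumps({'magnitude': 18.0 + i}),
    )
    for i in range(3)
]
```

The point of the sketch is the one-to-many shape: a single DataProduct describes the file, and each ReducedDatum carries one timestamped value extracted from it.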
When a user either uploads or saves a DataProduct to their TOM, the TOM runs a hook, as described in the Custom Code section of the documentation. The default version of this hook looks like this:
import json

from .data_processor import DataProcessor
from .data_serializers import SpectrumSerializer
from .models import ReducedDatum, SPECTROSCOPY, PHOTOMETRY


def data_product_post_upload(dp, observation_timestamp, facility):
    processor = DataProcessor()

    if dp.tag == SPECTROSCOPY[0]:
        spectrum = processor.process_spectroscopy(dp, facility)
        serialized_spectrum = SpectrumSerializer().serialize(spectrum)
        ReducedDatum.objects.create(
            target=dp.target,
            data_product=dp,
            data_type=dp.tag,
            timestamp=observation_timestamp,
            value=serialized_spectrum
        )
    elif dp.tag == PHOTOMETRY[0]:
        photometry = processor.process_photometry(dp)
        for time, photometry_datum in photometry.items():
            ReducedDatum.objects.create(
                target=dp.target,
                data_product=dp,
                data_type=dp.tag,
                timestamp=time,
                value=json.dumps(photometry_datum)
            )
The basic idea is as follows: depending on the tag of the DataProduct passed in, the data in the DataProduct is processed by the DataProcessor class into a uniform format. The resulting object, if necessary, is then serialized by the SpectrumSerializer (the default photometry format is already easily serializable) so that it can be stored in the database as a ReducedDatum. Finally, the ReducedDatum objects are created and stored in the database.
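The photometry branch of the hook needs no serializer because it iterates over a mapping of timestamps to data points and passes each data point straight to json.dumps. The sketch below mimics that loop without Django. The datum keys (magnitude, filter, error) and the values are assumptions for illustration, and a plain list stands in for ReducedDatum.objects.create.

```python
import json
from datetime import datetime, timezone

# Hypothetical processed photometry: timestamp -> JSON-serializable datum.
photometry = {
    datetime(2019, 1, 1, tzinfo=timezone.utc): {'magnitude': 18.2, 'filter': 'r', 'error': 0.05},
    datetime(2019, 1, 2, tzinfo=timezone.utc): {'magnitude': 18.4, 'filter': 'r', 'error': 0.06},
}

# Stand-in for ReducedDatum.objects.create: collect the rows in a list instead.
rows = []
for time, photometry_datum in photometry.items():
    rows.append({
        'data_type': 'photometry',
        'timestamp': time,
        'value': json.dumps(photometry_datum),  # stored as a JSON string
    })
```

Each entry becomes its own row, which is why a single photometry file produces many ReducedDatum objects while a spectrum file produces one.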
The meat and potatoes of the processing is in the DataProcessor class, and the details can be seen in the source code. We understand that the way the data is processed might not work for everyone, so it’s easily customizable.
To do so, create a custom DataProcessor class that inherits from the one in the TOM Toolkit. Let’s say most of the DataProcessor code is great, but you want to change how spectra are processed from FITS files:
from tom_dataproducts.data_processor import DataProcessor


class CustomDataProcessor(DataProcessor):

    def _process_spectrum_from_fits(self, data_product, facility):
        # Custom processing here, needs to return a Spectrum1D
        return spectrum
Then, just add the dotted path to your CustomDataProcessor class to your TOM’s settings.py:
...
DATA_PROCESSOR_CLASS = 'mytom.custom_data_processor.CustomDataProcessor'
...
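A setting like this is a dotted path: everything before the last dot names a module, and the last component names a class inside that module. The sketch below shows one way such a path can be resolved with the standard library. It is an assumption about the general mechanism, not the Toolkit’s actual loading code, and load_class is a hypothetical helper name. It is demonstrated on a standard-library class so that it runs without a TOM project.

```python
from importlib import import_module


def load_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object (hypothetical helper)."""
    module_path, _, class_name = dotted_path.rpartition('.')
    if not module_path:
        raise ImportError(f'{dotted_path!r} is not a dotted path')
    module = import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f'Module {module_path!r} has no attribute {class_name!r}'
        ) from exc


# Demonstrated with a standard-library class; in a TOM the path would be
# something like 'mytom.custom_data_processor.CustomDataProcessor'.
ordered_dict_class = load_class('collections.OrderedDict')
```

If resolution works along these lines, the module has to be importable from your project, so a typo in either the module path or the class name would surface as an import error rather than a silent fallback to the default processor.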
As long as the CustomDataProcessor returns an object of the same type as the superclass implementation, you won’t need to change anything else. However, if you need to return a different object type, you can subclass the SpectrumSerializer from tom_dataproducts/data_serializers.py and override its methods. Be careful: the TOM Toolkit doesn’t have a mechanism to provide a custom serializer, so you’ll also need to customize your data_product_post_upload hook to use it. Here’s a brief example of a custom serializer:
import json

from tom_dataproducts.data_serializers import SpectrumSerializer


class CustomSpectrumSerializer(SpectrumSerializer):

    def serialize(self, spectrum):
        # convert spectrum into dict
        return json.dumps(spectrum_dict)

    def deserialize(self, spectrum):
        data = json.loads(spectrum)  # spectrum is a JSON string; data is a dict
        # convert from dict to preferred object type
        return converted_spectrum
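The serializer above leaves spectrum_dict and converted_spectrum as placeholders. To show what a complete round trip might look like, here’s a standalone sketch. SimpleSpectrum and SimpleSpectrumSerializer are hypothetical names invented for this example; in a real TOM the serializer would subclass SpectrumSerializer and the spectrum type would be whatever your custom processor returns.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class SimpleSpectrum:
    """Hypothetical spectrum type standing in for your processor's return value."""
    wavelength: List[float]
    flux: List[float]
    wavelength_unit: str = 'Angstrom'


class SimpleSpectrumSerializer:
    """Standalone sketch; in a TOM this would subclass SpectrumSerializer."""

    def serialize(self, spectrum):
        # Convert the object into a dict of JSON-friendly values, then to a string.
        spectrum_dict = {
            'wavelength': list(spectrum.wavelength),
            'flux': list(spectrum.flux),
            'wavelength_unit': spectrum.wavelength_unit,
        }
        return json.dumps(spectrum_dict)

    def deserialize(self, spectrum):
        # The stored value is a JSON string; parse it back into a dict first.
        data = json.loads(spectrum)
        return SimpleSpectrum(**data)


original = SimpleSpectrum(wavelength=[4000.0, 4001.0], flux=[1.2e-16, 1.3e-16])
serializer = SimpleSpectrumSerializer()
stored = serializer.serialize(original)      # what would go in ReducedDatum.value
restored = serializer.deserialize(stored)    # what display code would read back
```

Whatever object type you choose, checking that deserialize(serialize(x)) gives back an equal object is a cheap way to catch fields that were dropped during serialization.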
Then, in your custom hook:
import json

# The custom hook lives in your project, not in tom_dataproducts,
# so the models are imported by their full path.
from tom_dataproducts.models import ReducedDatum, SPECTROSCOPY, PHOTOMETRY

from mytom.custom_data_serializers import CustomSpectrumSerializer
from mytom.custom_data_processor import CustomDataProcessor


def custom_data_product_post_upload(dp, observation_timestamp, facility):
    processor = CustomDataProcessor()

    if dp.tag == SPECTROSCOPY[0]:
        spectrum = processor.process_spectroscopy(dp, facility)
        serialized_spectrum = CustomSpectrumSerializer().serialize(spectrum)
        ReducedDatum.objects.create(
            target=dp.target,
            data_product=dp,
            data_type=dp.tag,
            timestamp=observation_timestamp,
            value=serialized_spectrum
        )
    elif dp.tag == PHOTOMETRY[0]:
        photometry = processor.process_photometry(dp)
        for time, photometry_datum in photometry.items():
            ReducedDatum.objects.create(
                target=dp.target,
                data_product=dp,
                data_type=dp.tag,
                timestamp=time,
                value=json.dumps(photometry_datum)
            )
And just like that, your TOM will be running your custom processing code.