"""Source code for SPCA.Photometry_Common."""

import glob
import os

import numpy as np
from astropy.stats import sigma_clip
from scipy import interpolate

def get_fnames(directory, AOR_snip, ch):
    """Find the paths to all the bcd.fits files for one channel.

    Args:
        directory (string): Path to the directory containing all the Spitzer data.
        AOR_snip (string): Common first characters of the data directories, e.g. 'r579'.
        ch (string): Channel used for the observation, e.g. 'ch1' for channel 1.

    Returns:
        tuple: fnames, lens (list, list). Paths to all bcd.fits files found, and the
            number of files in each AOR (needed for adding correction stacks).
    """
    fnames, lens = [], []
    for entry in os.listdir(directory):
        # Only directories whose name contains the AOR snippet belong to this target.
        if AOR_snip not in entry:
            continue
        bcd_path = directory + '/' + entry + '/' + ch + '/bcd'
        found = glob.glob(os.path.join(bcd_path, '*bcd.fits'))
        fnames.extend(found)
        lens.append(len(found))
    return fnames, lens
def get_stacks(calDir, dataDir, AOR_snip, ch):
    """Find paths to all the background subtraction correction stacks FITS files.

    Matches each AOR's sdark calibration key against the correction stacks
    available in calDir, and returns one stack path per AOR.

    Args:
        calDir (string): Path to the directory containing the correction stacks.
        dataDir (string): Path to the directory containing the Spitzer data to be corrected.
        AOR_snip (string): Common first characters of data directory eg. 'r579'.
        ch (string): Channel used for the observation eg. 'ch1' for channel 1.

    Returns:
        list: List of paths to the relevant correction stacks. Returns an empty
            list (after printing an error) if any AOR lacks a 'cal' folder.
    """
    stacks = np.array(os.listdir(calDir))
    # Position of the 'SPITZER_I' marker in each stack filename (-1 if absent).
    locs = np.array([stacks[i].find('SPITZER_I') for i in range(len(stacks))])
    good = np.where(locs!=-1)[0] #filter out all files that don't fit the correct naming convention for correction stacks
    offset = 11 #length of the string "SPITZER_I#_"
    keys = np.array([stacks[i][locs[i]+offset:].split('_')[0] for i in good]) #pull out just the key that says what sdark this stack is for
    data_list = os.listdir(dataDir)
    AOR_list = [a for a in data_list if AOR_snip in a]
    calFiles = []
    for i in range(len(AOR_list)):
        path = dataDir + '/' + AOR_list[i] + '/' + ch +'/cal/'
        if not os.path.isdir(path):
            print('Error: Folder \''+path+'\' does not exist, so automatic correction stack selection cannot be performed')
            return []
        # NOTE(review): assumes at least one *sdark.fits exists in the cal folder;
        # an empty glob here would raise IndexError — confirm against the pipeline.
        fname = glob.glob(path+'*sdark.fits')[0]
        loc = fname.find('SPITZER_I')+offset
        key = fname[loc:].split('_')[0]
        # Pick the correction stack whose key matches this AOR's sdark key.
        calFiles.append(os.path.join(calDir, stacks[list(good)][np.where(keys == key)[0][0]]))
    return calFiles
def get_time(hdu_list, time, ignoreFrames):
    """Get the time stamp for each image in the data cube and append them to `time`.

    Args:
        hdu_list (list): Content of the FITS file; the first HDU holds the data
            cube (frames along axis 0) and the header with FRAMTIME and BMJD_OBS.
        time (list): Existing list of time stamps, extended in place.
        ignoreFrames (ndarray or list): Frame indices to ignore (consistently bad
            frames); their time stamps are set to NaN.

    Returns:
        list: Updated time stamp list.
    """
    h, w, l = hdu_list[0].data.shape
    sec2day = 1.0/(3600.0*24.0)
    step = hdu_list[0].header['FRAMTIME']*sec2day
    # Stamps start at the mid-point of the first frame. NOTE(review): the end
    # point is BMJD_OBS + (h-1)*step rather than the mid-point of the last frame
    # (BMJD_OBS + (h-0.5)*step) — kept as-is to preserve existing behavior.
    t = np.linspace(hdu_list[0].header['BMJD_OBS'] + step/2,
                    hdu_list[0].header['BMJD_OBS'] + (h-1)*step, h)
    # Use len() so this works for both lists and ndarrays: the previous
    # `ignoreFrames != []` comparison silently evaluated False for a length-1
    # ndarray (empty broadcast result) and raised for longer arrays.
    if len(ignoreFrames) != 0:
        t[ignoreFrames] = np.nan
    time.extend(t)
    return time
def oversampling(image_data, a = 2):
    """Interpolate over invalid/sigma-clipped pixels, then linearly oversample each image.

    First substitutes all invalid (masked/NaN/inf) pixels by linear interpolation
    from the valid pixels, then resamples the image on a grid `a` times finer in
    each axis. The output is divided by a**2 so total flux is conserved.

    Note: requires `from scipy import interpolate` at module level (previously
    missing, which made this function raise NameError when called).

    Args:
        image_data (ndarray): Data cube of images (2D arrays of pixel values),
            frames along axis 0.
        a (int, optional): Sampling factor, e.g. if a = 2 there will be twice as
            many data points along x and y. Default is 2 (larger not recommended).

    Returns:
        ndarray: Data cube of oversampled images (shape (l, h*a, w*a)). Output
            pixels outside the convex hull of valid input pixels are NaN
            (griddata 'linear' behavior).
    """
    l, h, w = image_data.shape
    # Fine grid with step 1/a in each axis, covering [0, h) x [0, w).
    gridy, gridx = np.mgrid[0:h:1/a, 0:w:1/a]
    image_over = np.empty((l, h*a, w*a))
    for i in range(l):
        image_masked = np.ma.masked_invalid(image_data[i,:,:])
        # Coordinates and values of the valid pixels only.
        points = np.where(image_masked.mask == False)
        image_compre = np.ma.compressed(image_masked)
        image_over[i,:,:] = interpolate.griddata(points, image_compre,
                                                 (gridx, gridy), method = 'linear')
    # Divide by a**2 so that summing the oversampled image conserves flux.
    return image_over/(a**2)
def sigma_clipping(image_data, filenb = 0 , fname = ['not provided'], tossed = 0, badframetable = None, bounds = (13, 18, 13, 18), sigma=4, maxiters=2):
    """Sigma clip bad pixels and mask an entire frame if a clipped pixel is too close to the target.

    Args:
        image_data (ndarray): Data cube of images (2D arrays of pixel values),
            frames along axis 0.
        filenb (int, optional): Index of current file in the 'fname' list (list of names of files)
            to keep track of the files that were tossed out. Default is 0.
        fname (list, optional): List of names of files to keep track of the files that were tossed out.
        tossed (int, optional): Total number of images tossed out. Default is 0 if none provided.
        badframetable (list, optional): List of file names and frame numbers of images
            tossed out from 'fname'. A fresh list is created when None.
        bounds (tuple, optional): Bounds (lbx, ubx, lby, uby) of the box around
            the target. Default is (13, 18, 13, 18).
        sigma (float, optional): Clipping threshold in standard deviations. Default is 4.
        maxiters (int, optional): Maximum number of clipping iterations. Default is 2.

    Returns:
        tuple: sigma_clipped_data (3D array) - Data cube of sigma clipped images.
            tossed (int) - Updated total number of images tossed out.
            badframetable (list) - Updated list of file names and frame numbers
            of images tossed out from 'fname'.
    """
    if badframetable is None:
        badframetable = []
    lbx, ubx, lby, uby = bounds
    # NOTE(review): despite the names, the first axis is the frame index of the
    # data cube, so h is the number of frames and (w, l) is one frame's shape.
    h, w, l = image_data.shape
    # mask invalids (NaN/inf) so they do not bias the clipping statistics
    image_data2 = np.ma.masked_invalid(image_data)
    # make mask to mask entire bad frame (all-True, one full frame's shape)
    x = np.ones((w, l))
    mask = np.ma.make_mask(x)
    try:
        # Clip each pixel along the time axis against its own median.
        sig_clipped_data = sigma_clip(image_data2, sigma=sigma, maxiters=maxiters, cenfunc=np.ma.median, axis = 0)
    except TypeError:
        # Older astropy versions call the iteration keyword `iters` instead of
        # `maxiters` — presumably the rename happened around astropy 3.1; verify.
        sig_clipped_data = sigma_clip(image_data2, sigma=sigma, iters=maxiters, cenfunc=np.ma.median, axis = 0)
    for i in range (h):
        # If any clipped pixel falls inside the target box, discard the whole frame.
        if np.ma.is_masked(sig_clipped_data[i, lbx:ubx, lby:uby]):
            sig_clipped_data[i,:,:] = np.ma.masked_array(sig_clipped_data[i,:,:], mask = mask)
            badframetable.append([i,filenb,fname])
            tossed += 1
    return sig_clipped_data, tossed, badframetable
def bgsubtract(img_data, bg_flux=None, bg_err=None, bounds=(11, 19, 11, 19)):
    """Measure the background level of each frame and subtract it.

    The background is the median of all pixels outside the box around the
    target; it is subtracted from the whole frame.

    Args:
        img_data (ndarray): Data cube of images (2D arrays of pixel values),
            frames along axis 0.
        bg_flux (list, optional): Background measurements from previous images,
            extended in place. Default is None (fresh list).
        bg_err (list, optional): Uncertainties on background measurements from
            previous images, extended in place. Default is None (fresh list).
        bounds (tuple, optional): Bounds (lbx, ubx, lby, uby) of the box around
            the target. Default is (11, 19, 11, 19).

    Returns:
        tuple: bgsub_data (3D masked array) - background subtracted images;
            bg_flux (list) - updated background flux measurements;
            bg_err (list) - updated background uncertainties.
    """
    bg_flux = [] if bg_flux is None else bg_flux
    bg_err = [] if bg_err is None else bg_err
    lbx, ubx, lby, uby = bounds
    cube = np.ma.copy(img_data)
    nframes, ny, nx = cube.shape
    # Mask the target box so only sky pixels contribute to the statistics.
    star_box = np.zeros(cube.shape)
    star_box[:, lbx:ubx, lby:uby] = 1
    sky = np.ma.masked_array(cube, mask = np.ma.make_mask(star_box))
    sky = np.reshape(sky, (nframes, ny*nx))
    # Per-frame sky median, shaped for broadcasting over each 2D frame.
    bg_med = np.reshape(np.ma.median(sky, axis=1), (nframes, 1, 1))
    bgsub_data = np.ma.masked_invalid(cube - bg_med)
    bg_flux.extend(bg_med.ravel())
    bg_err.extend(np.ma.std(sky, axis=1))
    return bgsub_data, bg_flux, bg_err
def binning_data(data, size):
    """Median bin a 1D array.

    Invalid values (NaN/inf) are masked before binning. If the length of `data`
    is not a multiple of `size`, the trailing remainder is dropped (previously
    the reshape raised a ValueError in that case).

    Args:
        data (1D array): Array of data to be binned.
        size (int): Size of bins.

    Returns:
        tuple: binned_data (1D array) - Array of binned data (per-bin medians).
            binned_data_std (1D array) - Standard deviation of each bin.
    """
    data = np.ma.masked_invalid(data)
    nbins = len(data) // size
    # Trim the tail that does not fill a complete bin before reshaping.
    reshaped_data = data[:nbins*size].reshape(nbins, size)
    binned_data = np.ma.median(reshaped_data, axis=1)
    # np.ma.std (not np.std) for consistency with the masked-array pipeline;
    # np.std dispatches to the same masked method, so values are unchanged.
    binned_data_std = np.ma.std(reshaped_data, axis=1)
    return binned_data, binned_data_std