Source code for util

"""
Useful functions and definitions
"""
import astropy.coordinates.angles as angles
#from astropy.table import Table
from astropy.time import Time
from astropy import units as u
import re
import getopt, sys, os
try:
	import openpyxl  # see http://openpyxl.readthedocs.org/en/latest/index.html
	noexcelimport = False
except:
	noexcelimport = True
from bisect import bisect_left
import pickle as pickle
import ephem
from configparser import SafeConfigParser
import importlib

#import csv
import numpy as np
import obs
import logging
logger = logging.getLogger(__name__)



[docs]def takeclosest(dico, key, value): """ Assumes dict[key] is sorted. Returns the dict value which dict[key] is closest to value. If two dict[key] are equally close to value, return the highest (i.e. latest). This is much faster than a simple min loop, although a bit more tedious to use. """ mylist = [elt[key] for elt in dico] pos = bisect_left(mylist, value) if pos == 0: return dico[0] if pos == len(mylist): return dico[-1] before = dico[pos - 1] after = dico[pos] if after[key] - value <= value - before[key]: return after else: return before
[docs]def hilite(string, status, bold): '''Graphism: colors and bold in the terminal''' if not sys.stdout.isatty() : return '*'+string+'*' attr = [] if status: # green attr.append('32') else: # red attr.append('31') if bold: attr.append('1') return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
[docs]def excelimport(filename, obsprogram=None): if noexcelimport: raise NotImplemented("Excel files cannot be imported at the moment") else: """ Import an excel catalog(...) into a list of observables I directly read the excel values, I do NOT evaluate the formulas in them. It is up to you to put the right mjd in the excel sheets. Warning : NEVER use the coordinates from an excel sheet to create an rdb night planning. ALWAYS use the rdb catalogs loaded in the edp. And double check the distance to moon ! """ observables = [] #### For BEBOP if obsprogram == 'bebop': """ special properties: phases : a list of dictionnaries : [{mjd, phase, hourafterstart }] comment : a string of comments (exptime, requested phase,...) internalobs : a boolean (0 or 1), allowing or not observability """ try: wb = openpyxl.load_workbook(filename, data_only=True) # Read the excel spreadsheet, loading the values directly #ws = wb.active # choose active sheet ws = wb['Sheet1'] except: raise RuntimeError("Either %s does not exists, or it is not in .xlsx format !!" % filename) # Get tabler limits rows = ws.rows columns = ws.columns breakcolind = None breakrowind = None for ind, cell in enumerate(rows[1]): if cell.value == None: #breakcolind = cell.column breakcolind = rows[1][ind-1].column break else: pass for ind, cell in enumerate(columns[0]): if cell.value == None: #breakrowind = cell.row breakrowind = columns[0][ind-1].row break else: pass # Read only the non "None" data and put it in a table of dict, because fuck excel and fuck openpyxl. data = ws['A1':'%s%s' % (breakcolind, breakrowind)] """ Structure of the spreadsheet: Infos are from A1 to W2 Datas are from A3 ro W30 B1 : actual modified julian date A : name B : target C : comment I : observability J : requested phase M1 to W1 : mjd over the night M2 to W2 : corresponding time after night start, in hours M to W : phases """ phasesnames = ['M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W'] values = {} for indr, row in enumerate(data): for indc, cell in enumerate(row): try: values[cell.address] = cell.value # Apparently, this is an old version except: values[cell.coordinate] = cell.value for i in np.arange(3, 31): # create an observable object with the common properties name = values['A%s' % str(i)] coordinates = values['B%s' % str(i)] alpha = coordinates[0:2]+':'+coordinates[2:4]+':'+coordinates[4:6] delta = coordinates[7:9]+':'+coordinates[9:11]+':'+coordinates[11:13] if coordinates[6] == "S": delta = '-'+delta # add properties specific to this program ## Tricky stuff here : the jdb in the excel sheet is the mjd + 0.5. phases = [{'mjd': values['%c%i' % (col, 1)]-0.5, 'hourafterstart': values['%c%i' % (col, 2)], 'phase': values['%c%i' % (col, i)]} for col in phasesnames] attributes = {'phases': phases} #observable.phases = phases if values['I%s' % str(i)] == 'yes': attributes['internalobs'] = 1 else: attributes['internalobs'] = 0 observable = obs.Observable(name=name, obsprogram=obsprogram, alpha=alpha, delta=delta, attributes=attributes) comment = '' if values['C%s' % str(i)] is not None: comment = comment + values['C%s' % str(i)] if values['J%s' % str(i)] != '/': comment = comment + '\n' + 'Requested phase: ' + values['J%s' % str(i)] if comment != '': observable.comment = comment observables.append(observable) #TODO: check that the modified julian date corresponds to the ongoing night #TODO: assert that the above structure is correct ! (use keywords in the Info fields...?) if obsprogram == "transit": pass if obsprogram == "superwasp": # http://openpyxl.readthedocs.org/en/latest/optimized.html --- that will be useful for Amaury's monstruous spreadsheet """ special properties: phases : a list of dictionnaries : [{mjd, phase, hourafterstart }] comment : a string of comments (exptime, requested phase,...) internalobs : a boolean (0 or 1), allowing or not observability """ logger.info('reading %s...' % filename) try: wb = openpyxl.load_workbook(filename, data_only=True) # Read the excel spreadsheet, loading the values directly ws = wb['Observations'] # choose active sheet except: raise RuntimeError("Either %s does not exists, or it is not in .xlsx format !!" % filename) logger.info('get tabler limits...') # Get tabler limits rows = ws.rows columns = ws.columns breakcolind = None breakrowind = None for ind, cell in enumerate(rows[1]): if cell.value == None: #breakcolind = cell.column breakcolind = rows[1][ind-1].column break else: pass for ind, cell in enumerate(columns[0]): if cell.value == None: #breakrowind = cell.row breakrowind = columns[0][ind-1].row break else: pass logger.info(breakrowind, breakcolind) sys.exit() # Read only the non "None" data and put it in a table of dict, because fuck excel and fuck openpyxl. data = ws['A1':'%s%s' % (breakcolind, breakrowind)] """ Structure of the spreadsheet: Infos are from A1 to W2 Datas are from A3 ro W30 B1 : actual modified julian date A : name B : target C : comment I : observability J : requested phase M1 to W1 : mjd over the night M2 to W2 : corresponding time after night start, in hours M to W : phases """ if obsprogram == "followup": pass return observables
[docs]def writepickle(obj, filepath, verbose=True, protocol = -1): """ I write your python object obj into a pickle file at filepath. If filepath ends with .gz, I'll use gzip to compress the pickle. Leave protocol = -1 : I'll use the latest binary protocol of pickle. """ if os.path.splitext(filepath)[1] == ".gz": pkl_file = gzip.open(filepath, 'wb') else: pkl_file = open(filepath, 'wb') pickle.dump(obj, pkl_file, protocol) pkl_file.close() logger.debug("Wrote %s" % filepath)
[docs]def readpickle(filepath, verbose=True): """ I read a pickle file and return whatever object it contains. If the filepath ends with .gz, I'll unzip the pickle file. """ if os.path.splitext(filepath)[1] == ".gz": pkl_file = gzip.open(filepath,'rb') else: pkl_file = open(filepath, 'rb') obj = pickle.load(pkl_file) pkl_file.close() logger.debug("Read %s" % filepath) return obj
[docs]def readconfig(configpath): """ Reads in a config file """ config = SafeConfigParser(allow_no_value=True) if not os.path.exists(configpath): raise RuntimeError("Config file '{}' does not exist!".format(configpath)) logger.info("Reading config from '{}'...".format(configpath)) config.read(configpath) return config
[docs]def grid_points(res_x=400,res_y=200): """ Generates grid points on the sky """ ra_i = 0. ra_f = 2*np.pi ra_step=(ra_f-ra_i)/res_x dec_i = -np.pi/2. dec_f = np.pi/2. dec_step=(dec_f-dec_i)/res_y ras = np.arange(ra_i+ra_step/2, ra_f, ra_step) decs= np.arange(dec_i+dec_step/2, dec_f, dec_step) return ras,decs
[docs]def elev2airmass(el, alt, threshold=10.): ''' Converts the elevation to airmass. :param elevation_deg: elevation [radians] :param alt: altitude of station [m] :return: air mass This is the code used for the Euler EDP at La Silla.''' altitudeFactor = 0.00087 + alt*(-8.6664803e-8) # altitude factor cosz = np.cos(np.pi/2.-el) if(cosz< 0.1): # we do not compute Airmass for small value of cosz airmass = threshold; else: airmass = (1.0 + altitudeFactor - altitudeFactor / (cosz * cosz)) / cosz; return airmass
[docs]def check_value(var, flag): if np.isnan(var): var = flag return var
[docs]def load_station(name): module_name = "config.{}".format(name) station = importlib.import_module(module_name, package=None) return station
[docs]def time2hhmm(obstime): return (str(obstime).split(" ")[1]).split(".")[0][:-3]