Source code for util

"""
Useful functions and definitions
"""
import astropy.coordinates.angles as angles
#from astropy.table import Table
from astropy.time import Time
from astropy import units as u
import re
import getopt, sys, os
try:
	import openpyxl  # see http://openpyxl.readthedocs.org/en/latest/index.html
	noexcelimport = False
except:
	noexcelimport = True
from bisect import bisect_left
import pickle as pickle
import ephem
from configparser import SafeConfigParser
import importlib
import sys
import gzip
#import csv
import numpy as np

# DO NOT IMPORT obs as it import util already, this create a loop and break function import.
#import obs

import logging
logger = logging.getLogger(__name__)



[docs]def takeclosest(dico, key, value): #todo: rename dico in dict """ .. warning:: I assume that dict[key] is sorted. Returns the dict value which dict[key] is closest to value. If two dict[key] are equally close to value, return the highest (i.e. latest). :param dico: python dictionary you want to sort :param key: dictionary key used for sorting :param value: target value, :return: index of the element in dico that is the closest to the target value .. note:: This is much faster than a simple min loop, although a bit more tedious to use. """ mylist = [elt[key] for elt in dico] pos = bisect_left(mylist, value) if pos == 0: return dico[0] if pos == len(mylist): return dico[-1] before = dico[pos - 1] after = dico[pos] if after[key] - value <= value - before[key]: return after else: return before
[docs]def hilite(string, status, bold): """ Helper to add colors and bold in the terminal :param string: string you want to color or bold :param status: boolean, if True then the text is colored in green, otherwise in red. :param bold: boolean, if True then the text is bolded :return: """ if not sys.stdout.isatty() : return '*'+string+'*' attr = [] if status: # green attr.append('32') else: # red attr.append('31') if bold: attr.append('1') return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
[docs]def writepickle(obj, filepath, protocol=-1): """ I write your python object obj into a pickle file at filepath. If filepath ends with .gz, I'll use gzip to compress the pickle. :param obj: python container you want to compress :param filepath: string, path where the pickle will be written :param protocol: Leave protocol = -1 : I'll use the latest binary protocol of pickle. """ if os.path.splitext(filepath)[1] == ".gz": pkl_file = gzip.open(filepath, 'wb') else: pkl_file = open(filepath, 'wb') pickle.dump(obj, pkl_file, protocol) pkl_file.close() logger.debug("Wrote %s" % filepath)
[docs]def readpickle(filepath): """ I read a pickle file and return whatever object it contains. If the filepath ends with .gz, I'll unzip the pickle file. :param filepath: string, path of the pickle to load :return: object contained in the pickle """ if os.path.splitext(filepath)[1] == ".gz": pkl_file = gzip.open(filepath,'rb') else: pkl_file = open(filepath, 'rb') obj = pickle.load(pkl_file) pkl_file.close() logger.debug("Read %s" % filepath) return obj
[docs]def readconfig(configpath): """ Reads in a config file :param configpath: path of the configfile :return: configuration dictionary """ #todo: change SafeConfigParser to ConfigParser. Make sure logic is respected config = SafeConfigParser(allow_no_value=True) if not os.path.exists(configpath): raise RuntimeError("Config file '{}' does not exist!".format(configpath)) logger.info("Reading config from '{}'...".format(configpath)) config.read(configpath) return config
[docs]def grid_points(res_x=400,res_y=200): """ Maps the whole sky in right ascension and declination :param res_x: integer, number of points in right ascension :param res_y: integer, number of points in declination .. note:: the points are equally spaced. :return: numpy tuples containing right ascension points and declination points """ ra_i = 0. ra_f = 2*np.pi ra_step=(ra_f-ra_i)/res_x dec_i = -np.pi/2. dec_f = np.pi/2. dec_step=(dec_f-dec_i)/res_y ras = np.arange(ra_i+ra_step/2, ra_f, ra_step) decs= np.arange(dec_i+dec_step/2, dec_f, dec_step) return ras,decs
[docs]def elev2airmass(el, alt, threshold=10.): """ Converts the elevation to airmass. :param el: float, elevation in radians :param alt: float, altitude of the observer in meters :param threshold: maximum allowed airmass, will be returned if actual airmass exceeds the threshold :return: airmass .. note:: This is the code used for the Euler EDP at La Silla.""" altitudeFactor = 0.00087 + alt*(-8.6664803e-8) # altitude factor cosz = np.cos(np.pi/2.-el) if(cosz< 0.1): # we do not compute Airmass for small value of cosz airmass = threshold else: airmass = (1.0 + altitudeFactor - altitudeFactor / (cosz * cosz)) / cosz return airmass
[docs]def check_value(var, flag): """ Check that a value is NaN, replace it with a given flag if True :param var: value to check against NaN :param flag: replacement value :return: processed value """ if np.isnan(var): var = flag return var
[docs]def load_station(name): """ Load the parameters corresponding to an observation station :param name: string, name of the station. The corresponding file must be located in :meth:`config` :return: station parameters """ module_name = "config.{}".format(name) station = importlib.import_module(module_name, package=None) return station
[docs]def time2hhmm(obstime): """ Concatenate a string HH MM SS or HH.MM.SS into an HHMM string :param obstime: string, HH MM SS or HH.MM.SS :return: HHMM string """ return (str(obstime).split(" ")[1]).split(".")[0][:-3]
""" def excelimport(filename, obsprogram=None): ''' Wrapper around openpyxl to transform excel sheets into POUET. .. info:: This feature is still experimental and completely spreadsheet dependent. Requirex openpyxl installed, otherwise exits. Import an excel catalog into a list of observables :param filename: string, path to the excel file :param obsprogram: string, observing program associated to the catalog you are importing. :return: list of observables .. warning:: I directly read the excel values, I do NOT evaluate the formulas in them. It is up to the user to put the right mjd in the excel sheets. ''' if noexcelimport: raise NotImplemented("Excel files cannot be imported at the moment - you need to install openpyxl") else: observables = [] #### For BEBOP if obsprogram == 'bebop': ''' special properties: phases : a list of dictionnaries : [{mjd, phase, hourafterstart }] comment : a string of comments (exptime, requested phase,...) internalobs : a boolean (0 or 1), allowing or not observability ''' try: wb = openpyxl.load_workbook(filename, data_only=True) # Read the excel spreadsheet, loading the values directly # ws = wb.active # choose active sheet ws = wb['Sheet1'] except: raise RuntimeError("Either %s does not exists, or it is not in .xlsx format !!" % filename) # Get tabler limits rows = ws.rows columns = ws.columns breakcolind = None breakrowind = None for ind, cell in enumerate(rows[1]): if cell.value == None: # breakcolind = cell.column breakcolind = rows[1][ind - 1].column break else: pass for ind, cell in enumerate(columns[0]): if cell.value == None: # breakrowind = cell.row breakrowind = columns[0][ind - 1].row break else: pass # Read only the non "None" data and put it in a table of dict, because fuck excel and fuck openpyxl. data = ws['A1':'%s%s' % (breakcolind, breakrowind)] ''' Structure of the spreadsheet: Infos are from A1 to W2 Datas are from A3 ro W30 B1 : actual modified julian date A : name B : target C : comment I : observability J : requested phase M1 to W1 : mjd over the night M2 to W2 : corresponding time after night start, in hours M to W : phases ''' phasesnames = ['M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W'] values = {} for indr, row in enumerate(data): for indc, cell in enumerate(row): try: values[cell.address] = cell.value # Apparently, this is an old version except: values[cell.coordinate] = cell.value for i in np.arange(3, 31): # create an observable object with the common properties name = values['A%s' % str(i)] coordinates = values['B%s' % str(i)] alpha = coordinates[0:2] + ':' + coordinates[2:4] + ':' + coordinates[4:6] delta = coordinates[7:9] + ':' + coordinates[9:11] + ':' + coordinates[11:13] if coordinates[6] == "S": delta = '-' + delta # add properties specific to this program ## Tricky stuff here : the jdb in the excel sheet is the mjd + 0.5. phases = [{'mjd': values['%c%i' % (col, 1)] - 0.5, 'hourafterstart': values['%c%i' % (col, 2)], 'phase': values['%c%i' % (col, i)]} for col in phasesnames] attributes = {'phases': phases} # observable.phases = phases if values['I%s' % str(i)] == 'yes': attributes['internalobs'] = 1 else: attributes['internalobs'] = 0 observable = obs.Observable(name=name, obsprogram=obsprogram, alpha=alpha, delta=delta, attributes=attributes) comment = '' if values['C%s' % str(i)] is not None: comment = comment + values['C%s' % str(i)] if values['J%s' % str(i)] != '/': comment = comment + '\n' + 'Requested phase: ' + values['J%s' % str(i)] if comment != '': observable.comment = comment observables.append(observable) # TODO: check that the modified julian date corresponds to the ongoing night # TODO: assert that the above structure is correct ! (use keywords in the Info fields...?) if obsprogram == "transit": pass if obsprogram == "superwasp": # http://openpyxl.readthedocs.org/en/latest/optimized.html --- that will be useful for Amaury's monstruous spreadsheet ''' special properties: phases : a list of dictionnaries : [{mjd, phase, hourafterstart }] comment : a string of comments (exptime, requested phase,...) internalobs : a boolean (0 or 1), allowing or not observability ''' logger.info('reading %s...' % filename) try: wb = openpyxl.load_workbook(filename, data_only=True) # Read the excel spreadsheet, loading the values directly ws = wb['Observations'] # choose active sheet except: raise RuntimeError("Either %s does not exists, or it is not in .xlsx format !!" % filename) logger.info('get tabler limits...') # Get tabler limits rows = ws.rows columns = ws.columns breakcolind = None breakrowind = None for ind, cell in enumerate(rows[1]): if cell.value == None: # breakcolind = cell.column breakcolind = rows[1][ind - 1].column break else: pass for ind, cell in enumerate(columns[0]): if cell.value == None: # breakrowind = cell.row breakrowind = columns[0][ind - 1].row break else: pass logger.info(breakrowind, breakcolind) sys.exit() # Read only the non "None" data and put it in a table of dict, because fuck excel and fuck openpyxl. data = ws['A1':'%s%s' % (breakcolind, breakrowind)] ''' Structure of the spreadsheet: Infos are from A1 to W2 Datas are from A3 ro W30 B1 : actual modified julian date A : name B : target C : comment I : observability J : requested phase M1 to W1 : mjd over the night M2 to W2 : corresponding time after night start, in hours M to W : phases ''' if obsprogram == "followup": pass return observables """