Source code for recipe_system.adcc.servers.http_proxy

#
#                                                                        DRAGONS
#
#                                                                  http_proxy.py
# ------------------------------------------------------------------------------
import os
import json
import time
import select
import datetime

import urllib.error
import urllib.parse
import urllib.request

from socketserver import ThreadingMixIn
from http.server import BaseHTTPRequestHandler, HTTPServer

from recipe_system.cal_service import calurl_dict
# ------------------------------------------------------------------------------
# HTTP messaging, Global bits for logging
RECMSG = "Received {} events."
REQMSG = "Requesting current OP day events "
FAILMSG = "Failed to access Fitsstore. No metrics available."
ILLREQ = "Illegal request: No 'timestamp' parameter."
msg_form = '"%s" %s %s'
info_code = 203
fail_code = 416
no_access_code = 503
size = "-"

# ------------------------------------------------------------------------------
[docs] def parsepath(path): """ parsepath w/ urlparse. parameters: <string> return: <dict> """ rparms = {} parsed_url = urllib.parse.urlparse(path) rparms.update({"path": parsed_url.path}) rparms.update({"query": parsed_url.query}) rparms.update(urllib.parse.parse_qs(parsed_url.query)) return rparms
# ------------------------------------------------------------------------------ # Timing functions
[docs] def server_time(): """ Return a dictionary of server timing quantities related to current time. This dict will be returned to a call on the server, /rqsite.json (See do_GET() method of ADCCHandler class. parameters: <void> return: <dict>, dictionary of time now values. """ lt_now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") unxtime = time.time() utc_now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f") utc_offset = datetime.datetime.utcnow() - datetime.datetime.now() if utc_offset.days != 0: utc_offset = -utc_offset utc_offset = -int(round(utc_offset.seconds / 3600.)) else: utc_offset = int(round(utc_offset.seconds / 3600.)) timezone = time.timezone // 3600 if timezone == 10: local_site = 'gemini-north' elif timezone in [3, 4]: # TZ -4 but +1hr DST applied inconsistently local_site = 'gemini-south' else: local_site = 'remote' time_dict = {"local_site": local_site, "tzname" : time.tzname[0], "lt_now" : lt_now, "unxtime" : unxtime, "utc_now" : utc_now, "utc_offset": utc_offset} return time_dict
[docs] def stamp_to_ymd(timestamp): """ Caller sends a timestamp in seconds of epoch. Return string for year month day of that time as YYYYMMDD' as used by url requests, as in http://<fitsstore_server>/qaforgui/20130616 parameters: <float>, seconds of epochs. return: <string>, YYYYMMDD of passed time. """ return time.strftime("%Y%m%d", time.localtime(timestamp))
[docs] def stamp_to_opday(timestamp): """ Converts a passed time stamp (sec) into the corresponding operational day. I.e. timestamps >= 14.00h are the next operational day. parameters: <float>, time in epoch seconds return: <string>, YYYYMMDD """ dt_object = datetime.datetime.fromtimestamp(timestamp) if dt_object.hour >= 14: timestamp = timestamp + 86400 return stamp_to_ymd(timestamp)
[docs] def ymd_to_stamp(yy, mm, dd, hh=0): """ Caller passes integers for year, month, and day. Return is the epoch time (sec). Year is 4 digit, eg., 2013 parameters: <int>, <int>, <int> [, <int>] Year, Month, Day [,Hour] return: <float>, epoch time in seconds. """ ymd = "{} {} {} {}".format(yy, mm, dd, hh) return time.mktime(time.strptime(ymd, "%Y %m %d %H"))
[docs] def current_op_timestamp(): """ Return the epoch time (sec) of the start of current operational day, where turnover occurs @ 14.00h localtime. I.e. if the hour >= 14.00, then the current operational day is tomorrow. Eg., 2013-08-02 17.00h is 20130803 parameters: <void> return: <float> """ hh = 14 tnow = datetime.datetime.now() t_epoch = time.time() if tnow.hour >= 14.0: op_day = stamp_to_ymd(t_epoch) else: op_day = stamp_to_ymd(t_epoch - 86400) yy, mm, dd = op_day[:4], op_day[4:6], op_day[6:] timestamp = ymd_to_stamp(yy, mm, dd, hh) return timestamp
# END Timing functions # ------------------------------------------------------------------------------ # FITS Store query.
[docs] def fstore_get(timestamp): """ Open a url on fitsstore/qaforgui/ with the passed timestamp. timestamp is in epoch seconds, which is converted here to a YMD string for the URL. Return a list of dicts of qa metrics data. Exceptions on urlopen() ----------------------- Any number of exceptions may be thrown on URL access: URLError, HTTPError, TimeoutError, ... . We don't really care which specific failure occurred, only that QA metrics are not acessible. Here, we catch all failures and simply pass, returning a empty list. N.B. -- A timestamp that evaluates to False will request everything from fitsstore. This could be huge. Be careful passing no timestamp! Parameters ---------- timestamp : <float>, time in epoch seconds Return ------ qa_data : <list>, list of dicts (json) of qametrics """ qa_data = list() # Get the fitsstore query url from calurl_dict fitsstore_qa = calurl_dict.calurl_dict['QAQUERYURL'] date_query = stamp_to_opday(timestamp) furl = os.path.join(fitsstore_qa, date_query) try: store_handle = urllib.request.urlopen(furl) qa_data = json.loads(store_handle.read()) except Exception: print(msg_form %(FAILMSG, no_access_code, size)) return qa_data
[docs] def specview_get(jfile): # hack to just open the test data file, data.json with open(jfile) as json_file: jdata = json.load(json_file) return jdata
# ------------------------------------------------------------------------------
[docs] class ADCCHandler(BaseHTTPRequestHandler): """ ADCC services request handler. """ events = None informers = None spec_events = None
[docs] def address_string(self): host, port = self.client_address[:2] return host
[docs] def log_request(self, code='-', size='-'): """Log an accepted request. This is called by send_response(). This overrides BaseHTTPRequestHandler.log_request. See that class for what the method does normally. """ msg_form = '"%s" %s %s' try: assert self.informers["verbose"] self.log_message(msg_form, repr(self.requestline), code, size) except AssertionError: if "cmdqueue.json" in self.requestline: pass else: self.log_message(msg_form, repr(self.requestline), code, size) return
[docs] def do_GET(self): """ Defined services on HTTP GET requests. """ events = self.informers["events"] spec_events = self.informers["spec_events"] self.informers["verbose"] = True dark_theme = self.informers['dark'] parms = parsepath(self.path) try: # First test for an html request on the QAP nighttime_metrics page. # I.e. <localhost>:<port>/qap/nighttime_metrics.html if self.path.startswith("/qap/"): dirname = os.path.dirname(__file__) if dark_theme: joinlist = [dirname, "../client/adcc_faceplate_dark/"] else: joinlist = [dirname, "../client/adcc_faceplate/"] # Split out any parameters in the URL self.path = self.path.split("?")[0] #append any further directory info. joinlist.append(self.path.split('qap/')[-1]) fname = os.path.join(*joinlist) self.log_message('{} {} {}'.format("Loading "+joinlist[1]+ os.path.basename(fname), 203, '-')) try: with open(fname, 'rb') as f: data = f.read() except OSError: data = bytes(b"<b>NO SUCH RESOURCE AVAILABLE</b>") self.send_response(200) if self.path.endswith(".js"): self.send_header('Content-type', 'text/javascript') elif self.path.endswith(".css"): self.send_header("Content-type", "text/css") elif fname.endswith(".png"): self.send_header('Content-type', "image/png") else: self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(data) return # ------------------------------------------------------------------ # The vast majority of HTTP client GET requests will be on the # cmdqueue service. Handle first. if parms["path"].startswith("/cmdqueue.json"): self._handle_cmdqueue_json(events, parms) # ------------------------------------------------------------------ # HTTP client GET requests on QAP Spectra (/qapspec/) Service. if parms["path"].startswith("/qapspec/"): dirname = os.path.dirname(__file__) joinlist = [dirname, "../client/qap_specviewer/"] # Split out any parameters in the URL self.path = self.path.split("?")[0] # Append any further directory info. joinlist.append(self.path.split('qapspec/')[-1]) fname = os.path.join(*joinlist) self.log_message('{} {} {}'.format("Loading "+joinlist[1]+ os.path.basename(fname), 203, '-')) #try: with open(fname, 'rb') as f: data = f.read() #except IOError: #data = bytes("<b>NO SUCH RESOURCE AVAILABLE</b>".encode('utf-8')) self.send_response(200) if self.path.endswith(".js"): self.send_header('Content-type', 'text/javascript') elif self.path.endswith(".css"): self.send_header("Content-type", "text/css") elif fname.endswith(".png"): self.send_header('Content-type', "image/png") elif fname.endswith(".gif"): self.send_header('Content-type', "image/gif") else: self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(data) return # ------------------------------------------------------------------ # GET requests on the specviewer service. if parms["path"].startswith("/specqueue.json"): self._handle_specqueue_json(spec_events, parms) return # ------------------------------------------------------------------ # Server time # Queried by metrics client elif parms["path"].startswith("/rqsite.json"): self.send_response(200) self.send_header('Content-type', "application/json") self.end_headers() tdic = server_time() self.wfile.write( bytes(json.dumps(tdic, sort_keys=True, indent=4).encode('utf-8')) ) # ------------------------------------------------------------------ # Queried by metrics client elif parms["path"].startswith("/rqlog.json"): self.send_response(200) self.send_header('Content-type', "application/json") self.end_headers() if "file" in parms: logfile = parms["file"][0] if not os.path.exists(logfile): msg = "Log file not available" else: f = open(logfile) msg = f.read() f.close() else: msg = "No log file available" tdic = {"log": msg} self.wfile.write( bytes(json.dumps(tdic, sort_keys=True, indent=4).encode('utf-8')) ) except OSError: self.send_error(404, 'File Not Found: {}'.format(self.path)) raise return
[docs] def do_POST(self): """ Here, HTTP POST requests are farmed out to either metrics events on the event_report/ service, or to spec_events on the spec_report/ service. """ mform = '"%s" %s %s' info_code = 203 size = "-" events = self.informers["events"] spec_events = self.informers["spec_events"] parms = parsepath(self.path) vlen = int(self.headers["Content-Length"]) pdict = self.rfile.read(vlen) # for QAP metrics ... if parms["path"].startswith("/event_report"): self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() aevent = json.loads(pdict) events.append_event(aevent) self.log_message(mform, "Appended event", info_code, size) self.log_message(mform, repr(aevent), info_code, size) elif parms["path"].startswith("/spec_report"): self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.log_message(mform, "ADCC recieved new event", info_code, size) spec_events.clear_list() aevent = json.loads(pdict) spec_events.append_event(aevent) self.log_message('"%s" %s %s', "Appended event", info_code, size) self.log_message('"%s" %s %s', repr(aevent), info_code, size) return
# ------------------------------------------------------------------------- # privitized handling cmdqueue.json requests def _handle_cmdqueue_json(self, events, parms): """ Handle HTTP client GET requests on service: cmdqueue.json """ verbosity = self.informers["verbose"] self.send_response(200) self.send_header('Content-type', "application/json") self.end_headers() # N.B. A timestamp of zero will request *everything* from fitsstore # This could be huge. Be careful passing GET request on cmdqueue.json # with no timestamp. try: fromtime = float(parms["timestamp"][0]) except KeyError: self.log_message(msg_form, ILLREQ, info_code, size) # event_list = [] implies new adcc. Request current OP day from fits. if not events.event_list: self.log_message(msg_form, "No extant events.", info_code, size) self.log_message(msg_form, REQMSG+"@fitsstore", info_code, size) events.event_list = fstore_get(current_op_timestamp()) tdic = events.get_list() self.log_message(msg_form, RECMSG.format(len(tdic)), info_code, size) tdic.insert(0, {"msgtype": "cmdqueue.request", "timestamp": time.time()}) tdic.append({"msgtype": "cmdqueue.request", "timestamp": time.time()}) self.wfile.write( bytes(json.dumps(tdic, sort_keys=True, indent=4).encode('utf-8')) ) # Handle current nighttime requests ... elif stamp_to_opday(fromtime) == stamp_to_opday(current_op_timestamp()): if verbosity: self.log_message(msg_form, REQMSG+stamp_to_opday(fromtime), info_code, size) tdic = events.get_list(fromtime=fromtime) tdic.insert(0, {"msgtype":"cmdqueue.request", "timestamp": time.time()}) self.wfile.write( bytes(json.dumps(tdic, sort_keys=True, indent=4).encode('utf-8')) ) # Handle previous day requests elif fromtime < current_op_timestamp(): if verbosity: self.log_message(msg_form, "Requested metrics on ... " + stamp_to_opday(fromtime), info_code, size) tdic = fstore_get(fromtime) if verbosity: self.log_message(msg_form, "Received " + str(len(tdic)) + " events from fitsstore.", info_code, size) # Append the last timestamp from the event_list. This is done # to trigger the client to pinging the adcc from the last # recorded event. tdic.insert(0, {"msgtype": "cmdqueue.request", "timestamp": time.time()}) tdic.append({"msgtype": "cmdqueue.request", "timestamp": time.time()}) self.wfile.write( bytes(json.dumps(tdic, sort_keys=True, indent=4).encode('utf-8')) ) # Cannot handle the future ... else: self.log_message(msg_form, "Invalid timestamp received.", fail_code, size) self.log_message(msg_form, "Future events not known.", fail_code, size) return # ------------------------------------------------------------------------- # privitized handling specview.json requests def _handle_specqueue_json(self, spec_events, parms): """ Handle HTTP client GET requests on service: specqueue.json """ specdic = list() verbosity = self.informers["verbose"] self.send_response(200) self.send_header('Content-type', "application/json") self.end_headers() if not spec_events.get_list(): self.log_message(msg_form, "No quicklook spectra.", info_code, size) specdic.insert(0, {"msgtype":"specqueue.request", "timestamp": time.time()}) specdic.append({"msgtype": "specqueue.request", "timestamp": time.time()}) self.wfile.write( bytes(json.dumps(specdic, sort_keys=True, indent=4).encode('utf-8'))) return splist = spec_events.get_list() specdic.extend(splist) specdic.insert(0, {"msgtype": "specqueue.request", "timestamp": time.time()}) specdic.append({"msgtype": "specqueue.request", "timestamp": time.time()}) print() print("Sending specdic object ...") self.wfile.write( bytes(json.dumps(specdic, sort_keys=True, indent=4).encode('utf-8')) ) #spec_events.clear_list() return
[docs] class MTHTTPServer(ThreadingMixIn, HTTPServer): """Handles requests using threads"""
[docs] def startInterfaceServer(*args, **informers): import socket run_event = args[0] port = informers['port'] ADCCHandler.informers = informers findingPort = True while findingPort: try: print("Starting HTTP server on port %s ... " % str(port)) server = MTHTTPServer(('', port), ADCCHandler) findingPort = False except OSError: print("failed, port taken") port += 1 print("Started HTTP server on port %s" % str(port)) print("Serving metrics on:\n\t /qap/nighttime_metrics.html") print("Serving 1D spectra on:\n\t/qapspec/specviewer.html") while run_event.is_set(): r, w, x = select.select([server.socket], [], [], .5) if r: server.handle_request() print("http_proxy: received signal 'clear'. Shutting down proxy server ...") server.socket.close() return
main = startInterfaceServer