Source code for sorunlib.seq

import datetime as dt
import time

import sorunlib as run

from sorunlib.commands import _timestamp_to_utc_datetime
from sorunlib._internal import check_response, check_started, monitor_process, protect_shutdown, stop_smurfs


OP_TIMEOUT = 60


@protect_shutdown
def _stop_scan():
    """Stop the SMuRF streams and halt the ACU ``generate_scan`` process.

    Calls ``stop_smurfs()``, requests a stop of ``generate_scan`` on the ACU
    client, waits up to ``OP_TIMEOUT`` seconds for it to finish, and passes
    the resulting response to ``check_response``.

    """
    # Look the client up first so a missing client fails before anything
    # else is attempted.
    acu_client = run.CLIENTS['acu']

    print("Stopping scan.")
    stop_smurfs()

    # Request that telescope motion stops, then block until it has.
    acu_client.generate_scan.stop()
    print("Waiting for telescope motion to stop.")
    check_response(
        acu_client,
        acu_client.generate_scan.wait(timeout=OP_TIMEOUT),
    )
    print("Scan finished.")


def scan(description, stop_time, width, az_drift=0, scan_type=1, el_amp=None,
         tag=None, subtype=None, min_duration=None, **kwargs):
    """Run a constant elevation scan, collecting detector data.

    The function returns without doing anything if ``stop_time`` has already
    passed, or if ``min_duration`` is given and less than that much time
    remains before ``stop_time``. Once the scan sequence has been entered,
    ``_stop_scan()`` is always called on the way out, whether or not an error
    occurred.

    Args:
        description (str): Description of the field/object being scanned.
            Not used by the body of this function.
        stop_time (str): Time in ISO format to scan until, i.e.
            "2022-06-21T15:58:00"
        width (float): Scan width in azimuth. The scan will start at the
            current position and move in the positive azimuth direction.
        az_drift (float): Drift velocity in deg/s, causing scan extrema to
            move accordingly.
        scan_type (int): Scan type. Possible values are 1, 2, or 3.
        el_amp (float): For type 3 scans, the amplitude (half peak-to-peak)
            for the elevation oscillation, in degrees. Must be specified if
            scan_type=3 (but can be zero).
        tag (str, optional): Tag or comma-separated list of tags to attach to
            the operation. Passed through to the smurf stream command.
        subtype (str, optional): Operation subtype used to tag the stream.
        min_duration (float, optional): Minimum duration required to scan,
            specified in seconds. If not enough time exists between now and
            the ``stop_time`` the scan is not executed. Defaults to None.
        **kwargs: Any additional arguments are passed through to
            generate_scan.

    Raises:
        AssertionError: If ``scan_type`` is 3 and ``el_amp`` is None.

    """
    now = dt.datetime.now(dt.timezone.utc)
    scan_stop = _timestamp_to_utc_datetime(stop_time)

    # Check stop time has not already passed
    if now > scan_stop:
        return

    # Check there is enough time to perform scan
    if min_duration is not None:
        start_by_time = scan_stop - dt.timedelta(seconds=min_duration)
        if now > start_by_time:
            return

    # It is an error to not declare el_amp when you specify type 3 scan.
    # Raised explicitly instead of using an ``assert`` statement so the check
    # is not stripped when Python runs with -O; the exception type is kept as
    # AssertionError for backward compatibility with existing callers.
    if scan_type == 3 and el_amp is None:
        raise AssertionError("el_amp must be specified when scan_type=3")

    acu = run.CLIENTS['acu']

    try:
        # Enable SMuRF streams
        run.smurf.stream('on', subtype=subtype, tag=tag)

        # Grab current telescope position
        resp = acu.monitor.status()
        az = resp.session['data']['StatusDetailed']['Azimuth current position']
        el = resp.session['data']['StatusDetailed']['Elevation current position']

        # Type 3 scans oscillate in elevation about the current position;
        # all other types hold the current elevation.
        if scan_type == 3:
            el1 = el - el_amp
            el2 = el + el_amp
        else:
            el1 = el2 = el

        # Start telescope motion
        # az_speed and az_accel assumed from ACU defaults
        # Can be modified by acu.set_scan_params()
        resp = acu.generate_scan.start(az_endpoint1=az,
                                       az_endpoint2=az + width,
                                       el_endpoint1=el1,
                                       el_endpoint2=el2,
                                       el_speed=0,
                                       az_drift=az_drift,
                                       scan_type=scan_type,
                                       **kwargs)
        check_started(acu, resp)

        # Wait until stop time
        monitor_process(acu, 'generate_scan', stop_time)
    finally:
        _stop_scan()
def el_nod(el1, el2, num=5, pause=5):
    """Perform a set of elevation nods.

    Elevation nods will be performed at the current azimuth, and will start
    from and return to the current elevation. The nod first moves to ``el1``,
    pauses, then moves to ``el2``, pauses, and then repeats for the specified
    number of iterations.

    SMuRF streams are turned on before the nods and ``stop_smurfs()`` is
    always called on the way out, whether or not an error occurred.

    Args:
        el1 (float): First elevation to move to during the nod.
        el2 (float): Second elevation to move to during the nod.
        num (int): Number of nods to perform. Defaults to 5.
        pause (float): Length of pause, in seconds, at each elevation.
            Defaults to 5 seconds.

    """
    acu = run.CLIENTS['acu']

    try:
        # Enable SMuRF streams
        run.smurf.stream('on', subtype='cal', tag='el_nods')

        # Grab current telescope position
        resp = acu.monitor.status()
        init_az = resp.session['data']['StatusDetailed']['Azimuth current position']
        init_el = resp.session['data']['StatusDetailed']['Elevation current position']

        # Perform nods
        for _ in range(num):
            run.acu.move_to(az=init_az, el=el1)
            time.sleep(pause)
            run.acu.move_to(az=init_az, el=el2)
            time.sleep(pause)

        # Return to initial position. This runs whenever the loop completes
        # (including zero iterations); if a move raises, it is skipped and
        # only the stream shutdown below happens.
        run.acu.move_to(az=init_az, el=init_el)
    finally:
        stop_smurfs()