import sorunlib as run
from sorunlib._internal import check_response, stop_smurfs
def _get_direction():
"""Get the rotational direction ('cw' or 'ccw') of the HWP. The direction
is determined in part by the configuration of the HWP supervisor agent. For
details see the `HWP Supervisor agent docs <docs_>`_.
* 'cw' is clockwise as seen from the sky to window.
* 'ccw' is counter-clockwise as seen from the sky to window.
Returns:
str: The direction of the HWP, either 'cw' or 'ccw'.
Raises:
RuntimeError: If the direction is not either 'cw' or 'ccw'.
.. _docs: https://socs.readthedocs.io/en/main/agents/hwp_supervisor_agent.html
"""
hwp = run.CLIENTS['hwp']
resp = hwp.monitor.status()
direction = resp.session['data']['hwp_state']['direction']
if direction not in ['cw', 'ccw']:
raise RuntimeError("The HWP direction is unknown. Aborting...")
return direction
# Public API
[docs]
def set_freq(freq, timeout=None):
"""Set the rotational frequency of the HWP.
Args:
freq (float): Target frequency to rotate the HWP in Hz. This is a
*signed float*, the meaning of which depends on the OCS site
configuration. For details see the `documentation for the HWP
Supervisor Agent <docs_>`_.
timeout (float, optional): Duration, in seconds, to wait for the
operation to complete. An exception will be raised if this timeout
is exceeded.
.. _docs: https://socs.readthedocs.io/en/main/agents/hwp_supervisor_agent.html
"""
hwp = run.CLIENTS['hwp']
if timeout is None:
resp = hwp.pid_to_freq(target_freq=freq)
else:
hwp.pid_to_freq.start(target_freq=freq)
resp = hwp.pid_to_freq.wait(timeout=timeout)
check_response(hwp, resp)
[docs]
def spin_up(freq):
"""Spin up the HWP while streaming data.
Args:
freq (float): Target frequency to rotate the HWP in Hz. This is a
*signed float*, the meaning of which depends on the OCS site
configuration. For details see the `documentation for the HWP
Supervisor Agent <docs_>`_.
.. _docs: https://socs.readthedocs.io/en/main/agents/hwp_supervisor_agent.html
"""
hwp = run.CLIENTS['hwp']
try:
run.smurf.stream('on', subtype='cal', tag='hwp_spin_up')
resp = hwp.enable_driver_board()
check_response(hwp, resp)
run.hwp.set_freq(freq=freq, timeout=1800)
finally:
stop_smurfs()
[docs]
def spin_down(active=True, brake_voltage=None):
"""Spin down the HWP while streaming data.
Args:
active (bool, optional): If True, actively try to stop the HWP by
applying the brake. If False, simply turn off the PMX power and wait
for it to spin down on its own. Defaults to True.
brake_voltage (float, optional): Voltage used when actively stopping
the HWP. Only considered when active is True.
"""
hwp = run.CLIENTS['hwp']
try:
run.smurf.stream('on', subtype='cal', tag='hwp_spin_down')
run.hwp.stop(active=active,
brake_voltage=brake_voltage)
resp = hwp.disable_driver_board()
check_response(hwp, resp)
finally:
stop_smurfs()
[docs]
def stop(active=True, brake_voltage=None):
"""Stop the HWP.
Args:
active (bool, optional): If True, actively try to stop the HWP by
applying the brake. If False, simply turn off the PMX power and wait
for it to spin down on its own. Defaults to True.
brake_voltage (float, optional): Voltage used when actively stopping
the HWP. Only considered when active is True.
"""
hwp = run.CLIENTS['hwp']
print('Stopping HWP and waiting for it to spin down.')
if active:
if brake_voltage is None:
resp = hwp.brake()
else:
resp = hwp.brake(brake_voltage=brake_voltage)
check_response(hwp, resp)
else:
resp = hwp.pmx_off(wait_stop=True)
check_response(hwp, resp)