Source code for zhinst.toolkit.driver.devices.pqsc

"""PQSC Instrument Driver."""

import logging
import time
from typing import List, Union

from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.exceptions import ToolkitError


logger = logging.getLogger(__name__)


[docs]class PQSC(BaseInstrument): """High-level driver for the Zurich Instruments PQSC."""
[docs] def arm(self, *, deep=True, repetitions: int = None, holdoff: float = None) -> None: """Prepare PQSC for triggering the instruments. This method configures the execution engine of the PQSC and clears the register bank. Optionally, the *number of triggers* and *hold-off time* can be set when specified as keyword arguments. If they are not specified, they are not changed. Note that the PQSC is disabled at the end of the hold-off time after sending out the last trigger. Therefore, the hold-off time should be long enough such that the PQSC is still enabled when the feedback arrives. Otherwise, the feedback cannot be processed. Args: deep: A flag that specifies if a synchronization should be performed between the device and the data server after stopping the PQSC and clearing the register bank (default: True). repetitions: If specified, the number of triggers sent over ZSync ports will be set (default: None). holdoff: If specified, the time between repeated triggers sent over ZSync ports will be set. It has a minimum value and a granularity of 100 ns (default: None). """ # Stop the PQSC if it is already running self.stop(deep=deep) if repetitions is not None: self.execution.repetitions(repetitions) if holdoff is not None: self.execution.holdoff(holdoff) # Clear register bank self.feedback.registerbank.reset(1, deep=deep)
[docs] def run(self, *, deep: bool = True) -> None: """Start sending out triggers. This method activates the trigger generation to trigger all connected instruments over ZSync ports. Args: deep: A flag that specifies if a synchronization should be performed between the device and the data server after enabling the PQSC (default: True). """ self.execution.enable(True, deep=deep)
[docs] def arm_and_run(self, *, repetitions: int = None, holdoff: float = None) -> None: """Arm the PQSC and start sending out triggers. Simply combines the methods arm and run. A synchronization is performed between the device and the data server after arming and running the PQSC. Args: repetitions: If specified, the number of triggers sent over ZSync ports will be set (default: None). holdoff: If specified, the time between repeated triggers sent over ZSync ports will be set. It has a minimum value and a granularity of 100 ns (default: None). """ self.arm(deep=True, repetitions=repetitions, holdoff=holdoff) self.run(deep=True)
[docs] def stop(self, *, deep: bool = True) -> None: """Stop the trigger generation. Args: deep: A flag that specifies if a synchronization should be performed between the device and the data server after disabling the PQSC (default: True). """ self.execution.enable(False, deep=deep)
[docs] def wait_done(self, *, timeout: float = 10, sleep_time: float = 0.005) -> None: """Wait until trigger generation and feedback processing is done. Args: timeout: The maximum waiting time in seconds for the PQSC (default: 10). sleep_time: Time in seconds to wait between requesting PQSC state Raises: TimeoutError: If the PQSC is not done sending out all triggers and processing feedback before the timeout. """ try: self.execution.enable.wait_for_state_change( 0, timeout=timeout, sleep_time=sleep_time ) except TimeoutError as error: raise TimeoutError("PQSC timed out.") from error
[docs] def check_ref_clock(self, *, timeout: int = 30, sleep_time: int = 1) -> bool: """Check if reference clock is locked successfully. Args: timeout: Maximum time in seconds the program waits (default: 30). sleep_time: Time in seconds to wait between requesting the reference clock status (default: 1) Raises: TimeoutError: If the process of locking to the reference clock exceeds the specified timeout. """ ref_clock_status = self.system.clocks.referenceclock.in_.status ref_clock = self.system.clocks.referenceclock.in_.source ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual try: ref_clock_status.wait_for_state_change( 2, invert=True, timeout=timeout, sleep_time=sleep_time ) except TimeoutError as error: raise TimeoutError( "Timeout during locking to reference clock signal" ) from error if ref_clock_status() == 0: return True if ref_clock_status() == 1 and ref_clock_actual() != ref_clock(): ref_clock("internal", deep=True) logger.error( f"There was an error locking the device({self.serial}) " f"onto reference clock signal. Automatically switching to internal " f"reference clock. Please try again." ) return False
[docs] def check_zsync_connection( self, ports: Union[List[int], int] = 0, *, timeout: int = 30, sleep_time: int = 1, ) -> Union[List[bool], bool]: """Check if the ZSync connection on the given port(s) is established. This function checks the current status of the instrument connected to the given ports. Args: ports: The port numbers to check the ZSync connection for. It can either be a single port number given as integer or a list of several port numbers. (default: 0) timeout: Maximum time in seconds the program waits (default: 30). sleep_time: Time in seconds to wait between requesting the reference clock status (default: 1) Raises: TimeoutError: If the process of establishing a ZSync connection on one of the specified ports exceeds the specified timeout. """ ports_list = ports if isinstance(ports, list) else [ports] start_time = time.time() status = [] for port in ports_list: status.append( self._check_zsync_connection( port, timeout=max(0, timeout - (time.time() - start_time)), sleep_time=sleep_time, ) ) return status if isinstance(ports, list) else status[0]
def _check_zsync_connection( self, port: int, timeout: float, sleep_time: float ) -> bool: """Check if the ZSync connection on the given port is successful. This function checks the current status of the instrument connected to the given port. Args: ports: Port number to check the ZSync connection for. timeout: Maximum time in seconds the program waits (default: 30). sleep_time: Time in seconds to wait between requesting the status (default: 1) Raises: TimeoutError: If the process of establishing a ZSync connection the specified port exceeds the specified timeout. """ status_node = self.zsyncs[port].connection.status start_time = time.time() try: status_node.wait_for_state_change( 0, invert=True, timeout=timeout, sleep_time=sleep_time ) status_node.wait_for_state_change( 1, invert=True, timeout=max(0, timeout - (time.time() - start_time)), sleep_time=sleep_time, ) except TimeoutError as error: raise TimeoutError( "Timeout while establishing ZSync connection to the instrument " f"on the port {port}." ) from error return status_node() == 2
[docs] def find_zsync_worker_port(self, device: BaseInstrument) -> int: """Find the ID of the PQSC ZSync port connected to a given device. Args: pqsc: PQSC device over whose ports the research shall be done. device: device for which the connected ZSync port shall be found. Returns: Integer value represent the ID of the searched PQSC Zsync port. Raises: ToolkitError: If the given device doesn't appear to be connected to the PQSC via ZSync. .. versionadded:: 0.5.1 """ device_serial = device.serial[3:] node_to_serial_dict = self.zsyncs["*"].connection.serial() serial_to_node_dict = { serial: node for node, serial in node_to_serial_dict.items() } # Get the node of the ZSync connected to the device # (will have the form "/devXXXX/zsyncs/N/connection/serial") try: device_zsync_node = serial_to_node_dict[device_serial] except KeyError: raise ToolkitError( "No ZSync connection found between the PQSC " f"{self.serial} and the device {device.serial}." ) # Just interested in knowing N: split in # ['', 'devXXXX', 'zsyncs', 'N', 'connection', 'serial'] # and take fourth value return int(device_zsync_node.split("/")[3])