# Source code for zhinst.toolkit.driver.devices.quantum_system_hub

"""PQSC and QHub base Instrument Driver."""

import logging
import time
from typing import List, Optional, Union

from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.exceptions import ToolkitError


logger = logging.getLogger(__name__)


class QuantumSystemHub(BaseInstrument):
    """Base driver for the Zurich Instruments PQSC and QHub.

    This class should not be instantiated directly, but instead through
    PQSC or QHub.
    """

    def run(self, *, deep: bool = True) -> None:
        """Start sending out triggers.

        This method activates the trigger generation to trigger all connected
        instruments over ZSync ports.

        Args:
            deep: A flag that specifies if a synchronization should be
                performed between the device and the data server after
                enabling the PQSC/QHub (default: True).
        """
        self.execution.enable(True, deep=deep)

    def arm_and_run(
        self,
        *,
        repetitions: Optional[int] = None,
        holdoff: Optional[float] = None,
    ) -> None:
        """Arm the PQSC/QHub and start sending out triggers.

        Simply combines the methods arm and run. A synchronization is
        performed between the device and the data server after arming and
        running the PQSC/QHub.

        Args:
            repetitions: If specified, the number of triggers sent over ZSync
                ports will be set (default: None).
            holdoff: If specified, the time between repeated triggers sent
                over ZSync ports will be set. It has a minimum value and a
                granularity of 100 ns (default: None).
        """
        # NOTE(review): ``arm`` is not defined in this base class; presumably
        # it is provided by the concrete PQSC/QHub drivers - confirm.
        self.arm(deep=True, repetitions=repetitions, holdoff=holdoff)
        self.run(deep=True)

    def stop(self, *, deep: bool = True) -> None:
        """Stop the trigger generation.

        Args:
            deep: A flag that specifies if a synchronization should be
                performed between the device and the data server after
                disabling the PQSC/QHub (default: True).
        """
        self.execution.enable(False, deep=deep)

    def wait_done(self, *, timeout: float = 10.0, sleep_time: float = 0.005) -> None:
        """Wait until trigger generation and feedback processing is done.

        Args:
            timeout: The maximum waiting time in seconds for the PQSC/QHub
                (default: 10.0).
            sleep_time: Time in seconds to wait between requesting PQSC/QHub
                state (default: 0.005).

        Raises:
            TimeoutError: If the PQSC/QHub is not done sending out all
                triggers and processing feedback before the timeout.
        """
        try:
            self.execution.enable.wait_for_state_change(
                0, timeout=timeout, sleep_time=sleep_time
            )
        except TimeoutError as error:
            raise TimeoutError(f"{self.__class__.__name__:s} timed out.") from error

    def check_ref_clock(
        self, *, timeout: float = 30.0, sleep_time: float = 1.0
    ) -> bool:
        """Check if reference clock is locked successfully.

        Args:
            timeout: Maximum time in seconds the program waits
                (default: 30.0).
            sleep_time: Time in seconds to wait between requesting the
                reference clock status (default: 1.0).

        Returns:
            True if the reference clock status is 0 once it has left the
            state 2, False otherwise.

        Raises:
            TimeoutError: If the process of locking to the reference clock
                exceeds the specified timeout.
        """
        ref_clock_status = self.system.clocks.referenceclock.in_.status
        ref_clock = self.system.clocks.referenceclock.in_.source
        ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual
        try:
            # Wait until the status is anything but 2.
            ref_clock_status.wait_for_state_change(
                2, invert=True, timeout=timeout, sleep_time=sleep_time
            )
        except TimeoutError as error:
            raise TimeoutError(
                "Timeout during locking to reference clock signal"
            ) from error

        # Read the status once so both checks below see the same value.
        status = ref_clock_status()
        if status == 0:
            return True
        if status == 1 and ref_clock_actual() != ref_clock():
            # The requested source is not the one in use: fall back to the
            # internal reference clock.
            ref_clock("internal", deep=True)
            logger.error(
                "There was an error locking the device(%s) "
                "onto reference clock signal. Automatically switching to internal "
                "reference clock. Please try again.",
                self.serial,
            )
        return False

    def check_zsync_connection(
        self,
        inputs: Union[List[int], int, List[BaseInstrument], BaseInstrument],
        *,
        timeout: float = 10.0,
        sleep_time: float = 0.1,
    ) -> Union[List[bool], bool]:
        """Check if a ZSync connection is established.

        Checks the current status of the instrument connected to the given
        ports. If instrument(s) are given instead of port number(s), first
        finds the correct port number(s).

        Args:
            inputs: The port numbers to check the ZSync connection for.
                It can either be a single port number given as integer, a
                list of several port numbers, an instrument or a list of
                instruments.
            timeout: Maximum time in seconds the program waits. The budget is
                shared between all given inputs (default: 10.0).
            sleep_time: Time in seconds to wait between requesting the ZSync
                connection status (default: 0.1).

            .. versionchanged:: 0.6.1: Reduce default timeout and sleep_time.
            .. versionchanged:: 0.6.1: Raise an error if the port is in a
                faulty state, instead of return False.

        Returns:
            A list with one entry per input if a list was given, otherwise a
            single value. Every entry is True, since a port that does not
            reach the connected state raises instead.

        Raises:
            TimeoutError: If the process of establishing a ZSync connection
                on one of the specified ports exceeds the specified timeout.
        """
        inputs_list = inputs if isinstance(inputs, list) else [inputs]
        start_time = time.time()

        def remaining() -> float:
            """Return what is left of the overall timeout, never negative."""
            return max(0, timeout - (time.time() - start_time))

        # Check the status of all ports
        status = []
        for item in inputs_list:
            # Convert the instrument into a port, if needed
            if isinstance(item, BaseInstrument):
                port = self.find_zsync_worker_port(
                    item,
                    timeout=remaining(),
                    sleep_time=sleep_time,
                )
            else:
                port = item

            # Check or wait until the connection is ready
            status.append(
                self._check_zsync_connection(
                    port,
                    timeout=remaining(),
                    sleep_time=sleep_time,
                )
            )
        return status if isinstance(inputs, list) else status[0]

    def _check_zsync_connection(
        self, port: int, timeout: float, sleep_time: float
    ) -> bool:
        """Check if the ZSync connection on the given port is successful.

        This function checks the current status of the instrument connected
        to the given port.

        Args:
            port: Port number to check the ZSync connection for.
            timeout: Maximum time in seconds the program waits.
            sleep_time: Time in seconds to wait between requesting the status.

        Returns:
            True once the status node reports "connected" (2).

        Raises:
            TimeoutError: If the process of establishing a ZSync connection
                on the specified port exceeds the specified timeout.
        """
        status_node = self.zsyncs[port].connection.status
        try:
            # Waits until the status node is "connected" (2)
            status_node.wait_for_state_change(2, timeout=timeout, sleep_time=sleep_time)
        except TimeoutError as error:
            status = status_node()
            err_msg = (
                "Timeout while establishing ZSync connection to the instrument "
                f"on the port {port}."
            )
            # Append a hint for the last observed status; the leading space
            # keeps it separated from the sentence above.
            if status == 0:  # No connection
                err_msg += " No instrument detected."
            elif status == 1:  # In progress
                err_msg += (
                    " Connection still in progress. Consider increasing the timeout."
                )
            elif status == 3:  # Error
                err_msg += (
                    " Impossible to establish a connection. "
                    "Check cabling and FW version"
                )
            raise TimeoutError(err_msg) from error
        return True

    def find_zsync_worker_port(
        self,
        device: BaseInstrument,
        timeout: float = 10.0,
        sleep_time: float = 0.1,
    ) -> int:
        """Find the ID of the PQSC/QHub ZSync port connected to a given device.

        The function checks until the given timeout for the specified device
        to show up in the connection list. The list is always queried at
        least once, even if the timeout is zero.

        Args:
            device: device for which the connected ZSync port shall be found.
            timeout: Maximum time in seconds the program waits
                (default: 10.0).
            sleep_time: Time in seconds to wait between requesting the port
                serials list (default: 0.1).

            .. versionchanged:: 0.6.1: Added timeout and sleep_time parameters.

        Returns:
            Index of the searched PQSC/QHub ZSync port.

        Raises:
            ToolkitError: If the given device doesn't appear to be connected
                to the PQSC/QHub via ZSync.

        .. versionadded:: 0.5.1
        """
        # Drop the first three characters of the serial
        # (presumably the "dev" prefix - TODO confirm).
        device_serial = device.serial[3:]

        start = time.time()
        while True:
            node_to_serial_dict = self.zsyncs["*"].connection.serial()
            if device_serial in node_to_serial_dict.values():
                break
            # Only give up after having looked at least once, so that an
            # exhausted time budget does not hide an established connection.
            if time.time() - start >= timeout:
                raise ToolkitError(
                    f"No ZSync connection found between {self.__class__.__name__:s} "
                    f"{self.serial:s} and the device {device.serial:s}."
                )
            time.sleep(sleep_time)

        # Get the node of the ZSync connected to the device
        # (will have the form "/devXXXX/zsyncs/N/connection/serial")
        serial_to_node_dict = {
            serial: node for node, serial in node_to_serial_dict.items()
        }
        device_zsync_node = serial_to_node_dict[device_serial]
        # Just interested in knowing N: split in
        # ['', 'devXXXX', 'zsyncs', 'N', 'connection', 'serial']
        # and take fourth value
        return int(device_zsync_node.split("/")[3])