Source code for zhinst.utils.shfqa.shfqa

"""Zurich Instruments LabOne Python API Utility functions for SHFQA."""

import time

import numpy as np
from zhinst.utils.utils import wait_for_state_change
from zhinst.ziPython import AwgModule, ziDAQServer, compile_seqc

SHFQA_MAX_SIGNAL_GENERATOR_WAVEFORM_LENGTH = 4 * 2**10
SHFQA_MAX_SIGNAL_GENERATOR_CARRIER_COUNT = 16
SHFQA_SAMPLING_FREQUENCY = 2e9


[docs]def max_qubits_per_channel(daq: ziDAQServer, device_id: str) -> int: """Returns the maximum number of supported qubits per channel. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. """ return len(daq.listNodes(f"/{device_id}/qachannels/0/readout/integration/weights"))
[docs]def load_sequencer_program( daq: ziDAQServer, device_id: str, channel_index: int, sequencer_program: str, *, awg_module: AwgModule = None, timeout: float = 10, ) -> None: """Compiles and loads a program to a specified sequencer. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying to which sequencer the program below is uploaded - there is one sequencer per channel. sequencer_program: Sequencer program to be uploaded. awg_module: The standalone AWG compiler is used instead. .. deprecated:: 22.08 timeout: Maximum time to wait for the compilation on the device in seconds. """ # start by resetting the sequencer daq.syncSetInt( f"/{device_id}/qachannels/{channel_index}/generator/reset", 1, ) wait_for_state_change( daq, f"/{device_id}/qachannels/{channel_index}/generator/ready", 0, timeout=timeout, ) device_type = daq.getString(f"/{device_id}/features/devtype") device_options = daq.getString(f"/{device_id}/features/options") elf, _ = compile_seqc( sequencer_program, device_type, device_options, channel_index, sequencer="qa" ) daq.setVector(f"/{device_id}/qachannels/{channel_index}/generator/elf/data", elf) # wait until the device becomes ready after program upload wait_for_state_change( daq, f"/{device_id}/qachannels/{channel_index}/generator/ready", 1, timeout=timeout, )
[docs]def configure_scope( daq: ziDAQServer, device_id: str, *, input_select: dict, num_samples: int, trigger_input: str, num_segments: int = 1, num_averages: int = 1, trigger_delay: float = 0.0, ) -> None: """Configures the scope for a measurement. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. input_select: Keys (int) map a specific scope channel with a signal source (str), e.g. "channel0_signal_input". For a list of available values use daq.help(f"/{device_id}/scopes/0/channels/0/inputselect"). num_samples: Number of samples in the scope shot. trigger_input: Specifies the trigger source of the scope acquisition - if set to None, the self-triggering mode of the scope becomes active, which is useful e.g. for the GUI. For a list of available trigger values use daq.help(f"/{device_id}/scopes/0/trigger/channel"). num_segments: Number of distinct scope shots to be returned after ending the acquisition. num_averages: Specifies how many times each segment should be averaged on hardware; to finish a scope acquisition, the number of issued triggers must be equal to num_segments * num_averages. trigger_delay: Delay in samples specifying the time between the start of data acquisition and reception of a trigger. """ scope_path = f"/{device_id}/scopes/0/" settings = [] settings.append((scope_path + "segments/count", num_segments)) if num_segments > 1: settings.append((scope_path + "segments/enable", 1)) else: settings.append((scope_path + "segments/enable", 0)) if num_averages > 1: settings.append((scope_path + "averaging/enable", 1)) else: settings.append((scope_path + "averaging/enable", 0)) settings.append((scope_path + "averaging/count", num_averages)) settings.append((scope_path + "channels/*/enable", 0)) for channel, selected_input in input_select.items(): settings.append( (scope_path + f"channels/{channel}/inputselect", selected_input) ) settings.append((scope_path + f"channels/{channel}/enable", 1)) settings.append((scope_path + "trigger/delay", trigger_delay)) if trigger_input is not None: settings.append((scope_path + "trigger/channel", trigger_input)) settings.append((scope_path + "trigger/enable", 1)) else: settings.append((scope_path + "trigger/enable", 0)) settings.append((scope_path + "length", num_samples)) daq.set(settings)
[docs]def get_scope_data(daq: ziDAQServer, device_id: str, *, timeout: float = 1.0) -> tuple: """Queries the scope for data once it is finished. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. timeout: Maximum time to wait for the scope data in seconds. Returns: Three-element tuple with: * recorded_data (array): Contains an array per scope channel with the recorded data. * recorded_data_range (array): Full scale range of each scope channel. * scope_time (array): Relative acquisition time for each point in recorded_data in seconds starting from 0. """ # wait until scope has been triggered wait_for_state_change(daq, f"/{device_id}/scopes/0/enable", 0, timeout=timeout) # read and post-process the recorded data recorded_data = [[], [], [], []] recorded_data_range = [0.0, 0.0, 0.0, 0.0] num_bits_of_adc = 14 max_adc_range = 2 ** (num_bits_of_adc - 1) channels = range(4) for channel in channels: if daq.getInt(f"/{device_id}/scopes/0/channels/{channel}/enable"): path = f"/{device_id}/scopes/0/channels/{channel}/wave" data = daq.get(path.lower(), flat=True) vector = data[path] recorded_data[channel] = vector[0]["vector"] averagecount = vector[0]["properties"]["averagecount"] scaling = vector[0]["properties"]["scaling"] voltage_per_lsb = scaling * averagecount recorded_data_range[channel] = voltage_per_lsb * max_adc_range # generate the time base scope_time = [[], [], [], []] decimation_rate = 2 ** daq.getInt(f"/{device_id}/scopes/0/time") sampling_rate = SHFQA_SAMPLING_FREQUENCY / decimation_rate # [Hz] for channel in channels: scope_time[channel] = ( np.array(range(0, len(recorded_data[channel]))) / sampling_rate ) return recorded_data, recorded_data_range, scope_time
[docs]def enable_sequencer( daq: ziDAQServer, device_id: str, channel_index: int, *, single: int ) -> None: """Starts the sequencer of a specific channel. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which sequencer to enable - there is one sequencer per channel. single: 1 - Disable sequencer after finishing execution. 0 - Restart sequencer after finishing execution. """ generator_path = f"/{device_id}/qachannels/{channel_index}/generator/" daq.setInt( generator_path + "single", single, ) daq.syncSetInt(generator_path + "enable", 1) hundred_milliseconds = 0.1 time.sleep(hundred_milliseconds)
[docs]def write_to_waveform_memory( daq: ziDAQServer, device_id: str, channel_index: int, waveforms: dict, *, clear_existing: bool = True, ) -> None: """Writes pulses to the waveform memory of a specified generator. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which generator the waveforms below are written to - there is one generator per channel. waveforms: Dictionary of waveforms, the key specifies the slot to which to write the value which is a complex array containing the waveform samples. clear_existing: Specify whether to clear the waveform memory before the present upload. """ generator_path = f"/{device_id}/qachannels/{channel_index}/generator/" if clear_existing: daq.syncSetInt(generator_path + "clearwave", 1) settings = [] for slot, waveform in waveforms.items(): settings.append((generator_path + f"waveforms/{slot}/wave", waveform)) daq.set(settings)
[docs]def start_continuous_sw_trigger( daq: ziDAQServer, device_id: str, *, num_triggers: int, wait_time: float ) -> None: """Issues a specified number of software triggers. Issues a specified number of software triggers with a certain wait time in between. The function guarantees reception and proper processing of all triggers by the device, but the time between triggers is non-deterministic by nature of software triggering. Warning: Only use this function for prototyping and/or cases without strong timing requirements. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. num_triggers: Number of triggers to be issued. wait_time: Time between triggers in seconds. """ min_wait_time = 0.02 wait_time = max(min_wait_time, wait_time) for _ in range(num_triggers): # syncSetInt() is a blocking call with non-deterministic execution time that # imposes a minimum time between two software triggers. daq.syncSetInt(f"/{device_id}/system/swtriggers/0/single", 1) time.sleep(wait_time)
[docs]def enable_scope( daq: ziDAQServer, device_id: str, *, single: int, acknowledge_timeout: float = 1.0 ) -> None: """Resets and enables the scope. Blocks until the host has received the enable acknowledgment from the device. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. single: 0 = continuous mode, 1 = single-shot. acknowledge_timeout: Maximum time to wait for diverse acknowledgments in the implementation. .. versionadded:: 0.1.1 """ daq.setInt(f"/{device_id}/scopes/0/single", single) path = f"/{device_id}/scopes/0/enable" if daq.getInt(path) == 1 and daq.syncSetInt(path, 0) != 0: raise RuntimeError( f"Failed to disable the scope for device {device_id} before enabling it." ) if daq.syncSetInt(path, 1) != 1: raise RuntimeError(f"The scope for device {device_id} could not be enabled")
[docs]def configure_weighted_integration( daq: ziDAQServer, device_id: str, channel_index: int, *, weights: dict, integration_delay: float = 0.0, clear_existing: bool = True, ) -> None: """Configures the weighted integration on a specified channel. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which group of integration units the integration weights should be uploaded to - each channel is associated with a number of integration units that depend on available device options. Please refer to the SHFQA manual for more details. weights: Dictionary containing the complex weight vectors, where keys correspond to the indices of the integration units to be configured. integration_delay: Delay in seconds before starting readout. clear_existing: Specify whether to set all the integration weights to zero before proceeding with the present upload. """ assert len(weights) > 0, "'weights' cannot be empty." integration_path = f"/{device_id}/qachannels/{channel_index}/readout/integration/" if clear_existing: daq.syncSetInt(integration_path + "clearweight", 1) settings = [] for integration_unit, weight in weights.items(): settings.append((integration_path + f"weights/{integration_unit}/wave", weight)) integration_length = len(weights[0]) settings.append((integration_path + "length", integration_length)) settings.append((integration_path + "delay", integration_delay)) daq.set(settings)
[docs]def configure_result_logger_for_spectroscopy( daq: ziDAQServer, device_id: str, channel_index: int, *, result_length: int, num_averages: int = 1, averaging_mode: int = 0, ) -> None: """Configures a specified result logger for spectroscopy mode. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which result logger to configure - there is one result logger per channel. result_length: Number of results to be returned by the result logger num_averages: Number of averages, will be rounded to 2^n. averaging_mode: Select the averaging order of the result, with 0 = cyclic and 1 = sequential. """ result_path = f"/{device_id}/qachannels/{channel_index}/spectroscopy/result/" settings = [] settings.append((result_path + "length", result_length)) settings.append((result_path + "averages", num_averages)) settings.append((result_path + "mode", averaging_mode)) daq.set(settings)
[docs]def configure_result_logger_for_readout( daq: ziDAQServer, device_id: str, channel_index: int, *, result_source: str, result_length: int, num_averages: int = 1, averaging_mode: int = 0, ) -> None: """Configures a specified result logger for readout mode. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which result logger to configure - there is one result logger per channel. result_source: String-based tag to select the result source in readout mode, e.g. "result_of_integration" or "result_of_discrimination". result_length: Number of results to be returned by the result logger. num_averages: Number of averages, will be rounded to 2^n. averaging_mode: Select the averaging order of the result, with 0 = cyclic and 1 = sequential. """ result_path = f"/{device_id}/qachannels/{channel_index}/readout/result/" settings = [] settings.append((result_path + "length", result_length)) settings.append((result_path + "averages", num_averages)) settings.append((result_path + "source", result_source)) settings.append((result_path + "mode", averaging_mode)) daq.set(settings)
[docs]def enable_result_logger( daq: ziDAQServer, device_id: str, channel_index: int, *, mode: str, acknowledge_timeout: float = 1.0, ) -> None: """Resets and enables a specified result logger. Blocks until the host has received the enable acknowledgment from the device. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which result logger to enable - there is one result logger per channel. mode: Select between "spectroscopy" and "readout" mode. acknowledge_timeout: Maximum time to wait for diverse acknowledgments in the implementation. .. versionadded:: 0.1.1 """ enable_path = f"/{device_id}/qachannels/{channel_index}/{mode}/result/enable" # reset the result logger if some old measurement is still running if daq.getInt(enable_path) == 1 and daq.syncSetInt(enable_path, 0) != 0: raise RuntimeError(f"Failed to disable the result logger for {mode} mode.") # enable the result logger if daq.syncSetInt(enable_path, 1) != 1: raise RuntimeError( f"Failed to enable the result logger for {mode} mode. " f"Please make sure that the QA channel mode is set to {mode}." )
[docs]def get_result_logger_data( daq: ziDAQServer, device_id: str, channel_index: int, *, mode: str, timeout: float = 1.0, ) -> np.array: """Return the measured data of a specified result logger. Blocks until the specified result logger is finished. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which result logger to query results from - there is one result logger per channel. mode: Select between "spectroscopy" and "readout" mode. timeout: Maximum time to wait for data in seconds. Returns: Array containing the result logger data. """ try: wait_for_state_change( daq, f"/{device_id}/qachannels/{channel_index}/{mode}/result/enable", 0, timeout=timeout, ) except TimeoutError as error: raise TimeoutError( "The result logger is still running. " "This usually indicates that it did not receive the expected number of " "triggers." ) from error data = daq.get( f"/{device_id}/qachannels/{channel_index}/{mode}/result/data/*/wave", flat=True, ) result = np.array([d[0]["vector"] for d in data.values()]) return result
[docs]def configure_channel( daq: ziDAQServer, device_id: str, channel_index: int, *, input_range: int, output_range: int, center_frequency: float, mode: str, ) -> None: """Configures the RF input and output of a specified channel. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying which channel to configure. input_range: Maximal range of the signal input power in dbM. output_range: Maximal range of the signal output power in dbM. center_frequency: Center Frequency of the analysis band. mode: Select between "spectroscopy" and "readout" mode. """ path = f"/{device_id}/qachannels/{channel_index}/" settings = [] settings.append((path + "input/range", input_range)) settings.append((path + "output/range", output_range)) settings.append((path + "centerfreq", center_frequency)) settings.append((path + "mode", mode)) daq.set(settings)
[docs]def configure_sequencer_triggering( daq: ziDAQServer, device_id: str, channel_index: int, *, aux_trigger: str, play_pulse_delay: float = 0.0, ) -> None: """Configures the triggering of a specified sequencer. Args: daq: Instance of a Zurich Instruments API session connected to a Data Server. The device with identifier device_id is assumed to already be connected to this instance. device_id: SHFQA device identifier, e.g. `dev12004` or 'shf-dev12004'. channel_index: Index specifying on which sequencer to configure the triggering - there is one sequencer per channel. aux_trigger: Alias for the trigger used in the sequencer. For a list of available values use. daq.help(f"/{device_id}/qachannels/0/generator/auxtriggers/0/channel") play_pulse_delay: Delay in seconds before the start of waveform playback. """ daq.setString( f"/{device_id}/qachannels/{channel_index}/generator/auxtriggers/0/channel", aux_trigger, ) daq.setDouble( f"/{device_id}/qachannels/{channel_index}/generator/delay", play_pulse_delay, )