"""Class for frequency sweeps on SHFQA
"""
# Copyright 2022 Zurich Instruments AG
from collections import namedtuple
from dataclasses import dataclass
from enum import Enum, auto
import time
import textwrap
import numpy as np
from zhinst.utils import utils
from zhinst.ziPython import compile_seqc
class _Mapping(Enum):
LIN = "linear"
LOG = "log"
class _AveragingMode(Enum):
CYCLIC = "cyclic"
SEQUENTIAL = "sequential"
class _TriggerSource(Enum):
"""
Valid trigger sources for spectroscopy
Note: the user should write the trigger selection in lowercase letters.
e.g. "software_trigger0". The strings are transformed to uppercase only
for this enum, which is needed to distinguish between internal and external
triggers (see _EXTERNAL_TRIGGER_LIMIT).
"""
CHANNEL0_TRIGGER_INPUT0 = 0 # Important: start counting with 0
CHAN0TRIGIN0 = CHANNEL0_TRIGGER_INPUT0
CHANNEL0_TRIGGER_INPUT1 = auto()
CHAN0TRIGIN1 = CHANNEL0_TRIGGER_INPUT1
CHANNEL1_TRIGGER_INPUT0 = auto()
CHAN1TRIGIN0 = CHANNEL1_TRIGGER_INPUT0
CHANNEL1_TRIGGER_INPUT1 = auto()
CHAN1TRIGIN1 = CHANNEL1_TRIGGER_INPUT1
CHANNEL2_TRIGGER_INPUT0 = auto()
CHAN2TRIGIN0 = CHANNEL2_TRIGGER_INPUT0
CHANNEL2_TRIGGER_INPUT1 = auto()
CHAN2TRIGIN1 = CHANNEL2_TRIGGER_INPUT1
CHANNEL3_TRIGGER_INPUT0 = auto()
CHAN3TRIGIN0 = CHANNEL3_TRIGGER_INPUT0
CHANNEL3_TRIGGER_INPUT1 = auto()
CHAN3TRIGIN1 = CHANNEL3_TRIGGER_INPUT1
CHANNEL0_SEQUENCER_TRIGGER0 = auto()
CHAN0SEQTRIG0 = CHANNEL0_SEQUENCER_TRIGGER0
CHANNEL1_SEQUENCER_TRIGGER0 = auto()
CHAN1SEQTRIG0 = CHANNEL1_SEQUENCER_TRIGGER0
CHANNEL2_SEQUENCER_TRIGGER0 = auto()
CHAN2SEQTRIG0 = CHANNEL2_SEQUENCER_TRIGGER0
CHANNEL3_SEQUENCER_TRIGGER0 = auto()
CHAN3SEQTRIG0 = CHANNEL3_SEQUENCER_TRIGGER0
SOFTWARE_TRIGGER0 = auto()
SWTRIG0 = SOFTWARE_TRIGGER0
_EXTERNAL_TRIGGER_LIMIT = _TriggerSource.CHANNEL3_TRIGGER_INPUT1
def _check_trigger_source(trigger):
"""
Checks whether the trigger source exists in the _TriggerSource enumeration
Raises a ValueError exception if the checked setting was invalid.
Arguments:
trigger: the trigger source setting to be checked
"""
try:
_TriggerSource[trigger.upper()]
except ValueError:
print(
(
"Trigger source needs to be 'channel[0,3]_trigger_input[0,1]', "
"'channel[0,3]_sequencer_trigger0' or 'software_trigger0'."
)
)
def _check_channel_index(daq, device_id, channel_index):
"""
Checks whether the provided channel index is valid
Raises a ValueError exception if the checked setting was invalid.
Arguments:
channel_index: index of the qachannel to be checked
"""
device_type = daq.getString(f"/{device_id}/features/devtype")
if device_type == "SHFQA4":
num_qa_channels = 4
elif device_type == "SHFQA2":
num_qa_channels = 2
else:
# SHFQC
num_qa_channels = 1
if channel_index >= num_qa_channels:
raise ValueError(
f"Device {device_id} only has a total of {num_qa_channels} QA channels."
)
def _check_center_freq(center_freq_hz):
"""
Checks whether the center frequency is within the valid range
Raises a ValueError exception if the checked setting was invalid.
Arguments:
center_freq_hz: the center frequency to be checked in units Hz
"""
min_center_freq = 0
max_center_freq = 8e9
center_freq_steps = 100e6
rounding_error = 0.1
if center_freq_hz < min_center_freq:
raise ValueError(f"Center frequency must be greater than {min_center_freq}Hz.")
if center_freq_hz > max_center_freq:
raise ValueError(f"Center frequency must be less than {max_center_freq}Hz.")
if center_freq_hz % center_freq_steps > rounding_error:
raise ValueError(f"Center frequency must be multiple of {center_freq_steps}Hz.")
def _check_in_band_freq(start_freq, stop_freq):
"""
Checks whether the start/stop frequency for the in-band sweep is in the valid
range
Raises a ValueError exception if the checked setting was invalid.
Arguments:
start_freq:
stop_freq:
"""
min_offset_freq = -1e9
max_offset_freq = 1e9
if start_freq >= stop_freq:
raise ValueError("Stop frequency must be larger than start_freq frequency.")
if start_freq < min_offset_freq:
raise ValueError(f"Start frequency must be greater than {min_offset_freq}Hz.")
if stop_freq > max_offset_freq:
raise ValueError(f"Stop frequency must be less than {max_offset_freq}Hz.")
def _check_io_range(range_dbm, min_range):
"""
Checks whether the supplied input or output range setting is within the device
boundaries
Raises a ValueError exception if the checked setting was invalid.
Arguments:
range_dbm: the range setting to be checked in units of dBm
min_range: lower boundary
"""
max_range = 10
range_step = 5
rounding_error = 0.001
if range_dbm > max_range + rounding_error:
raise ValueError(f"Maximum range is {max_range}dBm.")
if range_dbm < min_range - rounding_error:
raise ValueError(f"Minimum range is {min_range}dBm.")
if range_dbm % range_step > rounding_error:
raise ValueError(f"Range must be multiple of {range_step}dBm.")
def _check_output_range(range_dbm):
"""
Checks whether the supplied output range setting is within the device boundaries
Raises a ValueError exception if the checked setting was invalid.
Arguments:
range_dbm: the range setting to be checked in units of dBm
"""
min_range_output = -30
_check_io_range(range_dbm, min_range_output)
def _check_input_range(range_dbm):
"""
Checks whether the supplied output range setting is within the device boundaries
Raises a ValueError exception if the checked setting was invalid.
Arguments:
range_dbm: the range setting to be checked in units of dBm
"""
min_range_input = -50
_check_io_range(range_dbm, min_range_input)
def _check_output_gain(gain):
"""
Checks whether the supplied output gain setting is within the device boundaries
Raises a ValueError exception if the checked setting was invalid.
Arguments:
gain: the gain setting to be checked
"""
max_gain = 1
min_gain = 0
if gain < min_gain or gain > max_gain:
raise ValueError(f"Output gain must be within [{min_gain}, {max_gain}].")
def _check_envelope_waveform(wave_vector):
"""
Checks whether the suplied vector is a valid envelope waveform
Raises a ValueError exception if the checked setting was invalid.
Arguments:
wave_vector: the waveform vector to be checked
"""
if wave_vector is None:
raise ValueError("No envelope waveform specified.")
max_envelope_length = 2**16
if len(wave_vector) > max_envelope_length:
raise ValueError(
f"Envelope length exceeds maximum of {max_envelope_length} samples."
)
# Note: here, we check that the envelope vector elements are within the unit
# circle. This check is repeated by the envelope/wave node but it is
# stated here explicitly as a guidance to the user.
if np.any(np.abs(wave_vector) > 1.0):
raise ValueError(
"The absolute value of each envelope vector element must be smaller "
"than 1."
)
def _check_mapping(mapping):
"""
Checks whether the suplied mapping is a valid setting
Raises a ValueError exception if the checked setting was invalid.
Arguments:
mapping: the setting to be checked
"""
try:
_Mapping(mapping.lower())
except ValueError:
print("Mapping needs to be 'linear' or 'log'.")
def _check_avg_mode(mode):
"""
Checks whether the average mode is a valid setting
Raises a ValueError exception if the checked setting was invalid.
Arguments:
mode: the setting to be checked
"""
try:
_AveragingMode(mode.lower())
except ValueError:
print("Averaging mode needs to be 'cyclic' or 'sequential'.")
def _print_sweep_progress(current, total, freq, newline=False):
"""
Prints a line indicating the sweep progress
Arguments:
current: the current number of measurements
total: the total number of measurements
freq: the current frequency
newline: specifies whether to print a newline (True)
or else a carriage return (False) at the end of the line
"""
print(
f"Measurement ({current}/{total}) at {(freq / 1e6):.3f}MHz." + " " * 20,
end=("\r" if not newline else "\n"),
)
def _round_for_playzero(time_interval: float, sample_rate: float):
"""
Rounds a time interval to the granularity of the playZero SeqC command
Arguments:
time_interval: the time interval to be rounded for the playZero command
sample_rate: the sample rate of the instrument
Returns:
rounded the time interval
"""
playzero_granularity = 16
# round up the number of samples to multiples of playzero_granularity
num_samples = (
((round(time_interval * sample_rate) + (playzero_granularity - 1)))
// playzero_granularity
) * playzero_granularity
return num_samples / sample_rate
def _is_subscribed(daq, node_path: str) -> bool:
"""
Checks whether the daq instance is subscribed to a given node or not
Arguments:
daq (ziDAQServer): an instance of the ziPython.ziDAQServer class
node_path: the path of the node to be checked
Returns:
True if the node is subscribed, False if not
"""
# NOTE: currently, daq.listNodes will not respect the subscribedonly flag when the
# node path does not contain a wildcard. Thus we work around this problem by
# determining the base path of the node and using a wildcard.
# Remove this workaround once the underlying bug L1-864 is fixed.
wildcard_path = "/".join(node_path.split("/")[:-1]) + "/*"
listed_nodes = daq.listNodes(wildcard_path, subscribedonly=True)
return node_path in listed_nodes
def _subscribe_with_assert(daq, node_path: str) -> bool:
"""
Subscribes to a node only if it was not already subscribed
Raises an AssertionError if the node was already subscribed
Arguments:
daq (ziDAQServer): an instance of the ziPython.ziDAQServer class
node_path: the path of the node to be checked
"""
assert not _is_subscribed(daq, node_path), (
"The following node was already subscribed:\n"
+ node_path
+ "\n"
+ "This would lead to unexpected behavior!"
)
daq.subscribe(node_path)
[docs]@dataclass
class SweepConfig:
"""Frequency range settings for a sweep"""
start_freq: float = -300e6 #: minimum frequency for the sweep
stop_freq: float = 300e6 #: maximum frequency for the sweep
num_points: int = 100 #: number of frequency points to measure
mapping: str = "linear" #: linear or logarithmic frequency axis
oscillator_gain: float = 1 #: amplitude gain for the oscillator used for modulation
settling_time: float = 200e-9
"""time to wait to ensure new frequency took effect in the device under test"""
use_sequencer: bool = True
"""specify whether to use the fast sequencer-based sweep (True) or the slower
host-driven sweep (False)"""
[docs]@dataclass
class RfConfig:
"""RF in- and ouput settings for a sweep"""
channel: int = 0 #: device channel to be used
input_range: int = -5 #: maximal Range of the Signal Input power
output_range: int = 0 #: maximal Range of the Signal Output power
center_freq: float = 5e9 #: Center Frequency of the analysis band
[docs]@dataclass
class AvgConfig:
"""Averaging settings for a sweep"""
integration_time: float = 1e-3 #: total time while samples are integrated
num_averages: int = 1 #: times to measure each frequency point
mode: str = "cyclic"
"""averaging mode, which can be "cyclic", to first scan the frequency and then
repeat, or "sequential", to average each point before changing the frequency"""
integration_delay: float = 0.0
"""time delay after the trigger for the integrator to start"""
[docs]@dataclass
class TriggerConfig:
"""Settings for the trigger"""
source: str = None
"""trigger source. Please refer to the node documentation in the user manual under
/DEV.../QACHANNELS/n/GENERATOR/AUXTRIGGERS/n/CHANNEL for a list of possible sources.
The default source (None) means the repetition rate of the experiment will be
determined by the sequencer using the integration time in AvgConfig and settling
time in SweepConfig.
Further note that the software trigger is not supported for the sequencer-based
sweeps (exception see force_sw_trigger)!"""
level: float = 0.5 #: trigger level
imp50: bool = True #: trigger input impedance - 50 Ohm if True; else high impedance
force_sw_trigger: bool = False
"""if True, the sequencer program waits for the software trigger even in
sequencer-based mode. Note, however, that the ShfSweeper python class will not
generate the software trigger on its own. Thus this mode is only useful if a
separate API session issues the software triggers!"""
[docs]@dataclass
class EnvelopeConfig:
"""Settings for defining a complex envelope for pulsed spectroscopy"""
waveform: np.complex128 = None #: the complex envelope waveform vector
delay: float = 0.0 #: time delay the waveform is generated after the trigger
Config = namedtuple("Config", ["sweep", "avg", "rf", "trig"])
# pylint: disable=too-many-instance-attributes
[docs]class ShfSweeper:
"""
Class to set up and run a sweep on an SHFQA
Arguments:
daq (zhinst.ziPython.ziDAQServer):
ziDAQServer object to communicate with a Zurich Instruments data server
dev (str):
The ID of the device to run the sweeper with. For example, `dev12004`.
"""
[docs] def __init__(self, daq, dev):
self._daq = daq
self._dev = dev
self._sweep = SweepConfig()
self._rf = RfConfig()
self._avg = AvgConfig()
self._trig = TriggerConfig()
# the envelope multiplication is enabled if and only if this member is not None
self._envelope = None
self._shf_sample_rate = 2e9
self._result = []
[docs] def run(self):
"""
Perform a sweep with the specified settings.
WARNING: During the sweep the following nodes are subscribed and the sync
command is used to clear all buffers on the data server before the measurement:
/{dev}/qachannels/{rf.channel}/spectroscopy/result/acquired
/{dev}/qachannels/{rf.channel}/spectroscopy/result/data/wave
Returns:
a dictionary with measurement data of the sweep
"""
self._init_sweep()
self._run_freq_sweep()
return self.get_result()
[docs] def get_result(self):
"""
Returns:
a dictionary with measurement data of the last sweep
"""
data = self._get_result_logger_data()
vec = self._result
if not self._sweep.use_sequencer:
vec = self._average_samples(vec)
data["vector"] = vec
props = data["properties"]
props["centerfreq"] = self._rf.center_freq
props["startfreq"] = self._sweep.start_freq
props["stopfreq"] = self._sweep.stop_freq
props["numpoints"] = self._sweep.num_points
props["mapping"] = self._sweep.mapping
return data
[docs] def plot(self):
"""
Plots power over frequency for last sweep
"""
import matplotlib.pyplot as plt
freq = self.get_offset_freq_vector()
freq_mhz = freq / 1e6
data = self.get_result()
power_dbm = utils.volt_rms_to_dbm(data["vector"])
phase = np.unwrap(np.angle(data["vector"]))
fig, axs = plt.subplots(2, sharex=True)
plt.xlabel("freq [MHz]")
axs[0].plot(freq_mhz, power_dbm)
axs[0].set(ylabel="power [dBm]")
axs[0].grid()
axs[1].plot(freq_mhz, phase)
axs[1].set(ylabel="phase [rad]")
axs[1].grid()
fig.suptitle(f"Sweep with center frequency {self._rf.center_freq / 1e9}GHz")
plt.show()
[docs] def set_to_device(self):
"""
Transfer settings to device
"""
# First, make sure that the configuration is still valid. This is needed
# since the users might change their instance of the dataclasses
self._check_config(self._sweep, self._avg, self._rf, self._trig, self._envelope)
# set configuration to device
self._configure_rf_frontends()
if self._is_externally_triggered:
self._configure_external_trigger()
self._configure_envelope()
self._configure_spectroscopy_delay()
self._configure_integration_time()
self._daq.sync()
[docs] def get_configuration(self):
"""
Returns:
the configuration of the sweeper class as
Config(SweepConfig, AvgConfig, RfConfig, TriggerConfig)
"""
return Config(self._sweep, self._avg, self._rf, self._trig)
[docs] def get_offset_freq_vector(self):
"""
Get vector of frequency points
"""
if self._sweep.mapping == _Mapping.LIN.value:
freq_vec = np.linspace(
self._sweep.start_freq, self._sweep.stop_freq, self._sweep.num_points
)
else: # log
start_f_log = np.log10(self._sweep.start_freq + self._rf.center_freq)
stop_f_log = np.log10(self._sweep.stop_freq + self._rf.center_freq)
temp_f_vec = np.logspace(start_f_log, stop_f_log, self._sweep.num_points)
freq_vec = temp_f_vec - self._rf.center_freq
return freq_vec
def _check_config(
self,
sweep_config=None,
avg_config=None,
rf_config=None,
trig_config=None,
envelope_config=None,
):
"""
Checks if the supplied configurations are valid
This function has the same arguments as the public function self.configure()
"""
if rf_config:
_check_channel_index(self._daq, self._dev, rf_config.channel)
_check_center_freq(rf_config.center_freq)
_check_input_range(rf_config.input_range)
_check_output_range(rf_config.output_range)
if sweep_config:
if self._sweep.use_sequencer and self._sweep.mapping != _Mapping.LIN.value:
raise ValueError(
"Only linear sweeps are supported with the sequencer-based approach"
)
_check_in_band_freq(sweep_config.start_freq, sweep_config.stop_freq)
_check_mapping(sweep_config.mapping)
_check_output_gain(sweep_config.oscillator_gain)
if avg_config:
_check_avg_mode(avg_config.mode)
self._check_integration_time(avg_config.integration_time)
self._check_integration_delay(avg_config.integration_delay)
if trig_config and trig_config.source is not None:
_check_trigger_source(trig_config.source)
if envelope_config:
_check_envelope_waveform(envelope_config.waveform)
self._check_envelope_delay(envelope_config.delay)
@property
def _path_prefix(self) -> str:
return f"/{self._dev}/qachannels/{self._rf.channel}/"
@property
def _acquired_path(self) -> str:
return self._path_prefix + "spectroscopy/result/acquired"
@property
def _spec_enable_path(self) -> str:
return self._path_prefix + "spectroscopy/result/enable"
@property
def _data_path(self) -> str:
return self._path_prefix + "spectroscopy/result/data/wave"
@property
def _is_externally_triggered(self) -> bool:
if self._trig.source is None:
return False
return (
_TriggerSource[self._trig.source.upper()].value
<= _EXTERNAL_TRIGGER_LIMIT.value
)
@property
def _is_sw_triggered(self) -> bool:
if self._trig.source is None:
return False
return (
_TriggerSource[self._trig.source.upper()].value
== _TriggerSource.SOFTWARE_TRIGGER0.value
)
def _configure_rf_frontends(self):
"""
Configures the RF frontend settings to the device
"""
# don't set output/input on/off, keep previous user settings
self._daq.setInt(self._path_prefix + "input/range", self._rf.input_range)
self._daq.setInt(self._path_prefix + "output/range", self._rf.output_range)
self._daq.setDouble(self._path_prefix + "centerfreq", self._rf.center_freq)
self._daq.setDouble(
self._path_prefix + "oscs/0/gain", self._sweep.oscillator_gain
)
self._daq.setString(self._path_prefix + "mode", "spectroscopy")
def _configure_external_trigger(self):
"""
Configures the external trigger inputs to the device
"""
# Note: the following index arithmetic is only valid for HW triggers:
trig_channel = _TriggerSource[self._trig.source.upper()].value // 2
trig_input = _TriggerSource[self._trig.source.upper()].value % 2
trig_path = f"/{self._dev}/qachannels/{trig_channel}/triggers/{trig_input}/"
self._daq.setDouble(trig_path + "level", self._trig.level)
self._daq.setInt(trig_path + "imp50", self._trig.imp50)
def _configure_envelope(self):
"""
Configures the envelope waveform settings for pulsed spectroscopy to the device
"""
path = self._path_prefix + "spectroscopy/envelope"
if self._envelope:
self._daq.setVector(
path + "/wave", self._envelope.waveform.astype("complex128")
)
self._daq.setInt(path + "/enable", 1)
self._daq.setDouble(path + "/delay", self._envelope.delay)
else:
self._daq.setInt(path + "/enable", 0)
def _configure_spectroscopy_delay(self):
"""
Configures the delay for triggering the spectroscopy module to the device
"""
path = self._path_prefix + "spectroscopy/delay"
if self._avg:
self._daq.setDouble(path, self._avg.integration_delay)
def _configure_integration_time(self):
"""
Configure the integration time to the device
"""
spectroscopy_len = round(self._avg.integration_time * self._shf_sample_rate)
self._daq.setInt(self._path_prefix + "spectroscopy/length", spectroscopy_len)
def _get_freq_vec_host(self):
"""
Get the vector of frequencies for the host-based sweep
"""
single_freq_vec = self.get_offset_freq_vector()
return self._concatenate_freq_vecs_host(single_freq_vec)
def _concatenate_freq_vecs_host(self, single_freq_vec):
"""
Concatenates the vector of frequencies depending on the averaging and triggering
type for the host-based sweep
"""
triggered_sequential = (
self._avg.mode.lower() == _AveragingMode.SEQUENTIAL.value
and not self._is_sw_triggered
)
if self._avg.num_averages == 1 or triggered_sequential:
freq_vec = single_freq_vec
elif self._avg.mode == _AveragingMode.CYCLIC.value:
num_concatenate = self._avg.num_averages - 1
freq_vec = single_freq_vec
while num_concatenate > 0:
num_concatenate -= 1
freq_vec = np.concatenate((freq_vec, single_freq_vec), axis=None)
else: # sequential + sw_trigger
freq_vec = np.zeros(self._avg.num_averages * self._sweep.num_points)
for i, f in enumerate(single_freq_vec):
for j in range(self._avg.num_averages):
ind = i * self._avg.num_averages + j
freq_vec[ind] = f
return freq_vec
def _configure_direct_triggering_host(self):
"""
Configures the direct triggering of the spectroscopy module in the host-based
approach
"""
if self._trig.source is None:
raise ValueError(
"Trigger source cannot be None if use_sequencer is set to False in "
"SweepConfig"
)
self._daq.setString(
self._path_prefix + "spectroscopy/trigger/channel",
self._trig.source.lower(),
)
def _configure_triggering_via_sequencer(self):
"""
Configures the triggering of the spectroscopy module via the sequencer
"""
if self._is_sw_triggered and (not self._trig.force_sw_trigger):
raise ValueError(
textwrap.dedent(
"""
Software trigger is not supported if use_sequencer is True!
We recommend to set the trigger source in TriggerConfig to None when
using the sequencer-based sweep, in order to let the sequencer
define the repetition rate of the experiment.
"""
)
)
# the sequencer receives the actual trigger
if self._trig.source is not None:
self._daq.setString(
self._path_prefix + "generator/auxtriggers/0/channel",
self._trig.source.lower(),
)
# the spectroscopy module must use the trigger coming from the sequencer
self._daq.setString(
self._path_prefix + "spectroscopy/trigger/channel",
f"chan{self._rf.channel}seqtrig0",
)
def _init_sweep(self):
"""
Initializes the sweep by configuring all settings to the devices
"""
self.set_to_device()
self._stop_result_logger()
if self._sweep.use_sequencer:
self._configure_triggering_via_sequencer()
sequencer_program = self._generate_sequencer_program()
self._load_sequencer_program(sequencer_program)
else:
self._configure_direct_triggering_host()
self._daq.sync()
def _stop_result_logger(self):
"""
Stops the result logger and makes sure it is stopped
"""
self._daq.setInt(self._spec_enable_path, 0)
self._daq.sync()
utils.wait_for_state_change(self._daq, self._spec_enable_path, 0)
def _issue_single_sw_trigger(self):
self._daq.syncSetInt(f"/{self._dev}/system/swtriggers/0/single", 1)
def _enable_measurement(self):
self._daq.syncSetInt(self._spec_enable_path, 1)
def _get_data_after_measurement(self):
data = self._get_result_logger_data()
return data["vector"]
def _set_freq_to_device(self, freq: float):
"""
Configures a frequency on the device
Arguments:
freq: the frequency to be configured
"""
self._daq.syncSetDouble(self._path_prefix + "oscs/0/freq", freq)
def _get_freq_sequencer(self, num_acquired: int) -> float:
"""
Infers the frequency from the number of acquired results in a sequencer-based
sweep
Arguments:
num_acquired: the current number of acquired results
Returns:
the inferred frequency
"""
if self._avg.mode == _AveragingMode.CYCLIC.value:
# Cyclic averaging
return self._sweep.start_freq + self._freq_step * (
num_acquired % self._sweep.num_points
)
# Sequential averaging
return self._sweep.start_freq + self._freq_step * (
num_acquired // self._avg.num_averages
)
def _poll_results(
self, data_path: str, acquired_path: str, expected_num_results: int
):
"""
Repetitively polls for results in sequencer-driven sweeps until the expected
number of results is acquired.
Raises a TimeoutError excpetion if no new result is acquired within 10 seconds.
Arguments:
data_path: path to the result data node
Must be subscribed by caller!
acquired_path: path to the "acquired" node, which reports the
current number of acquired results
Must be subscribed by caller!
expected_num_results: expected total number of results
Returns:
the result vector when it becomes available
"""
poll_time = 0.05
result_timeout = 10 # seconds
# avoid too many iterations but print often enough
print_interval = 0.5 # seconds
elapsed_time_since_result = 0
elapsed_time_since_print = print_interval # force print in first iteration
results = 0
result_logger_data = None
while elapsed_time_since_result < result_timeout:
poll_start = time.perf_counter()
poll_results = self._daq.poll(poll_time, timeout_ms=10, flat=True)
poll_duration = time.perf_counter() - poll_start
if acquired_path in poll_results:
results = poll_results[acquired_path]["value"][-1]
elapsed_time_since_result = 0
else:
elapsed_time_since_result += poll_duration
if data_path in poll_results:
result_logger_data = poll_results[data_path][0]["vector"]
if elapsed_time_since_print >= print_interval:
_print_sweep_progress(
results, expected_num_results, self._get_freq_sequencer(results)
)
elapsed_time_since_print = 0
else:
elapsed_time_since_print += poll_duration
is_done = (results == expected_num_results) and (
result_logger_data is not None
)
if is_done:
# report the final progress
_print_sweep_progress(
results,
expected_num_results,
self._get_freq_sequencer(results),
newline=True,
)
return result_logger_data
if results > 0:
raise TimeoutError(
f"failed to get a new result in {result_timeout} seconds, so far "
f"only got {results}!"
)
raise TimeoutError(f"failed to get any result in {result_timeout} seconds!")
def _wait_for_results_host(self, freq, num_results):
"""
Waits for the results in the host-based sweep
Arguments:
freq: the current frequency (only needed for the status printouts)
num_results: the desired number of results to wait for
"""
poll_time = 0.05
result_timeout = 10 # seconds
# avoid too many iterations but print often enough
print_interval = 0.5 # seconds
elapsed_time_since_result = 0
elapsed_time_since_print = print_interval # force print in first iteration
results = 0
while elapsed_time_since_result < result_timeout:
poll_start = time.perf_counter()
poll_results = self._daq.poll(poll_time, timeout_ms=10, flat=True)
poll_duration = time.perf_counter() - poll_start
if self._acquired_path in poll_results:
results = poll_results[self._acquired_path]["value"][-1]
elapsed_time_since_result = 0
else:
elapsed_time_since_result += poll_duration
if elapsed_time_since_print >= print_interval:
_print_sweep_progress(results, num_results, freq)
elapsed_time_since_print = 0
else:
elapsed_time_since_print += poll_duration
if results == num_results:
# we are done - but we must report the final progress
_print_sweep_progress(results, num_results, freq)
utils.wait_for_state_change(
self._daq, self._spec_enable_path, 0, timeout=1
)
return
if results > 0:
raise TimeoutError(
f"failed to get a new result in {result_timeout} seconds, so far "
f"only got {results}!"
)
raise TimeoutError(f"failed to get any result in {result_timeout} seconds!")
def _wait_for_results_host_sw_trig(self, expected_results, wait_time=1):
"""
Waits for the results in the host-based sweep using the software trigger
Arguments:
expected_results: the expected number of results
wait_time: the expected maximal time to wait for the results
"""
# leave margin for the swtrigger and the dataserver to be updated
wait_time = 1.2 * (wait_time + 0.3)
# iterate often (20ms) to improve performance
utils.wait_for_state_change(
self._daq,
self._acquired_path,
expected_results,
timeout=wait_time,
sleep_time=0.02,
)
def _run_freq_sweep(self):
"""
Runs the frequency sweep.
Dispatches between the different sweep approaches.
"""
if self._sweep.use_sequencer:
self._run_freq_sweep_sequencer()
elif self._is_sw_triggered:
self._run_freq_sweep_host_sw_trig()
else:
self._run_freq_sweep_host()
def _run_freq_sweep_sequencer(self):
"""
Runs the frequency sweep with the sequencer-based approach.
"""
self._print_sweep_details()
num_results = self._configure_result_length_and_averages_sequencer()
_subscribe_with_assert(self._daq, self._data_path)
_subscribe_with_assert(self._daq, self._acquired_path)
self._daq.sync()
self._enable_measurement()
self._enable_sequencer()
try:
self._result = self._poll_results(
self._data_path, self._acquired_path, num_results
)
finally:
self._daq.unsubscribe(self._data_path)
self._daq.unsubscribe(self._acquired_path)
def _run_freq_sweep_host_sw_trig(self):
"""
Runs the frequency sweep with the host-based approach using the software trigger
"""
self._print_sweep_details()
freq_vec = self._get_freq_vec_host()
self._configure_result_length_and_averages_host()
self._enable_measurement()
for i, freq in enumerate(freq_vec):
self._set_freq_to_device(freq)
_print_sweep_progress(i + 1, len(freq_vec), freq)
self._issue_single_sw_trigger()
self._wait_for_results_host_sw_trig(
expected_results=i + 1, wait_time=self._avg.integration_time
)
utils.wait_for_state_change(self._daq, self._spec_enable_path, 0, timeout=1.0)
self._result = self._get_data_after_measurement()
def _run_freq_sweep_host(self):
"""
Runs the frequency sweep with the host-based approach (not software-triggered)
"""
self._print_sweep_details()
freq_vec = self._get_freq_vec_host()
num_results = self._configure_result_length_and_averages_host()
self._result = []
_subscribe_with_assert(self._daq, self._acquired_path)
self._daq.sync()
for freq in freq_vec:
self._set_freq_to_device(freq)
self._enable_measurement()
try:
self._wait_for_results_host(freq, num_results)
except Exception as wait_exception:
# make sure we also unsubscribe from the node in case of an exception
self._daq.unsubscribe(self._acquired_path)
raise wait_exception
self._result = np.append(self._result, self._get_data_after_measurement())
# after the sweep has finished, we unsubscribe from the node
self._daq.unsubscribe(self._acquired_path)
def _get_playzero_hold_off_samples(self) -> int:
"""
Returns the hold-off time needed per iteration of the the inner-most
loop of the SeqC program. The return value respects the minimal hold-off time
and the granularity of the playZero SeqC command.
Returns:
the number of samples corresponding to the hold-off time
"""
hold_off_time_margin = 72e-9
hold_off_time = _round_for_playzero(
self._avg.integration_time + hold_off_time_margin, self._shf_sample_rate
)
# ensure minimum hold-off time for the integration
min_hold_off_samples = 2048
return max(round(hold_off_time * self._shf_sample_rate), min_hold_off_samples)
def _get_playzero_settling_samples(self) -> int:
"""
Returns an integer number of samples corresponding to the settling time
The return value respects the granularity of the playZero SeqC command.
Returns:
the number of samples corresponding to the settling time
"""
return round(
_round_for_playzero(self._sweep.settling_time, self._shf_sample_rate)
* self._shf_sample_rate
)
@property
def _freq_step(self) -> float:
"""
Returns the frequency step size according to the sweep settings
"""
return (self._sweep.stop_freq - self._sweep.start_freq) / self._sweep.num_points
def _generate_sequencer_program(self):
"""
Internal method, which generates the SeqC code for a sweep
"""
seqc_header = textwrap.dedent(
f"""
const OSC0 = 0;
setTrigger(0);
configFreqSweep(OSC0, {self._sweep.start_freq}, {self._freq_step});
"""
)
seqc_wait_for_trigger = (
"waitDigTrigger(1);"
if self._trig.source is not None
else "// self-triggering mode"
)
seqc_loop_body = textwrap.dedent(
f"""
{seqc_wait_for_trigger}
// define time from setting the oscillator frequency to sending
// the spectroscopy trigger
playZero({self._get_playzero_settling_samples()});
// set the oscillator frequency depending on the loop variable i
setSweepStep(OSC0, i);
resetOscPhase();
// define time to the next iteration
playZero({self._get_playzero_hold_off_samples()});
// trigger the integration unit and pulsed playback in pulsed mode
setTrigger(1);
setTrigger(0);
"""
)
averaging_loop_arguments = f"var j = 0; j < {self._avg.num_averages}; j++"
sweep_loop_arguments = f"var i = 0; i < {self._sweep.num_points}; i++"
if self._avg.mode == _AveragingMode.CYCLIC.value:
outer_loop_arguments = averaging_loop_arguments
inner_loop_arguments = sweep_loop_arguments
else:
outer_loop_arguments = sweep_loop_arguments
inner_loop_arguments = averaging_loop_arguments
seqc = (
seqc_header
+ textwrap.dedent(
f"""
for({outer_loop_arguments}) {{
for({inner_loop_arguments}) {{"""
)
+ textwrap.indent(seqc_loop_body, " " * 8)
+ textwrap.dedent(
"""
}
}
"""
)
)
return seqc
def _load_sequencer_program(self, sequencer_program: str, timeout: float = 10):
"""
Compiles and loads a sequencer program for the fast sweep"
Arguments:
sequencer_program: the sequencer program to be compiled and loaded
timeout: the maximum time to wait during compilation in seconds
"""
# first, reset the sequencer
self._daq.syncSetInt(self._path_prefix + "generator/reset", 1)
device_type = self._daq.getString(f"/{self._dev}/features/devtype")
device_options = self._daq.getString(f"/{self._dev}/features/options")
elf, _ = compile_seqc(
sequencer_program,
device_type,
device_options,
self._rf.channel,
sequencer="qa",
)
self._daq.setVector(
f"/{self._dev}/qachannels/{self._rf.channel}/generator/elf/data", elf
)
# wait until the device becomes ready after program upload
utils.wait_for_state_change(
self._daq, self._path_prefix + "generator/ready", 1, timeout=1.0
)
time.sleep(0.1)
def _enable_sequencer(self):
"""
Starts the sequencer for the sequencer-based sweep
"""
self._daq.setInt(self._path_prefix + "generator/single", 1)
self._daq.syncSetInt(self._path_prefix + "generator/enable", 1)
hundred_milliseconds = 0.1
time.sleep(hundred_milliseconds)
def _print_sweep_details(self):
detail_str = (
f"Run a sweep with {self._sweep.num_points} frequency points in the range of "
f"[{self._sweep.start_freq / 1e6}, {self._sweep.stop_freq / 1e6}] MHz + "
f"{self._rf.center_freq / 1e9} GHz. \n"
f"Mapping is {self._sweep.mapping}. \n"
f"Integration time = {self._avg.integration_time} sec. \n"
f"Measures {self._avg.num_averages} times per frequency point. \n"
f"Averaging mode is {self._avg.mode}.\n"
)
if self._trig.source is not None:
detail_str += f"Trigger source is {self._trig.source.lower()}."
else:
detail_str += str(
"Trigger source is set to None, which means the sequencer "
"defines the repetition rate."
)
print(detail_str)
def _configure_result_length_and_averages_host(self) -> int:
"""
Configures the result vector length and number of averages for the host-based
sweep to the device
Returns:
the configured number of results
"""
if self._is_sw_triggered:
num_results = self._sweep.num_points * self._avg.num_averages
elif self._avg.mode.lower() == _AveragingMode.SEQUENTIAL.value:
num_results = self._avg.num_averages
else:
num_results = 1
self._daq.setInt(self._path_prefix + "spectroscopy/result/length", num_results)
# for the host-based approach, we always average in software, thus set the
# hardware averages to 1
self._daq.setInt(self._path_prefix + "spectroscopy/result/averages", 1)
return num_results
def _configure_result_length_and_averages_sequencer(self) -> int:
"""
Configures the result vector length and number of averages for the
sequencer-based sweep to the device
Returns:
the expected total number of results, which is the product of the result
vector length and number of averages
"""
self._daq.setString(
self._path_prefix + "spectroscopy/result/mode", self._avg.mode
)
self._daq.setInt(
self._path_prefix + "spectroscopy/result/length", self._sweep.num_points
)
self._daq.setInt(
self._path_prefix + "spectroscopy/result/averages", self._avg.num_averages
)
return self._sweep.num_points * self._avg.num_averages
def _get_result_logger_data(self):
result_path = self._path_prefix + "spectroscopy/result/data/wave"
data = self._daq.get(result_path, flat=True)
return data[result_path.lower()][0]
def _average_samples(self, vec):
if self._avg.num_averages == 1:
return vec
avg_vec = np.zeros(self._sweep.num_points, dtype="complex")
if self._avg.mode == _AveragingMode.CYCLIC.value:
total_measurements = self._sweep.num_points * self._avg.num_averages
for i in range(self._sweep.num_points):
avg_range = range(i, total_measurements, self._sweep.num_points)
avg_vec[i] = np.mean(vec[avg_range])
else: # sequential
for i in range(self._sweep.num_points):
start_ind = i * self._avg.num_averages
avg_range = range(start_ind, start_ind + self._avg.num_averages)
avg_vec[i] = np.mean(vec[avg_range])
return avg_vec
def _check_integration_time(self, integration_time_s):
max_int_len = ((2**23) - 1) * 4
min_int_len = 4
max_integration_time = max_int_len / self._shf_sample_rate
min_integration_time = min_int_len / self._shf_sample_rate
if integration_time_s < min_integration_time:
raise ValueError(
f"Integration time below minimum of {min_integration_time}s."
)
if integration_time_s > max_integration_time:
raise ValueError(
f"Integration time exceeds maximum of {max_integration_time}s."
)
def _check_delay(self, resolution_ns, min_s, max_s, val_s):
if val_s > max_s or val_s < min_s:
raise ValueError(f"Delay out of bounds! {min_s} <= delay <= {max_s}")
if (val_s * 1e9) % resolution_ns != 0:
raise ValueError(f"Delay not in multiples of {resolution_ns} ns.")
def _check_integration_delay(self, integration_delay_s):
resolution_ns = 2
max_s = 131e-6
self._check_delay(resolution_ns, 0, max_s, integration_delay_s)
def _check_envelope_delay(self, delay_s):
resolution_ns = 2
max_s = 131e-6
self._check_delay(resolution_ns, 0, max_s, delay_s)