User Functions in Near-Time Loops¶
In near-time loops of experiments, any Python function can be executed. These functions are dubbed callback or user functions.
They can be used to:
- control non-QCCS instruments, such as Zurich Instruments lock-in amplifiers or third-party instruments, and acquire results from these instruments
- access and process the results the experiment has yielded so far
- exchange waveforms between real-time loops (e.g., based on measurement results)
0. General Imports and Definitions¶
0.1 Python Imports¶
# LabOne Q:
from laboneq.simple import *
# Helpers:
from laboneq.contrib.example_helpers.plotting.plot_helpers import plot_simulation
import numpy as np
import matplotlib.pyplot as plt
1. Define Device Setup and Calibration¶
1.1 Define a Device Setup¶
The descriptor contains all information on instruments used, internal connections between instruments as well as wiring to the experiment
descriptor = """\
instruments:
HDAWG:
- address: DEV8001
uid: device_hdawg
UHFQA:
- address: DEV2001
uid: device_uhfqa
PQSC:
- address: DEV10001
uid: device_pqsc
connections:
device_hdawg:
- iq_signal: q0/drive_line
ports: [SIGOUTS/0, SIGOUTS/1]
- iq_signal: q1/drive_line
ports: [SIGOUTS/2, SIGOUTS/3]
- rf_signal: q0/flux_line
ports: [SIGOUTS/4]
- rf_signal: q1/flux_line
ports: [SIGOUTS/5]
- to: device_uhfqa
port: DIOS/0
device_uhfqa:
- iq_signal: q0/measure_line
ports: [SIGOUTS/0, SIGOUTS/1]
- acquire_signal: q0/acquire_line
- iq_signal: q1/measure_line
ports: [SIGOUTS/0, SIGOUTS/1]
- acquire_signal: q1/acquire_line
device_pqsc:
- to: device_hdawg
port: ZSYNCS/0
"""
1.2 Define Calibration Settings¶
Modify the calibration on the device setup with known parameters for qubit control and readout - qubit control and readout frequencies, mixer calibration corrections
# functions that modifies the calibration on a given device setup
def calibrate_devices(device_setup):
## qubit 0
# calibration setting for drive line for qubit 0
device_setup.logical_signal_groups["q0"].logical_signals[
"drive_line"
].calibration = SignalCalibration(
# oscillator settings - frequency and type of oscillator used to modulate the pulses applied through this signal line
oscillator=Oscillator(
uid="drive_q0_osc", frequency=1e8, modulation_type=ModulationType.HARDWARE
),
# mixer calibration settings to compensate for non-ideal mixer configuration
mixer_calibration=MixerCalibration(
voltage_offsets=[0.0, 0.0],
correction_matrix=[
[1.0, 0.0],
[0.0, 1.0],
],
),
)
# calibration setting for flux line for qubit 0
device_setup.logical_signal_groups["q0"].logical_signals[
"flux_line"
].calibration = SignalCalibration(
oscillator=Oscillator(
uid="flux_q0_osc", frequency=1e8, modulation_type=ModulationType.HARDWARE
),
)
# calibration setting for readout pulse line for qubit 0
device_setup.logical_signal_groups["q0"].logical_signals[
"measure_line"
].calibration = SignalCalibration(
oscillator=Oscillator(
uid="measure_q0_osc", frequency=1e8, modulation_type=ModulationType.SOFTWARE
)
)
# calibration setting for data acquisition line for qubit 0
device_setup.logical_signal_groups["q0"].logical_signals[
"acquire_line"
].calibration = SignalCalibration(
oscillator=Oscillator(
uid="acquire_osc", frequency=1e8, modulation_type=ModulationType.SOFTWARE
),
)
1.3 Create Device Setup and Apply Calibration Settings¶
# Emulation mode does not create connection to the devices
do_emulation = True
# Function returning a calibrated device setup
def create_device_setup():
device_setup = DeviceSetup.from_descriptor(
descriptor,
server_host="111.22.33.44", # ip address of the LabOne dataserver used to communicate with the instruments
server_port="8004", # port number of the dataserver - default is 8004
setup_name="ZI_QCCS", # setup name
)
calibrate_devices(device_setup)
return device_setup
# create device setup
device_setup = create_device_setup()
2. User functions¶
First, the user functions are defined. User functions can contain name arguments and return values. Then, the user functions are used in a near-time sweep in an example experiment.
2.1 Define pulses¶
# qubit drive pulse
x90 = pulse_library.gaussian(uid="x90", length=100e-9, amplitude=1.0)
x90_rect = pulse_library.const(uid="x90_rect", length=100e-9, amplitude=1.0)
# readout drive pulse
readout_pulse = pulse_library.const(uid="readout_pulse", length=400e-9, amplitude=1.0)
# readout weights for integration
readout_weighting_function = pulse_library.const(
uid="readout_weighting_function", length=200e-9, amplitude=1.0
)
2.2 Define user functions¶
User functions are normal Python functions, but their first argument must be the session
object. This enables access to all QCCS instruments and the results that have already been collected. The return values will be stored in the session.results
object.
# additional import for the purpose of demonstration
import logging
mylogger = logging.getLogger("user_func")
# A user function is a regular python function, taking named arguments
# The function may return values, which will be accessible after execution
# The first argument must be the LabOne Q SW session
def user_func(session, frequency, amplitude):
mylogger.info(
f"Called 'user_func' with params: frequency={frequency}, amplitude={amplitude:.1f}"
)
return f"frequency={frequency}, amplitude={amplitude:.1f}"
# Any return type is allowed, values will be put into the results list as is
def my_power_func(session, amplitude, gain):
mylogger.info(
f"Called 'my_power_func' with params: amplitude={amplitude:.1f}, gain={gain}"
)
return amplitude, (amplitude * gain) ** 2
2.2.1 Controlling individual devices¶
Configured devices can be controlled via zhinst-toolkit
API.
def query_hdawg_device_info(session):
"""This function queries information from the configured HDAWG device and can be called during the experiment.
Requires device connection: emulation must be set to False."""
if not do_emulation:
device_hdawg = session.devices["device_hdawg"]
amplitudes = device_hdawg.awgs["*"].outputs["*"].amplitude()
gains = device_hdawg.awgs["*"].outputs["*"].gains["*"]()
awg_osc_freq = device_hdawg.oscs["*"].freqawg()
return amplitudes, gains, awg_osc_freq
User functions can be added before or after the near-time sweep acquisition. The order of their execution follows this position:
inner_user_func
fills the inner_results
list in the inner loop, after_inner_user_func
consumes and clears it afterwards.
inner_results = []
multiplier = [1]
def inner_user_func(session, param):
inner_results.append(param * multiplier[0])
mylogger.info(f"Called 'inner_user_func' with param={param}")
def after_inner_user_func(session):
mylogger.info(
f"Called 'after_inner_user_func', collected inner values: {inner_results}"
)
res = inner_results.copy()
inner_results.clear()
multiplier[0] = multiplier[0] * 2
return res
User functions can access the results the experiment has already acquired:
def process_partial_result(session: Session):
# get a reference to the results
res = session.results
# the first dimension of results corresponds to the near-time sweep
# the second dimension corresponds to the real-time sweep
if res is not None: # check that first results already in
# index of last acquired data point in near-time sweep
ind = res.get_last_nt_step("ac_0")
m = np.mean(res.get_data("ac_0")[ind]) # average latest result sweep
if np.abs(m) > 0.5:
session.replace_pulse("x90", x90_rect)
else:
session.replace_pulse("x90", x90)
2.3 Experiment definition¶
## define calibration settings
lsg = device_setup.logical_signal_groups["q0"].logical_signals
# Apply as baseline calibration
lsg["drive_line"].calibration.oscillator.frequency = 100e6
lsg["measure_line"].calibration.oscillator.frequency = 100e6
lsg["drive_line"].calibration.amplitude = 0.5
lsg["acquire_line"].calibration.port_delay = 100e-9
Define sweep parameters for the outer near-time and the inner real-time sweep
outer_sweep_parameter = LinearSweepParameter(uid="amp", start=0.1, stop=1.0, count=10)
inner_arbitrary_sweep = SweepParameter(uid="inner", values=[1, 1.1, 3.5, 7])
# define number of averages
average_exponent = 1 # used for 2^n averages, n=average_exponent, maximum: n = 17
# Create Experiment - no explicit mapping to qubit lines
exp = Experiment(
uid="User Function",
signals=[
ExperimentSignal("q0_drive"),
ExperimentSignal("q0_measure"),
ExperimentSignal("q0_acquire"),
],
)
## experimental pulse sequence
# outer loop - near-time sweep
with exp.sweep(uid="sweep", parameter=outer_sweep_parameter):
# Call user functions.
# Either constant float values or parameters of the containing
# loop can be used as arguments. Only named arguments supported,
# arguments to `exp.call` must match those of the user function
# being called.
# Variant 1: Use python function name as reference
exp.call(user_func, frequency=500e6, amplitude=outer_sweep_parameter)
# Variant 2: Use custom name as reference, see section 2.3 below
exp.call("calc_power", amplitude=outer_sweep_parameter, gain=1.0)
# Calling same function multiple times allowed, results will be
# appended to the same result list in order of execution
exp.call("calc_power", amplitude=outer_sweep_parameter, gain=2.0)
# Same python function may be registered with different reference names,
# in which case it is treated as a separate function, producing its own
# result list, see sections "2.4 Results" below
exp.call("calc_power_alt", amplitude=outer_sweep_parameter, gain=4.0)
# process partial results and feed forward to next real-time execution
exp.call("process_partial_result")
exp.call(query_hdawg_device_info)
# inner loop - near-time sweep
with exp.sweep(uid="inner_sweep", parameter=inner_arbitrary_sweep):
# Variant 2: Use custom name as reference
exp.call("inner_user_func", param=inner_arbitrary_sweep)
# innermost loop - real-time pulse sequence with averaging
with exp.acquire_loop_rt(
uid="shots",
count=pow(2, average_exponent),
acquisition_type=AcquisitionType.INTEGRATION,
):
# qubit excitation pulse
with exp.section(uid="qubit_excitation"):
exp.play(signal="q0_drive", pulse=x90)
# readout and data acquisition
with exp.section(uid="qubit_readout"):
exp.reserve(signal="q0_drive")
# add a delay before the readout pulse
exp.delay(signal="q0_measure", time=10e-9)
exp.delay(signal="q0_acquire", time=10e-9)
# play readout pulse
exp.play(signal="q0_measure", pulse=readout_pulse)
# add an offset between the readout pulse and the start of the data acquisition
exp.delay(signal="q0_acquire", time=100e-9)
# signal data acquisition
exp.acquire(
signal="q0_acquire",
handle="ac_0",
kernel=readout_weighting_function,
)
# relax time after readout - for signal processing and qubit relaxation to groundstate
with exp.section(uid="relax"):
exp.delay(signal="q0_measure", time=1e-6)
# The call order of user functions is preserved relative to the nested sections
exp.call("after_inner_user_func")
# define signal map
map_q0 = {
"q0_drive": device_setup.logical_signal_groups["q0"].logical_signals["drive_line"],
"q0_measure": device_setup.logical_signal_groups["q0"].logical_signals[
"measure_line"
],
"q0_acquire": device_setup.logical_signal_groups["q0"].logical_signals[
"acquire_line"
],
}
# set signal map
exp.set_signal_map(map_q0)
2.3 Run the Experiment¶
# create a session
session = Session(device_setup)
All user functions referred to from the experiment must be registered with the session.
# Variant 1: Use python function name as reference
session.register_user_function(user_func)
session.register_user_function(query_hdawg_device_info)
# Variant 2: Give the name explicitly
session.register_user_function(my_power_func, "calc_power")
# Same python function may be registered multiple times with different names
session.register_user_function(my_power_func, "calc_power_alt")
session.register_user_function(inner_user_func, "inner_user_func")
session.register_user_function(after_inner_user_func, "after_inner_user_func")
session.register_user_function(process_partial_result)
Continue with the standard routine to run an experiment in a session.
# connect to session
session.connect(do_emulation=do_emulation)
# run experiment
my_results = session.run(exp)
# Plot simulated output signals
plot_simulation(session.compiled_experiment, 0, 3e-6)
2.4 Results¶
Investigate the results returned from the calls of user functions
# Return values of user functions upon execution are available per function, use function name as a key.
my_results.user_func_results["user_func"]
# Two calls per iteration to `calc_power` result in two adjacent entries in the results
my_results.user_func_results["calc_power"]
plt.scatter(*zip(*my_results.user_func_results["calc_power"]))
my_results.user_func_results["calc_power_alt"]
plt.scatter(*zip(*my_results.user_func_results["calc_power_alt"]))
my_results.user_func_results["after_inner_user_func"]
my_results.user_func_results["query_hdawg_device_info"]
2.5 Aborting experiment execution¶
Call session.abort_execution()
in a user function to gracefully terminate the execution of the experiment.
# Create Experiment - no explicit mapping to qubit lines
exp = Experiment(
uid="User Function",
signals=[ExperimentSignal("q0_drive")],
)
exp.set_signal_map(
{"q0_drive": device_setup.logical_signal_groups["q0"].logical_signals["drive_line"]}
)
sweep_parameter = LinearSweepParameter(start=0, stop=10, count=11)
def user_function_abort_experiment(session, foo):
print(f"In user function, foo={foo}")
if foo >= 5: # Abort execution after 5 steps
print("Aborting execution")
session.abort_execution()
# Calling `session.abort_execution()` will not return. The code below is not executed for `foo == 5`.
print("Continuing execution")
session.register_user_function(user_function_abort_experiment)
## experimental pulse sequence
# outer loop - near-time sweep
with exp.sweep(uid="sweep", parameter=sweep_parameter):
# Call user function
exp.call(user_function_abort_experiment, foo=sweep_parameter)
with exp.acquire_loop_rt(
uid="shots",
count=pow(2, average_exponent),
acquisition_type=AcquisitionType.INTEGRATION,
):
# dummy pulse playback
with exp.section(uid="qubit_excitation"):
exp.play(signal="q0_drive", pulse=x90)
session.run(exp);