Sweeping parameters with QCoDeS in LabOne Q¶
This notebook shows how to perform a general 2D sweep in LabOne Q. Each of the two sweep axes is set through its own QCoDeS parameter from within a near-time callback, mimicking arbitrary instruments that can be controlled with a QCoDeS driver.
0. General Imports¶
In [1]:
Copied!
import time
import matplotlib.pyplot as plt
import numpy as np
from laboneq.simple import *
import time
import matplotlib.pyplot as plt
import numpy as np
from laboneq.simple import *
In [2]:
Copied!
# Mocking Qcodes DummyInstrument
# Minimal stand-ins used only when QCoDeS itself cannot be imported.
class MockParam:
    """Named value holder offering the ``set``/``get`` calls used in this notebook."""

    def __init__(self, name):
        self.name = name
        self.value = None  # nothing has been set yet

    def set(self, value):
        """Set value."""
        self.value = value

    def get(self):
        """Get value."""
        return self.value


class MockDummyInstrument:
    """Instrument mock exposing one ``MockParam`` per gate name.

    Parameters can be reached by item access (``inst["Bx"]``) or by
    attribute access (``inst.Bx``).

    Args:
        name: Name of the instrument.
        gates: Names of the parameters to create.
    """

    def __init__(self, name: str, gates: list[str]):
        self.name = name
        self.gates = gates
        self._params = {gate: MockParam(gate) for gate in self.gates}

    def __getitem__(self, key):
        """Return the parameter called ``key``; raises ``KeyError`` if unknown."""
        return self._params[key]

    def __getattr__(self, name):
        """Resolve unknown attributes as parameter names.

        Python calls this only after normal attribute lookup has failed.
        ``_params`` is read through ``__dict__`` so that a lookup on an
        instance whose ``__init__`` has not run (e.g. while it is being
        copied) raises ``AttributeError`` instead of recursing endlessly.
        """
        params = self.__dict__.get("_params", {})
        if name in params:
            return params[name]
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{name}'"
        )
In [3]:
Copied!
# Prefer the real QCoDeS dummy instrument; fall back to the local mock when
# QCoDeS is not installed so the notebook still runs end to end.
try:
    from qcodes.instrument_drivers.mock_instruments import DummyInstrument
except ImportError:
    print(
        "Qcodes not found, using MockDummyInstrument instead. "
        "Users are advised to install Qcodes for more accurate results."
    )
    DummyInstrument = MockDummyInstrument
In [4]:
Copied!
# generate dummy instruments
# Each instrument is created exactly once.
# NOTE(review): QCoDeS is expected to reject a second instrument with an
# already-used name, so re-running this cell may need a kernel restart - confirm.
my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"])
my_local_osc = DummyInstrument(name="RF_source", gates=["P", "f"])
1. Device Setup¶
1.1 Create device setup¶
In [5]:
Copied!
# YAML descriptor listing the instruments of the setup: a single MFLI.
# The nesting (instrument type -> list of devices -> address/uid) is carried
# by the indentation inside the string, so it must be preserved.
descriptor = """\
instruments:
  MFLI:
    - address: DEV5534
      uid: device_mfli
"""

# "your_ip_address" is a placeholder for the host running the LabOne data server.
device_setup = DeviceSetup.from_descriptor(
    descriptor,
    server_host="your_ip_address",
    server_port=8004,
    setup_name="MySetup",
)
2. MFLI example¶
2.1 Connect session¶
In [6]:
Copied!
# create and connect to session
# do_emulation=True runs without hardware; the callbacks and plotting cells
# below check session.connection_state.emulated and skip instrument access.
session = Session(device_setup=device_setup)
session.connect(do_emulation=True)
[2025.01.21 09:43:42.427] INFO Logging initialized from [Default inline config in laboneq.laboneq_logging] logdir is /builds/qccs/laboneq-applications/docs/sources/how-to-guides/sources/02_spin_qubits/laboneq_output/log
[2025.01.21 09:43:42.429] INFO VERSION: laboneq 2.44.0
[2025.01.21 09:43:42.430] INFO Connecting to data server at your_ip_address:8004
[2025.01.21 09:43:42.432] INFO Connected to Zurich Instruments LabOne Data Server version 24.10 at your_ip_address:8004
[2025.01.21 09:43:42.434] INFO Configuring the device setup
[2025.01.21 09:43:42.435] INFO The device setup is configured
Out[6]:
<laboneq.dsl.session.ConnectionState at 0x7ea026bad880>
Get a handle to the MFLI from the connected session
In [7]:
Copied!
# Device handle of the MFLI, looked up by the uid given in the descriptor.
mfli = session.devices["device_mfli"]
2.2 Experiment Definition¶
In [8]:
Copied!
## constant definition
INT_TIME = 30e-3  # s, integration time of one MFLI readout
SETTLING_TIME = 0.1  # s, wait before each readout (passed to the "readMFLI" callback)

# Define sweep parameter
magnet_sweep = LinearSweepParameter(
    uid="Bfield_sweep", start=-400, stop=400, count=9, axis_name="Magnetic field (mT)"
)
frequency_sweep = LinearSweepParameter(
    uid="frequency_sweep", start=0, stop=400, count=5, axis_name="Frequency (MHz)"
)

## Create Experiment
exp = Experiment("Generic experiment")

# define experiment
# Outer near-time sweep: magnetic field. Inner near-time sweep: frequency.
# For every (field, frequency) point the instruments are set through callbacks
# and the MFLI is read out, giving 9 x 5 = 45 near-time steps.
with exp.sweep(uid="outer_sweep", parameter=magnet_sweep):
    # use near-time callback
    exp.call("set_magnet", value=magnet_sweep)
    with exp.sweep(uid="inner_sweep", parameter=frequency_sweep):
        # use near-time callback
        exp.call("set_frequency", value=frequency_sweep)
        exp.call("readMFLI", settling_time=SETTLING_TIME)
        # Empty real-time loop: all work happens in the near-time callbacks.
        with exp.acquire_loop_rt(uid="RT_shots", count=1):
            pass
2.3 Configure MFLI and DAQ module¶
In [9]:
Copied!
# configure MFLI
demod = mfli.demods[0]  # which demodulator to use (depends on MF option)

# Apply all demodulator settings in a single transaction.
with mfli.set_transaction():
    mfli.demods["*"].enable(False)
    mfli.oscs[0].freq(1e6)
    demod.order(1)
    demod.rate(1e3)
    demod.trigger("continuous")
    demod.timeconstant(10e-3)
    demod.enable(True)

# Parameters
# Read the rate back after the transaction: the value reported by the
# instrument is the one used to size the acquisition.
DEMOD_RATE_MFLI = demod.rate()  # read the value from the instrument
# Number of samples per burst. Corresponds to length of time trace in units of sampling rate.
NUM_COLS = int(np.ceil(DEMOD_RATE_MFLI * INT_TIME))

# Module creation
# NOTE(review): this goes through the private `_session` attribute of the
# device handle; switch to a public accessor if one is available.
daq_module = mfli._session.modules.daq  # DAQ module of the MFLI's session
daq_module.device(mfli)  # Assign DAQ module to instrument
daq_module.type(0)  # Continuous acquisition
daq_module.endless(False)  # Single acquisition/trace

# Shape of my grid
# Specify how the acquired data is sampled onto the horizontal axis of the matrix (4='exact')
daq_module.grid.mode(4)
daq_module.count(1)  # Number of grids to be acquired
# Length of acquired trace (in units of demodulator sample)
daq_module.grid.cols(NUM_COLS)
daq_module.grid.rows(1)  # Number of rows per acquisition run
# Averaging mode of rows (irrelevant for grid.rows(1))
# True: First average each row, then fill the next row -> sequential averaging
# False: First fill each row, then average the rows -> cyclic averaging
daq_module.grid.rowrepetition(False)

# Subscribe to the values that should be measured
# Nodes to read
sample_nodes = [
    demod.sample.r.avg,
    demod.sample.theta.avg,
]
for node in sample_nodes:
    daq_module.subscribe(node)

# Print relevant settings if needed
# print(f"Columns: {daq_module.grid.cols()}")
# print(f"Rows: {daq_module.grid.rows()}")
# print(f"Repetitions: {daq_module.grid.repetitions()}")
# print(f"Holdoff: {daq_module.holdoff.time()}")
2.4 Define near-time callbacks for arming MFLI and reading results¶
In [10]:
Copied!
def read_mfli(session, settling_time, timeout=10.0):
    """Near-time callback: run one DAQ acquisition on the MFLI and return its data.

    Uses the notebook-level ``mfli``, ``daq_module`` and ``INT_TIME``.

    Args:
        session: Session passed in by the near-time callback machinery; only
            its ``connection_state.emulated`` flag is read.
        settling_time: Time in seconds to wait before starting the acquisition.
        timeout: Maximum time in seconds to wait for the acquisition to finish.

    Returns:
        The string ``"Emulation running"`` in emulation mode, otherwise the
        result of ``daq_module.read``. After a timeout the data is still read
        and returned, but a warning is printed because it may be incomplete.
    """
    if session.connection_state.emulated:
        return "Emulation running"

    clockbase = mfli.clockbase()

    time.sleep(settling_time)
    daq_module.execute()

    # Retrieve data from MFLI DAQ module: poll until the module reports
    # completion or the deadline passes. A monotonic clock keeps the timeout
    # immune to system clock adjustments.
    deadline = time.monotonic() + timeout
    finished = False
    while time.monotonic() < deadline:
        time.sleep(INT_TIME)
        if daq_module.raw_module.finished():
            finished = True
            break

    # Decide on the flag rather than on a second clock reading, so that an
    # acquisition finishing right at the deadline is not reported as timed out.
    if not finished:
        print(
            "Data acquisition timed out. Not all results collected, data is corrupted."
        )

    # Get data
    return daq_module.read(raw=False, clk_rate=clockbase)
In [11]:
Copied!
def _set_and_settle(parameter, value, message, settling_time=0.1):
    """Set ``parameter`` to ``value``, print ``message``, wait, and return the read-back value."""
    parameter.set(value)
    print(message)
    time.sleep(settling_time)  # settling time
    return parameter.get()


def set_magnet(session, value):
    """Near-time callback: set the ``Bx`` parameter of ``my_magnet``.

    Args:
        session: Session passed in by the near-time callback machinery (unused).
        value: New field value in mT.

    Returns:
        The value read back from the parameter after the settling time.
    """
    return _set_and_settle(my_magnet.Bx, value, f"Set magnet to new value:{value}")


def set_frequency(session, value):
    """Near-time callback: set the ``f`` parameter of ``my_local_osc``.

    Args:
        session: Session passed in by the near-time callback machinery (unused).
        value: New frequency value in MHz.

    Returns:
        The value read back from the parameter after the settling time.
    """
    return _set_and_settle(my_local_osc.f, value, f"Set new frequency:{value}")
In [12]:
Copied!
# register near-time callbacks
# The string names must match the names used in the exp.call(...) statements
# of the experiment definition.
session.register_neartime_callback(set_magnet, "set_magnet")
session.register_neartime_callback(set_frequency, "set_frequency")
session.register_neartime_callback(read_mfli, "readMFLI")
2.5 Run experiment¶
In [13]:
Copied!
# Run the experiment once; the values returned by the near-time callbacks are
# available afterwards in my_results.neartime_callback_results.
my_results = session.run(exp)
[2025.01.21 09:43:42.486] INFO Starting LabOne Q Compiler run...
[2025.01.21 09:43:42.488] INFO Schedule completed. [0.000 s]
[2025.01.21 09:43:42.489] INFO Code generation completed for all AWGs. [0.000 s]
[2025.01.21 09:43:42.489] INFO Completed compilation step 1 of 45. [0.002 s]
[2025.01.21 09:43:42.490] INFO Skipping compilation for next step(s)...
[2025.01.21 09:43:42.491] INFO Finished LabOne Q Compiler run.
[2025.01.21 09:43:42.494] INFO Starting near-time execution...
Set magnet to new value:-400.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:-300.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:-200.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:-100.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:0.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:100.0 Set new frequency:0.0
Set new frequency:100.0 Set new frequency:200.0
Set new frequency:300.0 Set new frequency:400.0
Set magnet to new value:200.0 Set new frequency:0.0 Set new frequency:100.0
Set new frequency:200.0 Set new frequency:300.0
Set new frequency:400.0 Set magnet to new value:300.0
Set new frequency:0.0 Set new frequency:100.0
Set new frequency:200.0 Set new frequency:300.0
Set new frequency:400.0 Set magnet to new value:400.0
Set new frequency:0.0 Set new frequency:100.0
Set new frequency:200.0 Set new frequency:300.0
Set new frequency:400.0 [2025.01.21 09:43:47.952] INFO Finished near-time execution.
3. Plot results¶
In [14]:
Copied!
if not session.connection_state.emulated:
    # One entry per near-time readout, in execution order.
    readouts = my_results.neartime_callback_results["readMFLI"]

    # First sweep parameter of every section that has one. Sections without
    # sweep parameters are skipped instead of raising.
    # NOTE(review): assumes all_sections() lists the outer sweep before the
    # inner one - confirm against the LabOne Q documentation.
    sweep_axes = [
        section.parameters[0]
        for section in my_results.experiment.all_sections()
        if getattr(section, "parameters", None)
    ]
    outer_axis, inner_axis = sweep_axes[0], sweep_axes[1]

    # One panel per subscribed node; squeeze=False keeps axs two-dimensional
    # for any number of nodes.
    fig, axs = plt.subplots(1, len(sample_nodes), figsize=(10, 7), squeeze=False)

    for ax, node in zip(axs[0], sample_nodes):
        # post process time traces
        # here: average each acquired trace down to a single number
        averaged = [np.average(readout[node][0].value[0]) for readout in readouts]

        # reshape results into dimensions of original sweep
        values = np.array(averaged).reshape(outer_axis.count, inner_axis.count)

        # plot the values/datapoints
        pcm = ax.pcolormesh(
            inner_axis.values,
            outer_axis.values,
            values,
            shading="nearest",
        )
        fig.colorbar(pcm, ax=ax, label=str(node))
        ax.set_xlabel(inner_axis.axis_name)
        ax.set_ylabel(outer_axis.axis_name)

    # Lay out after drawing so labels and colorbars are taken into account.
    fig.tight_layout(pad=5)
else:
    print("Emulation - nothing to plot")
Emulation - nothing to plot
3.1 Plot individual time traces¶
In [15]:
Copied!
if not session.connection_state.emulated:
    # One entry per near-time readout, in execution order.
    readouts = my_results.neartime_callback_results["readMFLI"]

    # One figure per subscribed node, with one line per readout step.
    for node in sample_nodes:
        fig, ax = plt.subplots()
        for step, readout in enumerate(readouts, start=1):
            trace = readout[node][0]
            ax.plot(trace.time, trace.value[0], label=f"readout step {step}")
        ax.set_xlabel("Time [s]")
        ax.set_ylabel(str(node))
        # ax.legend(loc='best', fontsize=8)
        ax.set_title("MFLI time traces of demodulated data")
else:
    print("Emulation - nothing to plot")
Emulation - nothing to plot