Sweeping parameters with QCoDeS in LabOne Q¶
This notebook shows you how to perform a very general 2D sweep. Here, the two sweep axes are set through a QCoDeS parameter, mimicking arbitrary instruments that can be controlled with a QCoDeS driver.
0. General Imports¶
In [1]:
Copied!
import time
import matplotlib.pyplot as plt
import numpy as np
from laboneq.simple import *
import time
import matplotlib.pyplot as plt
import numpy as np
from laboneq.simple import *
In [2]:
Copied!
from qcodes.tests.instrument_mocks import DummyInstrument
from qcodes.tests.instrument_mocks import DummyInstrument
--------------------------------------------------------------------------- ModuleNotFoundError Traceback (most recent call last) Cell In[2], line 1 ----> 1 from qcodes.tests.instrument_mocks import DummyInstrument ModuleNotFoundError: No module named 'qcodes.tests'
In [3]:
Copied!
# generate dummy instruments
my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"])
my_local_osc = DummyInstrument(name="RF_source", gates=["P", "f"])
# generate dummy instruments
my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"])
my_local_osc = DummyInstrument(name="RF_source", gates=["P", "f"])
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[3], line 2 1 # generate dummy instruments ----> 2 my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"]) 3 my_local_osc = DummyInstrument(name="RF_source", gates=["P", "f"]) NameError: name 'DummyInstrument' is not defined
1. Device Setup¶
1.1 Create device setup¶
In [4]:
Copied!
descriptor = """\
instruments:
MFLI:
- address: DEV5534
uid: device_mfli
"""
device_setup = DeviceSetup.from_descriptor(
descriptor,
server_host="your_ip_address",
server_port=8004,
setup_name="MySetup",
)
descriptor = """\
instruments:
MFLI:
- address: DEV5534
uid: device_mfli
"""
device_setup = DeviceSetup.from_descriptor(
descriptor,
server_host="your_ip_address",
server_port=8004,
setup_name="MySetup",
)
2. MFLI example¶
2.1 Connect session¶
In [5]:
Copied!
# create and connect to session
session = Session(device_setup=device_setup)
session.connect(do_emulation=True)
# create and connect to session
session = Session(device_setup=device_setup)
session.connect(do_emulation=True)
[2024.11.07 16:39:16.142] INFO Logging initialized from [Default inline config in laboneq.laboneq_logging] logdir is /builds/qccs/laboneq-applications/docs/sources/how-to-guides/sources/02_spin_qubits/laboneq_output/log
[2024.11.07 16:39:16.143] INFO VERSION: laboneq 2.41.0
[2024.11.07 16:39:16.144] INFO Connecting to data server at your_ip_address:8004
[2024.11.07 16:39:16.146] INFO Connected to Zurich Instruments LabOne Data Server version 24.10 at your_ip_address:8004
[2024.11.07 16:39:16.146] INFO Configuring the device setup
[2024.11.07 16:39:16.148] INFO The device setup is configured
Out[5]:
<laboneq.dsl.session.ConnectionState at 0x72315a776b10>
Connect to the instrument in the session
In [6]:
Copied!
mfli = session.devices["device_mfli"]
mfli = session.devices["device_mfli"]
2.2 Experiment Definition¶
In [7]:
Copied!
## constant definition
INT_TIME = 30e-3
# Define sweep parameter
magnet_sweep = LinearSweepParameter(
uid="Bfield_sweep", start=-400, stop=400, count=9, axis_name="Magnetic field (mT)"
)
frequency_sweep = LinearSweepParameter(
uid="frequency_sweep", start=0, stop=400, count=5, axis_name="Frequency (MHz)"
)
## Create Experiment
exp = Experiment("Generic experiment")
# define experiment
with exp.sweep(uid="outer_sweep", parameter=magnet_sweep):
# use near-time callback
exp.call("set_magnet", value=magnet_sweep)
with exp.sweep(uid="inner_sweep", parameter=frequency_sweep):
# use near-time callback
exp.call("set_frequency", value=frequency_sweep)
exp.call("readMFLI", settling_time=0.1)
with exp.acquire_loop_rt(uid="RT_shots", count=1):
pass
## constant definition
INT_TIME = 30e-3
# Define sweep parameter
magnet_sweep = LinearSweepParameter(
uid="Bfield_sweep", start=-400, stop=400, count=9, axis_name="Magnetic field (mT)"
)
frequency_sweep = LinearSweepParameter(
uid="frequency_sweep", start=0, stop=400, count=5, axis_name="Frequency (MHz)"
)
## Create Experiment
exp = Experiment("Generic experiment")
# define experiment
with exp.sweep(uid="outer_sweep", parameter=magnet_sweep):
# use near-time callback
exp.call("set_magnet", value=magnet_sweep)
with exp.sweep(uid="inner_sweep", parameter=frequency_sweep):
# use near-time callback
exp.call("set_frequency", value=frequency_sweep)
exp.call("readMFLI", settling_time=0.1)
with exp.acquire_loop_rt(uid="RT_shots", count=1):
pass
2.3 Configure MFLI and DAQ module¶
In [8]:
Copied!
# configure MFLI
demod = mfli.demods[0] # which demodulator to use (depends on MF option)
with mfli.set_transaction():
mfli.demods["*"].enable(False)
mfli.oscs[0].freq(1e6)
demod.order(1)
demod.rate(1e3)
demod.trigger("continuous")
demod.timeconstant(10e-3)
demod.enable(True)
# Parameters
DEMOD_RATE_MFLI = demod.rate() # read the value from the instrument
NUM_COLS = int(
np.ceil(DEMOD_RATE_MFLI * INT_TIME)
) # Number of samples per burst. Corresponds to length of time trace in units of sampling rate.
# Module creation
daq_module = mfli._session.modules.daq # Create DAQ module
daq_module.device(mfli) # Assign DAQ module to instrument
daq_module.type(0) # Continuous acquisition
daq_module.endless(False) # Single acquisition/trace
# Shape of my grid
daq_module.grid.mode(
4
) # Specify how the acquired data is sampled onto the horizontal axis of the matrix (4='exact')
daq_module.count(1) # Number of grids to be acquired
daq_module.grid.cols(
NUM_COLS
) # Length of acquired trace (in units of demodulator sample)
daq_module.grid.rows(1) # Number of rows per acquisition run
daq_module.grid.rowrepetition(
False
) # Averaging mode of rows (irrelevant for grid.rows(1))
# True: First average each row, then fill the next row -> sequential averaging
# False: First fill each row, then average the rows -> cyclic averaging
# Subscribe to the values that should be measured
# Nodes to read
sample_nodes = [
demod.sample.r.avg,
demod.sample.theta.avg,
]
for node in sample_nodes:
daq_module.subscribe(node)
# Print relevant settings if needed
# print(f"Columns: {daq_module.grid.cols()}")
# print(f"Rows: {daq_module.grid.rows()}")
# print(f"Repetitions: {daq_module.grid.repetitions()}")
# print(f"Holdoff: {daq_module.holdoff.time()}")
# configure MFLI
demod = mfli.demods[0] # which demodulator to use (depends on MF option)
with mfli.set_transaction():
mfli.demods["*"].enable(False)
mfli.oscs[0].freq(1e6)
demod.order(1)
demod.rate(1e3)
demod.trigger("continuous")
demod.timeconstant(10e-3)
demod.enable(True)
# Parameters
DEMOD_RATE_MFLI = demod.rate() # read the value from the instrument
NUM_COLS = int(
np.ceil(DEMOD_RATE_MFLI * INT_TIME)
) # Number of samples per burst. Corresponds to length of time trace in units of sampling rate.
# Module creation
daq_module = mfli._session.modules.daq # Create DAQ module
daq_module.device(mfli) # Assign DAQ module to instrument
daq_module.type(0) # Continuous acquisition
daq_module.endless(False) # Single acquisition/trace
# Shape of my grid
daq_module.grid.mode(
4
) # Specify how the acquired data is sampled onto the horizontal axis of the matrix (4='exact')
daq_module.count(1) # Number of grids to be acquired
daq_module.grid.cols(
NUM_COLS
) # Length of acquired trace (in units of demodulator sample)
daq_module.grid.rows(1) # Number of rows per acquisition run
daq_module.grid.rowrepetition(
False
) # Averaging mode of rows (irrelevant for grid.rows(1))
# True: First average each row, then fill the next row -> sequential averaging
# False: First fill each row, then average the rows -> cyclic averaging
# Subscribe to the values that should be measured
# Nodes to read
sample_nodes = [
demod.sample.r.avg,
demod.sample.theta.avg,
]
for node in sample_nodes:
daq_module.subscribe(node)
# Print relevant settings if needed
# print(f"Columns: {daq_module.grid.cols()}")
# print(f"Rows: {daq_module.grid.rows()}")
# print(f"Repetitions: {daq_module.grid.repetitions()}")
# print(f"Holdoff: {daq_module.holdoff.time()}")
2.4 Define near-time callbacks for arming MFLI and reading results¶
In [9]:
Copied!
def read_mfli(session, settling_time):
if session.connection_state.emulated:
return "Emulation running"
clockbase = mfli.clockbase()
timeout = 10 # s
time.sleep(settling_time)
daq_module.execute()
# Retrieve data from UHFLI DAQ module
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(INT_TIME)
if daq_module.raw_module.finished() is True:
daq_module.raw_module.finished()
# print(f"Progress of data acquisition: {100 * progress:.2f}%.")
break
daq_module.raw_module.finished()
if not (time.time() - start_time < timeout):
print(
"Data acquisition timed out. Not all results collected, data is corrupted."
)
# Get data
daq_data = daq_module.read(raw=False, clk_rate=clockbase)
return daq_data
def read_mfli(session, settling_time):
if session.connection_state.emulated:
return "Emulation running"
clockbase = mfli.clockbase()
timeout = 10 # s
time.sleep(settling_time)
daq_module.execute()
# Retrieve data from UHFLI DAQ module
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(INT_TIME)
if daq_module.raw_module.finished() is True:
daq_module.raw_module.finished()
# print(f"Progress of data acquisition: {100 * progress:.2f}%.")
break
daq_module.raw_module.finished()
if not (time.time() - start_time < timeout):
print(
"Data acquisition timed out. Not all results collected, data is corrupted."
)
# Get data
daq_data = daq_module.read(raw=False, clk_rate=clockbase)
return daq_data
In [10]:
Copied!
def set_magnet(session, value):
my_magnet.Bx.set(value) # set new value in mT
print(f"Set magnet to new value:{value}")
time.sleep(0.1) # settling time
return my_magnet.Bx.get() # return new value
def set_frequency(session, value):
my_local_osc.f.set(value) # set new value in MHz
print(f"Set new frequency:{value}")
time.sleep(0.1) # settling time
return my_local_osc.f.get() # return new value
def set_magnet(session, value):
my_magnet.Bx.set(value) # set new value in mT
print(f"Set magnet to new value:{value}")
time.sleep(0.1) # settling time
return my_magnet.Bx.get() # return new value
def set_frequency(session, value):
my_local_osc.f.set(value) # set new value in MHz
print(f"Set new frequency:{value}")
time.sleep(0.1) # settling time
return my_local_osc.f.get() # return new value
In [11]:
Copied!
# register near-time callbacks
session.register_neartime_callback(set_magnet, "set_magnet")
session.register_neartime_callback(set_frequency, "set_frequency")
session.register_neartime_callback(read_mfli, "readMFLI")
# register near-time callbacks
session.register_neartime_callback(set_magnet, "set_magnet")
session.register_neartime_callback(set_frequency, "set_frequency")
session.register_neartime_callback(read_mfli, "readMFLI")
2.5 Run experiment¶
In [12]:
Copied!
my_results = session.run(exp)
my_results = session.run(exp)
[2024.11.07 16:39:16.186] INFO Starting LabOne Q Compiler run...
[2024.11.07 16:39:16.188] INFO Schedule completed. [0.000 s]
[2024.11.07 16:39:16.189] INFO Code generation completed for all AWGs. [0.000 s]
[2024.11.07 16:39:16.189] INFO Completed compilation step 1 of 45. [0.001 s]
[2024.11.07 16:39:16.190] INFO Skipping compilation for next step(s)...
[2024.11.07 16:39:16.192] INFO Finished LabOne Q Compiler run.
[2024.11.07 16:39:16.194] INFO Starting near-time execution...
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[12], line 1 ----> 1 my_results = session.run(exp) File /usr/local/lib/python3.12/site-packages/laboneq/dsl/session.py:436, in Session.run(self, experiment) 427 self._last_results = Results( 428 experiment=self.experiment, 429 device_setup=self.device_setup, (...) 433 execution_errors=[], 434 ) 435 try: --> 436 controller.execute_compiled( 437 self.compiled_experiment.scheduled_experiment, ProtectedSession(self) 438 ) 439 finally: 440 results = controller.results() File /usr/local/lib/python3.12/site-packages/laboneq/controller/controller.py:513, in Controller.execute_compiled(self, scheduled_experiment, protected_session) 508 def execute_compiled( 509 self, 510 scheduled_experiment: ScheduledExperiment, 511 protected_session: ProtectedSession | None = None, 512 ): --> 513 run_async(self._execute_compiled_async, scheduled_experiment, protected_session) File /usr/local/lib/python3.12/site-packages/laboneq/core/utilities/async_helpers.py:33, in run_async(func, *args, **kwargs) 27 """Run callable asynchronous object synchronously. 28 29 Args: 30 func: Asynchronous callable to be called with `*args` and `*kwargs` 31 """ 32 # if _is_event_loop_running(): ---> 33 return unsync(func)(*args, **kwargs).result() File /usr/local/lib/python3.12/site-packages/unsync/unsync.py:144, in Unfuture.result(self, *args, **kwargs) 142 raise asyncio.InvalidStateError("Calling result() in an unsync method is not allowed") 143 # Wait on the concurrent Future outside unsync.thread --> 144 return self.concurrent_future.result(*args, **kwargs) File /usr/local/lib/python3.12/concurrent/futures/_base.py:456, in Future.result(self, timeout) 454 raise CancelledError() 455 elif self._state == FINISHED: --> 456 return self.__get_result() 457 else: 458 raise TimeoutError() File /usr/local/lib/python3.12/concurrent/futures/_base.py:401, in Future.__get_result(self) 399 if self._exception: 400 try: --> 401 raise self._exception 402 finally: 403 # Break a reference cycle with the exception in self._exception 404 self = None File /usr/local/lib/python3.12/site-packages/laboneq/controller/controller.py:521, in Controller._execute_compiled_async(self, scheduled_experiment, protected_session) 515 async def _execute_compiled_async( 516 self, 517 scheduled_experiment: ScheduledExperiment, 518 protected_session: ProtectedSession | None = None, 519 ): 520 self._recipe_data = pre_process_compiled(scheduled_experiment, self._devices) --> 521 await self._execute_compiled_impl( 522 protected_session=protected_session or ProtectedSession(None) 523 ) File /usr/local/lib/python3.12/site-packages/laboneq/controller/controller.py:541, in Controller._execute_compiled_impl(self, protected_session) 539 try: 540 with tracing.get_tracer().start_span("near-time-execution"): --> 541 await NearTimeRunner( 542 controller=self, 543 protected_session=protected_session, 544 ).run(self._recipe_data.execution) 545 except AbortExecution: 546 # eat the exception 547 pass File /usr/local/lib/python3.12/site-packages/laboneq/executor/executor.py:543, in AsyncExecutorBase.run(self, root_sequence) 541 scope = ExecutionScope(None, self) 542 for notification in root_sequence.run(scope): --> 543 await self._handlers_map[notification.statement_type](**notification.args) File /usr/local/lib/python3.12/site-packages/laboneq/controller/near_time_runner.py:61, in NearTimeRunner.nt_callback_handler(self, func_name, args) 59 res = await func(self.protected_session, **args) 60 else: ---> 61 res = func(self.protected_session, **args) 62 except AbortExecution: 63 _logger.warning(f"Execution aborted by near-time callback '{func_name}'") Cell In[10], line 2, in set_magnet(session, value) 1 def set_magnet(session, value): ----> 2 my_magnet.Bx.set(value) # set new value in mT 3 print(f"Set magnet to new value:{value}") 4 time.sleep(0.1) # settling time NameError: name 'my_magnet' is not defined
3. Plot results¶
In [13]:
Copied!
if not session.connection_state.emulated:
fig, axs = plt.subplots(1, 2, figsize=(10, 7))
fig.tight_layout(pad=5)
sweep_axes = []
for x in my_results.experiment.all_sections():
sweep_axes.append(x.parameters[0])
for dimension, node in enumerate(sample_nodes):
# extract all data and put into a result list
values, times = ([], [])
for idx in range(my_results.neartime_callback_results["readMFLI"].__len__()):
values.append(
my_results.neartime_callback_results["readMFLI"][idx][node][0].value[0]
)
times.append(
my_results.neartime_callback_results["readMFLI"][idx][node][0].time[0]
)
# post process time traces
# here: average
for ii in range(len(values)):
values[ii] = np.average(values[ii])
# reshape results into dimensions of original sweep
values = np.array(values).reshape(
sweep_axes[0].count,
# int(len(values)/sweep_axes[1].count),
sweep_axes[1].count,
)
# plot the values/datapoints
ax = axs[dimension]
pcm = ax.pcolormesh(
sweep_axes[1].values,
sweep_axes[0].values,
values,
shading="nearest",
)
fig.colorbar(pcm, ax=ax, label=str(node))
ax.set_xlabel(sweep_axes[1].axis_name)
ax.set_ylabel(sweep_axes[0].axis_name)
else:
print("Emulation - nothing to plot")
if not session.connection_state.emulated:
fig, axs = plt.subplots(1, 2, figsize=(10, 7))
fig.tight_layout(pad=5)
sweep_axes = []
for x in my_results.experiment.all_sections():
sweep_axes.append(x.parameters[0])
for dimension, node in enumerate(sample_nodes):
# extract all data and put into a result list
values, times = ([], [])
for idx in range(my_results.neartime_callback_results["readMFLI"].__len__()):
values.append(
my_results.neartime_callback_results["readMFLI"][idx][node][0].value[0]
)
times.append(
my_results.neartime_callback_results["readMFLI"][idx][node][0].time[0]
)
# post process time traces
# here: average
for ii in range(len(values)):
values[ii] = np.average(values[ii])
# reshape results into dimensions of original sweep
values = np.array(values).reshape(
sweep_axes[0].count,
# int(len(values)/sweep_axes[1].count),
sweep_axes[1].count,
)
# plot the values/datapoints
ax = axs[dimension]
pcm = ax.pcolormesh(
sweep_axes[1].values,
sweep_axes[0].values,
values,
shading="nearest",
)
fig.colorbar(pcm, ax=ax, label=str(node))
ax.set_xlabel(sweep_axes[1].axis_name)
ax.set_ylabel(sweep_axes[0].axis_name)
else:
print("Emulation - nothing to plot")
Emulation - nothing to plot
3.1 Plot individual time traces¶
In [14]:
Copied!
if not session.connection_state.emulated:
clockbase = mfli.clockbase()
for node in sample_nodes:
plt.figure()
for idx in range(my_results.neartime_callback_results["readMFLI"].__len__()):
results = my_results.neartime_callback_results["readMFLI"][idx][node][0]
plt.plot(results.time, results.value[0], label=f"readout step {int(idx+1)}")
plt.xlabel("Time [s]")
plt.ylabel(str(node))
# plt.legend(loc='best', fontsize=8)
plt.title("MFLI time traces of demodulated data")
else:
print("Emulation - nothing to plot")
if not session.connection_state.emulated:
clockbase = mfli.clockbase()
for node in sample_nodes:
plt.figure()
for idx in range(my_results.neartime_callback_results["readMFLI"].__len__()):
results = my_results.neartime_callback_results["readMFLI"][idx][node][0]
plt.plot(results.time, results.value[0], label=f"readout step {int(idx+1)}")
plt.xlabel("Time [s]")
plt.ylabel(str(node))
# plt.legend(loc='best', fontsize=8)
plt.title("MFLI time traces of demodulated data")
else:
print("Emulation - nothing to plot")
Emulation - nothing to plot