Sweeping parameters with QCoDeS in LabOne Q¶
This notebook shows you how to perform a very general 2D sweep. Here, the two sweep axes are set through a QCoDeS parameter, mimicking arbitrary instruments that can be controlled with a QCoDeS driver.
0. General Imports¶
In [ ]:
Copied!
%config IPCompleter.greedy=True
import matplotlib.pyplot as plt
import numpy as np
import logging
import time
from IPython.display import clear_output
from laboneq.simple import *
%config IPCompleter.greedy=True
import matplotlib.pyplot as plt
import numpy as np
import logging
import time
from IPython.display import clear_output
from laboneq.simple import *
In [ ]:
Copied!
import qcodes as qc
from qcodes.tests.instrument_mocks import DummyInstrument
import qcodes as qc
from qcodes.tests.instrument_mocks import DummyInstrument
In [ ]:
Copied!
# generate dummy instruments
my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"])
my_LO = DummyInstrument(name="RF_source", gates=["P", "f"])
# generate dummy instruments
my_magnet = DummyInstrument(name="magnet", gates=["Bx", "By", "Bz"])
my_LO = DummyInstrument(name="RF_source", gates=["P", "f"])
1. Device Setup¶
1.1 Create device setup¶
In [ ]:
Copied!
descriptor = f"""\
instruments:
MFLI:
- address: DEV5534
uid: device_mfli
"""
device_setup = DeviceSetup.from_descriptor(
descriptor,
server_host="your_ip_address",
server_port=8004,
setup_name="MySetup",
)
descriptor = f"""\
instruments:
MFLI:
- address: DEV5534
uid: device_mfli
"""
device_setup = DeviceSetup.from_descriptor(
descriptor,
server_host="your_ip_address",
server_port=8004,
setup_name="MySetup",
)
2. MFLI example¶
2.1 Connect session¶
In [ ]:
Copied!
# create and connect to session
session = Session(device_setup=device_setup)
session.connect(do_emulation=True)
# create and connect to session
session = Session(device_setup=device_setup)
session.connect(do_emulation=True)
Connect to the instrument in the session
In [ ]:
Copied!
mfli = session.devices["device_mfli"]
mfli = session.devices["device_mfli"]
2.2 Experiment Definition¶
In [ ]:
Copied!
## constant definition
INT_TIME = 30e-3
# Define sweep parameter
magnet_sweep = LinearSweepParameter(
uid="Bfield_sweep", start=-400, stop=400, count=9, axis_name="Magnetic field (mT)"
)
frequency_sweep = LinearSweepParameter(
uid="frequency_sweep", start=0, stop=400, count=5, axis_name="Frequency (MHz)"
)
## Create Experiment
exp = Experiment("Generic experiment")
# define experiment
with exp.sweep(uid="outer_sweep", parameter=magnet_sweep):
# use user function
exp.call("set_magnet", value=magnet_sweep)
with exp.sweep(uid="inner_sweep", parameter=frequency_sweep):
# use user function
exp.call("set_frequency", value=frequency_sweep)
exp.call("readMFLI", settling_time=0.1)
## constant definition
INT_TIME = 30e-3
# Define sweep parameter
magnet_sweep = LinearSweepParameter(
uid="Bfield_sweep", start=-400, stop=400, count=9, axis_name="Magnetic field (mT)"
)
frequency_sweep = LinearSweepParameter(
uid="frequency_sweep", start=0, stop=400, count=5, axis_name="Frequency (MHz)"
)
## Create Experiment
exp = Experiment("Generic experiment")
# define experiment
with exp.sweep(uid="outer_sweep", parameter=magnet_sweep):
# use user function
exp.call("set_magnet", value=magnet_sweep)
with exp.sweep(uid="inner_sweep", parameter=frequency_sweep):
# use user function
exp.call("set_frequency", value=frequency_sweep)
exp.call("readMFLI", settling_time=0.1)
2.3 Configure MFLI and DAQ module¶
In [ ]:
Copied!
# configure MFLI
demod = mfli.demods[0] # which demodulator to use (depends on MF option)
with mfli.set_transaction():
mfli.demods["*"].enable(False)
mfli.oscs[0].freq(1e6)
demod.order(1)
demod.rate(1e3)
demod.trigger("continuous")
demod.timeconstant(10e-3)
demod.enable(True)
# Parameters
DEMOD_RATE_MFLI = demod.rate() # read the value from the instrument
NUM_COLS = int(
np.ceil(DEMOD_RATE_MFLI * INT_TIME)
) # Number of samples per burst. Corresponds to length of time trace in units of sampling rate.
# Module creation
daq_module = mfli._session.modules.daq # Create DAQ module
daq_module.device(mfli) # Assign DAQ module to instrument
daq_module.type(0) # Continuous acquisition
daq_module.endless(False) # Single acquisition/trace
# Shape of my grid
daq_module.grid.mode(
4
) # Specify how the acquired data is sampled onto the matrix’s horizontal axis (4='exact')
daq_module.count(1) # Number of grids to be acquired
daq_module.grid.cols(
NUM_COLS
) # Length of acquired trace (in units of demodulator sample)
daq_module.grid.rows(1) # Number of rows per acquisition run
daq_module.grid.rowrepetition(
False
) # Averaging mode of rows (irrevelant for grid.rows(1))
# True: First average each row, then fill the next row -> sequential averaging
# False: First fill each row, then average the rows -> cyclic averaging
# Subscribe to the values that should be measured
# Nodes to read
sample_nodes = [
demod.sample.r.avg,
demod.sample.theta.avg,
]
for node in sample_nodes:
daq_module.subscribe(node)
# Print relevant settings if needed
# print(f"Columns: {daq_module.grid.cols()}")
# print(f"Rows: {daq_module.grid.rows()}")
# print(f"Repetitions: {daq_module.grid.repetitions()}")
# print(f"Holdoff: {daq_module.holdoff.time()}")
# configure MFLI
demod = mfli.demods[0] # which demodulator to use (depends on MF option)
with mfli.set_transaction():
mfli.demods["*"].enable(False)
mfli.oscs[0].freq(1e6)
demod.order(1)
demod.rate(1e3)
demod.trigger("continuous")
demod.timeconstant(10e-3)
demod.enable(True)
# Parameters
DEMOD_RATE_MFLI = demod.rate() # read the value from the instrument
NUM_COLS = int(
np.ceil(DEMOD_RATE_MFLI * INT_TIME)
) # Number of samples per burst. Corresponds to length of time trace in units of sampling rate.
# Module creation
daq_module = mfli._session.modules.daq # Create DAQ module
daq_module.device(mfli) # Assign DAQ module to instrument
daq_module.type(0) # Continuous acquisition
daq_module.endless(False) # Single acquisition/trace
# Shape of my grid
daq_module.grid.mode(
4
) # Specify how the acquired data is sampled onto the matrix’s horizontal axis (4='exact')
daq_module.count(1) # Number of grids to be acquired
daq_module.grid.cols(
NUM_COLS
) # Length of acquired trace (in units of demodulator sample)
daq_module.grid.rows(1) # Number of rows per acquisition run
daq_module.grid.rowrepetition(
False
) # Averaging mode of rows (irrevelant for grid.rows(1))
# True: First average each row, then fill the next row -> sequential averaging
# False: First fill each row, then average the rows -> cyclic averaging
# Subscribe to the values that should be measured
# Nodes to read
sample_nodes = [
demod.sample.r.avg,
demod.sample.theta.avg,
]
for node in sample_nodes:
daq_module.subscribe(node)
# Print relevant settings if needed
# print(f"Columns: {daq_module.grid.cols()}")
# print(f"Rows: {daq_module.grid.rows()}")
# print(f"Repetitions: {daq_module.grid.repetitions()}")
# print(f"Holdoff: {daq_module.holdoff.time()}")
2.4 Define user functions for arming MFLI and reading results¶
In [ ]:
Copied!
def readMFLI(session, settling_time):
if session.connection_state.emulated:
return "Emulation running"
clockbase = mfli.clockbase()
timeout = 10 # s
time.sleep(settling_time)
daq_module.execute()
# Retrieve data from UHFLI DAQ module
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(INT_TIME)
if daq_module.raw_module.finished() == True:
progress = daq_module.raw_module.finished()
# print(f"Progress of data acquisition: {100 * progress:.2f}%.")
break
progress = daq_module.raw_module.finished()
if not (time.time() - start_time < timeout):
print(
f"Data acquisition timed out. Not all results collected, data is corrupted."
)
# Get data
daq_data = daq_module.read(raw=False, clk_rate=clockbase)
return daq_data
def readMFLI(session, settling_time):
if session.connection_state.emulated:
return "Emulation running"
clockbase = mfli.clockbase()
timeout = 10 # s
time.sleep(settling_time)
daq_module.execute()
# Retrieve data from UHFLI DAQ module
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(INT_TIME)
if daq_module.raw_module.finished() == True:
progress = daq_module.raw_module.finished()
# print(f"Progress of data acquisition: {100 * progress:.2f}%.")
break
progress = daq_module.raw_module.finished()
if not (time.time() - start_time < timeout):
print(
f"Data acquisition timed out. Not all results collected, data is corrupted."
)
# Get data
daq_data = daq_module.read(raw=False, clk_rate=clockbase)
return daq_data
In [ ]:
Copied!
def set_magnet(session, value):
my_magnet.Bx.set(value) # set new value in mT
print(f"Set magnet to new value:{value}")
time.sleep(0.1) # settling time
return my_magnet.Bx.get() # return new value
def set_frequency(session, value):
my_LO.f.set(value) # set new value in MHz
print(f"Set new frequency:{value}")
time.sleep(0.1) # settling time
return my_LO.f.get() # return new value
def set_magnet(session, value):
my_magnet.Bx.set(value) # set new value in mT
print(f"Set magnet to new value:{value}")
time.sleep(0.1) # settling time
return my_magnet.Bx.get() # return new value
def set_frequency(session, value):
my_LO.f.set(value) # set new value in MHz
print(f"Set new frequency:{value}")
time.sleep(0.1) # settling time
return my_LO.f.get() # return new value
In [ ]:
Copied!
# register user functions
session.register_user_function(set_magnet, "set_magnet")
session.register_user_function(set_frequency, "set_frequency")
session.register_user_function(readMFLI, "readMFLI")
# register user functions
session.register_user_function(set_magnet, "set_magnet")
session.register_user_function(set_frequency, "set_frequency")
session.register_user_function(readMFLI, "readMFLI")
2.5 Run experiment¶
In [ ]:
Copied!
my_results = session.run(exp)
my_results = session.run(exp)
3. Plot results¶
In [ ]:
Copied!
if not session.connection_state.emulated:
fig, axs = plt.subplots(1, 2, figsize=(10, 7))
fig.tight_layout(pad=5)
sweep_axes = []
for x in my_results.experiment.all_sections():
sweep_axes.append(x.parameters[0])
for dimension, node in enumerate(sample_nodes):
# extract all data and put into a result list
values, times = ([], [])
for idx in range(my_results.user_func_results["readMFLI"].__len__()):
values.append(
my_results.user_func_results["readMFLI"][idx][node][0].value[0]
)
times.append(my_results.user_func_results["readMFLI"][idx][node][0].time[0])
# post process time traces
# here: average
for ii in range(len(values)):
values[ii] = np.average(values[ii])
# reshape results into dimensions of original sweep
values = np.array(values).reshape(
sweep_axes[0].count,
# int(len(values)/sweep_axes[1].count),
sweep_axes[1].count,
)
# plot the values/datapoints
ax = axs[dimension]
pcm = ax.pcolormesh(
sweep_axes[1].values,
sweep_axes[0].values,
values,
shading="nearest",
)
fig.colorbar(pcm, ax=ax, label=str(node))
ax.set_xlabel(sweep_axes[1].axis_name)
ax.set_ylabel(sweep_axes[0].axis_name)
else:
print("Emulation - nothing to plot")
if not session.connection_state.emulated:
fig, axs = plt.subplots(1, 2, figsize=(10, 7))
fig.tight_layout(pad=5)
sweep_axes = []
for x in my_results.experiment.all_sections():
sweep_axes.append(x.parameters[0])
for dimension, node in enumerate(sample_nodes):
# extract all data and put into a result list
values, times = ([], [])
for idx in range(my_results.user_func_results["readMFLI"].__len__()):
values.append(
my_results.user_func_results["readMFLI"][idx][node][0].value[0]
)
times.append(my_results.user_func_results["readMFLI"][idx][node][0].time[0])
# post process time traces
# here: average
for ii in range(len(values)):
values[ii] = np.average(values[ii])
# reshape results into dimensions of original sweep
values = np.array(values).reshape(
sweep_axes[0].count,
# int(len(values)/sweep_axes[1].count),
sweep_axes[1].count,
)
# plot the values/datapoints
ax = axs[dimension]
pcm = ax.pcolormesh(
sweep_axes[1].values,
sweep_axes[0].values,
values,
shading="nearest",
)
fig.colorbar(pcm, ax=ax, label=str(node))
ax.set_xlabel(sweep_axes[1].axis_name)
ax.set_ylabel(sweep_axes[0].axis_name)
else:
print("Emulation - nothing to plot")
3.1 Plot individual time traces¶
In [ ]:
Copied!
if not session.connection_state.emulated:
clockbase = mfli.clockbase()
for node in sample_nodes:
plt.figure()
for idx in range(my_results.user_func_results["readMFLI"].__len__()):
results = my_results.user_func_results["readMFLI"][idx][node][0] # Results
plt.plot(results.time, results.value[0], label=f"readout step {int(idx+1)}")
plt.xlabel("Time [s]")
plt.ylabel(str(node))
# plt.legend(loc='best', fontsize=8)
plt.title("MFLI time traces of demodulated data")
else:
print("Emulation - nothing to plot")
if not session.connection_state.emulated:
clockbase = mfli.clockbase()
for node in sample_nodes:
plt.figure()
for idx in range(my_results.user_func_results["readMFLI"].__len__()):
results = my_results.user_func_results["readMFLI"][idx][node][0] # Results
plt.plot(results.time, results.value[0], label=f"readout step {int(idx+1)}")
plt.xlabel("Time [s]")
plt.ylabel(str(node))
# plt.legend(loc='best', fontsize=8)
plt.title("MFLI time traces of demodulated data")
else:
print("Emulation - nothing to plot")