[docs]
class ZIBaseModule(ZIInstrument):
    """Generic toolkit driver for LabOne modules.

    All module-specific classes are derived from this class.
    It exposes the nodetree and also implements common functions valid for all
    modules.

    It can also be used directly, e.g. for modules that have no special class
    in toolkit.

    Args:
        tk_object: Underlying zhinst-toolkit object.
        session: Session to the Data Server.
        name: Name of the module in QCoDeS.
    """

    def __init__(self, tk_object: TKModuleType, session: "Session", name: str):
        self._tk_object = tk_object
        self._session = session
        # The number of existing instances is appended to the QCoDeS name
        # (presumably to keep names distinct when a module is created twice).
        super().__init__(
            f"zi_{name}_{len(self.instances())}", tk_object.root, is_module=True
        )
        init_nodetree(self, self._tk_object, self._snapshot_cache)
        # Values read from the module's ``/device`` node are passed through
        # ``_get_device`` so that callers get a QCoDeS device object instead
        # of a bare serial. ``raise_for_invalid_node=False`` is forwarded
        # as-is; by its name it tolerates modules without such a node.
        self._tk_object.root.update_nodes(
            {
                "/device": {
                    "GetParser": self._get_device,
                }
            },
            raise_for_invalid_node=False,
        )

    def _get_device(self, serial: str) -> t.Union["DeviceType", str]:
        """Convert a device serial into a QCoDeS device object.

        Args:
            serial: Serial of the device.

        Returns:
            QCoDeS device object. If the serial does not match a connected
            device (the lookup raises ``KeyError`` or ``RuntimeError``) the
            serial is returned instead.
        """
        try:
            return self._session.devices[serial]
        except (RuntimeError, KeyError):
            return serial

    def _get_node(self, node: str) -> t.Union[ZIParameter, str]:
        """Convert a raw node string into a QCoDeS parameter.

        Args:
            node: Raw node string.

        Returns:
            QCoDeS parameter for the node. If the toolkit module returns a
            plain string for the node, that string is returned unchanged.
        """
        tk_node = self._tk_object._get_node(node)
        if isinstance(tk_node, str):
            return tk_node
        # The device owning the node is looked up in the session by the
        # ``prefix_hide`` of the node's root.
        device = self._session.devices[tk_node.root.prefix_hide]
        return tk_node_to_parameter(device, tk_node)

    @staticmethod
    def _set_node(signal: t.Union[ZIParameter, TKNode, str]) -> str:
        """Convert a node object into a raw node string.

        Args:
            signal: QCoDeS parameter, toolkit node or raw node string.

        Returns:
            The ``zi_node`` attribute of ``signal`` if it has one, otherwise
            the result of the toolkit base module conversion.
        """
        try:
            return signal.zi_node  # type: ignore[union-attr]
        except AttributeError:
            return TKBaseModule._set_node(signal)

    def subscribe(self, signal: t.Union[ZIParameter, str]):
        """Subscribe to a node.

        The node can either be a node of this module or of a connected device.

        Args:
            signal: Node that should be subscribed to.
        """
        # Resolve the node first and call the toolkit exactly once, so that an
        # ``AttributeError`` raised inside the toolkit is not mistaken for a
        # missing ``zi_node`` attribute and silently retried.
        node = getattr(signal, "zi_node", signal)
        self._tk_object.subscribe(node)

    def unsubscribe(self, signal: t.Union[ZIParameter, str]):
        """Unsubscribe from a node.

        The node can either be a node of this module or of a connected device.

        Args:
            signal: Node that should be unsubscribed from.
        """
        # Same reasoning as in ``subscribe``: resolve first, call once.
        node = getattr(signal, "zi_node", signal)
        self._tk_object.unsubscribe(node)

    @property
    def raw_module(self) -> ZIModule:  # type: ignore [type-var]
        """Underlying zhinst.core module."""
        return self._tk_object.raw_module

    def wait_done(self, *, timeout: float = 20.0, sleep_time: float = 0.5) -> None:
        """Wait until the module is finished.

        Warning: Only usable for modules that make use of the `/finished` node.

        Args:
            timeout: The maximum waiting time in seconds for the
                measurement (default: 20).
            sleep_time: Time in seconds to wait between
                requesting the module state (default: 0.5).

        Raises:
            TimeoutError: The measurement is not completed before
                timeout.
        """
        return self._tk_object.wait_done(timeout=timeout, sleep_time=sleep_time)

    def progress(self) -> float:
        """Progress of the execution.

        Returns:
            Progress of the execution with a number between 0 and 1.
        """
        return self._tk_object.progress()

    def execute(self) -> None:
        """Start the module execution.

        Subscription or unsubscription is not possible until the execution is
        finished.

        .. versionadded:: 0.4.1
        """
        return self._tk_object.execute()

    def read(self) -> NodeDict:
        """Read the data of the module.

        If the recording is still ongoing only a subset of data is returned.

        Returns:
            Data returned by the underlying toolkit module, wrapped in a
            ``NodeDict``.
        """
        return NodeDict(self._tk_object.read())