flyde.node

Component

Bases: Node

A node that runs a function when executed.

Source code in flyde/node.py
class Component(Node):
    """A node that runs a function when executed."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Set by stop(); the worker loop in run() checks it before every iteration.
        self._stop = Event()
        self._mutex = Lock()

    def run(self):
        """Run the main component function.

        Default implementation looks for a reactive method called `process()` and calls it passing the
        input values as keyword arguments. The loop runs in a separate non-daemon thread, so this method
        returns right after the thread is started.

        If `process()` returns a dict or a namedtuple, every item is sent to the output with the same name.

        Raises:
            NotImplementedError: If the component does not define a `process()` method.
        """
        if not hasattr(self, "process"):
            raise NotImplementedError("Component does not have neither run() nor process() method. No code to run.")

        def worker():
            logger.debug(f"Running {self._id} worker")

            # If the component has inputs and none of them is a queue (all sticky or static),
            # the loop only runs once.
            has_queue_input = any(inp._input_mode == InputMode.QUEUE for inp in self.inputs.values())
            run_once = len(self.inputs) > 0 and not has_queue_input

            while not self._stop.is_set():
                logger.debug(f"Waiting for inputs on {self._id}")
                inputs = {}
                queue_count = 0
                queue_closed_count = 0
                skip_iteration = False

                for key, inp in self.inputs.items():
                    is_queue = inp._input_mode == InputMode.QUEUE
                    value = inp.get()
                    inputs[key] = value

                    # Count EOFs received on non-static inputs
                    if is_queue:
                        queue_count += 1
                        if is_EOF(value):
                            # The input may be connected to multiple outputs, so we need to count the references
                            if inp.ref_count > 0:
                                inp.dec_ref_count()
                            if inp.ref_count == 0:
                                queue_closed_count += 1
                            else:
                                # Ignore this EOF, it's not the last one
                                inputs[key] = None
                                skip_iteration = True

                if skip_iteration:
                    continue

                # If all of the queue input values are EOF, stop the component
                if queue_count > 0 and queue_count == queue_closed_count:
                    logger.debug(f"All queue inputs are EOF, stopping {self._id}")
                    self.stop()
                    break

                logger.debug(f"Processing {self._id} with inputs: {inputs}")
                res = self.process(**inputs)  # type: ignore
                if isinstance(res, dict):
                    results = res
                elif isinstance(res, tuple) and hasattr(res, "_fields"):
                    # Namedtuples have no items(); convert them to a field name -> value mapping
                    results = res._asdict()  # type: ignore
                else:
                    # Any other return value is not routed to outputs
                    results = {}

                # Send values to the outputs named as keys
                for k, v in results.items():
                    if k not in self.outputs:
                        # Stop and close the outputs before raising: the exception only ends this
                        # worker thread, and finish() is what sets the stopped event.
                        e = ValueError(
                            f'{self._node_type}.process(): sending to non-existing output "{k}" from return value'
                        )
                        self.stop()
                        self.finish()
                        raise e

                    logger.debug(f"Sending value '{v}' to output {k} of {self._id}")
                    if self.outputs[k].connected:
                        self.outputs[k].send(v)

                # If all inputs are sticky/static, exit after the first iteration
                if run_once:
                    logger.debug(f"All inputs are sticky or static for {self._id}, stopping after first execution")
                    break

            self.finish()

        logger.debug(f"Starting {self._id} thread")
        thread = Thread(target=worker, daemon=False)
        thread.start()

    def stop(self):
        """Stop the component execution."""
        logger.debug(f"Stopping {self._id}")
        self._stop.set()

stop()

Stop the component execution.

Source code in flyde/node.py
def stop(self):
    """Signal the component to stop by setting its internal stop event."""
    logger.debug(f"Stopping {self._id}")
    stop_event = self._stop
    stop_event.set()

Graph

Bases: Node

A visual graph node that contains other nodes.

Source code in flyde/node.py
class Graph(Node):
    """A visual graph node that contains other nodes."""

    def __init__(
        self,
        /,
        id: str = "",
        node_type: str = "",
        config: Optional[dict[str, InputConfig]] = None,
        display_name: str = "",
        instances: Optional[dict[str, Node]] = None,
        instances_stopped: Optional[dict[str, Event]] = None,
        connections: Optional[list[Connection]] = None,
        inputs: Optional[dict[str, GraphPort]] = None,
        outputs: Optional[dict[str, GraphPort]] = None,
        stopped: Optional[Event] = None,
    ):
        """Create the graph and wire all of its connections.

        Every omitted (or `None`) container argument gets a fresh empty container and an omitted
        `stopped` gets a fresh `Event`, so graphs never share state through default arguments.

        Args:
            id: Graph identifier.
            node_type: Node type identifier.
            config: Input pin configurations, passed to `Node.__init__`.
            display_name: Human-readable name.
            instances: Child nodes keyed by instance id.
            instances_stopped: Stopped events of the child nodes keyed by instance id.
            connections: Connections between instance pins and/or graph ports.
            inputs: Graph input ports keyed by pin id.
            outputs: Graph output ports keyed by pin id.
            stopped: Event that is set when the graph has finished.

        Raises:
            ValueError: If a connection references an unknown instance, pin or graph port.
        """
        super().__init__(
            id=id,
            node_type=node_type,
            config=config if config is not None else {},
            display_name=display_name,
            stopped=stopped if stopped is not None else Event(),
        )

        self.inputs: dict[str, GraphPort] = inputs if inputs is not None else {}  # type: ignore
        self.outputs: dict[str, GraphPort] = outputs if outputs is not None else {}  # type: ignore
        self._connections = connections if connections is not None else []
        self._instances = instances if instances is not None else {}
        self._instances_stopped = instances_stopped if instances_stopped is not None else {}

        # Wire all connections
        for conn in self._connections:
            from_id = conn.from_node.ins_id
            from_pin = conn.from_node.pin_id
            to_id = conn.to_node.ins_id
            to_pin = conn.to_node.pin_id

            # Validate the ids ("__this" refers to the graph's own ports)
            if from_id == "__this":
                if from_pin not in self.inputs:
                    raise ValueError(f"Input {from_pin} not found in graph {self._id}")
            else:
                self._check_pin("out", from_id, from_pin)

            if to_id == "__this":
                if to_pin not in self.outputs:
                    raise ValueError(f"Output {to_pin} not found in graph {self._id}")
            else:
                self._check_pin("in", to_id, to_pin)

            if from_id != "__this" and to_id != "__this":
                # Simple case: connect two instances inside the graph
                target = self._instances[to_id].inputs[to_pin]
                q = target.queue
                source = self._instances[from_id].outputs[from_pin]
                source.connect(q)
                target.inc_ref_count()
            elif from_id == "__this":
                # Graph ports are reversed: we read from inputs and write to outputs
                target = self._instances[to_id].inputs[to_pin]
                q = target.queue
                source = self.inputs[from_pin]
                source.connect(q)
                target.inc_ref_count()
            elif to_id == "__this":
                target = self.outputs[to_pin]
                q = target.queue
                source = self._instances[from_id].outputs[from_pin]
                source.connect(q)
                target.inc_ref_count()

    def _check_pin(self, pin_type: str, instance_id: str, pin_id: str):
        """Check if the instance and pin exist.

        Args:
            pin_type: "in" to look the pin up among the instance inputs, "out" among its outputs.
            instance_id: Id of the instance inside this graph.
            pin_id: Id of the pin on that instance.

        Raises:
            ValueError: If the instance or the pin is not found.
        """
        if instance_id not in self._instances:
            raise ValueError(f"Instance {instance_id} not found")
        if pin_type == "in" and pin_id not in self._instances[instance_id].inputs:
            raise ValueError(f"Input {pin_id} not found in instance {instance_id}")
        if pin_type == "out" and pin_id not in self._instances[instance_id].outputs:
            raise ValueError(f"Output {pin_id} not found in instance {instance_id}")

    def run(self):
        """Run the graph.

        Runs every instance, then starts a non-daemon thread that waits for all instance
        stopped events before finishing the graph itself.
        """
        for instance in self._instances.values():
            logger.debug(f"Running instance {instance._id} of type {instance._node_type}")
            instance.run()

        def worker():
            logger.debug(f"Running {self._id} worker")
            # Wait for all instances to finish
            for k, v in self._instances_stopped.items():
                logger.debug(f"Waiting for instance {k} to stop")
                v.wait()
                logger.debug(f"Instance {k} stopped")
            self.finish()
            logger.debug(f"Graph {self._id} finished")

        logger.debug(f"Starting {self._id} thread")
        thread = Thread(target=worker, daemon=False)
        thread.start()

    def shutdown(self):
        """Call shutdown handlers on all instances.

        This method is called from the main thread to allow cleanup and things like UI."""
        for instance in self._instances.values():
            # If the instance has a `shutdown()` handler method, call it at this point
            if hasattr(instance, "shutdown"):
                instance.shutdown()

    def stop(self):
        """Stop all instances gracefully."""
        # Put EOF on every graph input that has a queue and a zero reference count.
        # This method does not wait for the instances to stop.
        for v in self.inputs.values():
            if v.queue is not None and v.ref_count == 0:
                v.queue.put(EOF)

    def terminate(self):
        """Terminate all instances immediately."""
        for instance in self._instances.values():
            instance.stop()
        self.stop()
        self.finish()

    @property
    def stopped(self) -> Event:
        """Return the stopped event which is set when the node has stopped."""
        return self._stopped

    @stopped.setter
    def stopped(self, value: Event):
        """Set the stopped event."""
        self._stopped = value

    @classmethod
    def from_yaml(cls, create: InstanceFactory, yml: dict):
        """Create a Graph node from a parsed YAML dictionary.

        Note that `yml` is modified in place: a "stopped" event is stored into every instance
        entry and a "mode" key of an input entry is replaced by a "required" key.

        Args:
            create: Instance factory forwarded to `Node.from_yaml` for every instance.
            yml: Parsed YAML dictionary describing the graph.

        Returns:
            The graph built through the class constructor.
        """
        # Load metadata
        node_type = yml.get("nodeId", __name__)
        graph_id = yml["id"] if "id" in yml else create_instance_id(node_type)
        config = {k: InputConfig(**v) for k, v in yml.get("config", {}).items()}
        display_name = yml.get("displayName", node_type)

        # Load instances and macros
        instances = {}
        instances_stopped = {}
        for ins in yml.get("instances", []):
            ins_id = ins["id"]
            # The event travels inside the instance dictionary down to the instance constructor
            stopped = Event()
            ins["stopped"] = stopped
            logger.debug(f"Creating instance {ins_id}")
            instances[ins_id] = Node.from_yaml(create, ins)
            instances_stopped[ins_id] = stopped
            logger.debug(f"Loaded instance {ins_id}")

        # Load connections and graph inputs/outputs
        connections = [Connection.from_yaml(conn) for conn in yml.get("connections", [])]
        inputs = {}
        for k, v in yml.get("inputs", {}).items():
            if "mode" in v:
                v["required"] = Requiredness(v["mode"])
                del v["mode"]
            inputs[k] = GraphPort(id=k, **v)
        outputs = {k: GraphPort(id=k, **v) for k, v in yml.get("outputs", {}).items()}

        # Initialize the stopped event
        stopped = Event()
        if "stopped" in yml:
            logger.debug(f"Creating graph {graph_id} from yaml with stopped event")
            stopped = yml["stopped"]

        # Instantiate through the constructor
        return cls(
            id=graph_id,
            node_type=node_type,
            config=config,
            display_name=display_name,
            instances=instances,
            instances_stopped=instances_stopped,
            connections=connections,
            inputs=inputs,
            outputs=outputs,
            stopped=stopped,
        )

    def to_dict(self) -> dict:
        """Return a dictionary representation of the node."""
        return {
            "id": self._id,
            "nodeId": self._node_type,
            "config": self._config,
            "displayName": self._display_name,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "instances": [instance.to_dict() for instance in self._instances.values()],
            "connections": [conn.to_dict() for conn in self._connections],
        }

stopped: Event property writable

Return the stopped event which is set when the node has stopped.

from_yaml(create, yml) classmethod

Create a Graph node from a parsed YAML dictionary.

Source code in flyde/node.py
@classmethod
def from_yaml(cls, create: InstanceFactory, yml: dict):
    """Build a Graph node out of a parsed YAML dictionary.

    The instance entries and input entries of `yml` are modified in place while loading
    (a "stopped" event is stored into each instance, "mode" is replaced by "required").
    """
    # Metadata
    node_type = yml.get("nodeId", __name__)
    if "id" in yml:
        graph_id = yml["id"]
    else:
        graph_id = create_instance_id(node_type)
    config = {}
    for name, raw in yml.get("config", {}).items():
        config[name] = InputConfig(**raw)
    display_name = yml.get("displayName", node_type)

    # Child instances, each paired with its own stopped event
    instances = {}
    instances_stopped = {}
    for ins in yml.get("instances", []):
        ins_id = ins["id"]
        ins_stopped = Event()
        ins["stopped"] = ins_stopped
        logger.debug(f"Creating instance {ins_id}")
        instances[ins_id] = Node.from_yaml(create, ins)
        instances_stopped[ins_id] = ins_stopped
        logger.debug(f"Loaded instance {ins_id}")

    # Connections
    connections = []
    for raw_conn in yml.get("connections", []):
        connections.append(Connection.from_yaml(raw_conn))

    # Graph input ports: a "mode" entry is translated into "required"
    inputs = {}
    for pin_id, spec in yml.get("inputs", {}).items():
        if "mode" in spec:
            spec["required"] = Requiredness(spec["mode"])
            del spec["mode"]
        inputs[pin_id] = GraphPort(id=pin_id, **spec)

    # Graph output ports
    outputs = {}
    for pin_id, spec in yml.get("outputs", {}).items():
        outputs[pin_id] = GraphPort(id=pin_id, **spec)

    # Use the stopped event handed over in the dictionary, or a new one
    if "stopped" in yml:
        logger.debug(f"Creating graph {graph_id} from yaml with stopped event")
        stopped = yml["stopped"]
    else:
        stopped = Event()

    # Instantiate through the constructor
    return cls(
        id=graph_id,
        node_type=node_type,
        config=config,
        display_name=display_name,
        instances=instances,
        instances_stopped=instances_stopped,
        connections=connections,
        inputs=inputs,
        outputs=outputs,
        stopped=stopped,
    )

run()

Run the graph.

Source code in flyde/node.py
def run(self):
    """Run every instance, then finish the graph from a background thread once they have all stopped."""
    for node in self._instances.values():
        logger.debug(f"Running instance {node._id} of type {node._node_type}")
        node.run()

    def worker():
        logger.debug(f"Running {self._id} worker")
        # Block on each instance's stopped event in turn
        for ins_id, stopped_event in self._instances_stopped.items():
            logger.debug(f"Waiting for instance {ins_id} to stop")
            stopped_event.wait()
            logger.debug(f"Instance {ins_id} stopped")
        self.finish()
        logger.debug(f"Graph {self._id} finished")

    logger.debug(f"Starting {self._id} thread")
    Thread(target=worker, daemon=False).start()

shutdown()

Call shutdown handlers on all instances.

This method is called from the main thread to allow cleanup and things like UI.

Source code in flyde/node.py
def shutdown(self):
    """Invoke the `shutdown()` handler of every instance that has one.

    This method is called from the main thread to allow cleanup and things like UI.
    """
    # Lazily filter the instances so each check happens right before its call
    with_handler = (node for node in self._instances.values() if hasattr(node, "shutdown"))
    for node in with_handler:
        node.shutdown()

stop()

Stop all instances gracefully.

Source code in flyde/node.py
def stop(self):
    """Stop all instances gracefully by putting EOF on the graph inputs.

    Only inputs that have a queue and a zero reference count receive EOF.
    """
    for port in self.inputs.values():
        # Skip ports without a queue and ports that are still referenced
        if port.queue is None or port.ref_count != 0:
            continue
        port.queue.put(EOF)

terminate()

Terminate all instances immediately.

Source code in flyde/node.py
def terminate(self):
    """Stop every instance right away, then stop and finish the graph itself."""
    children = self._instances.values()
    for child in children:
        child.stop()
    self.stop()
    self.finish()

to_dict()

Return a dictionary representation of the node.

Source code in flyde/node.py
def to_dict(self) -> dict:
    """Return the graph as a dictionary, including its instances and connections."""
    result = {
        "id": self._id,
        "nodeId": self._node_type,
        "config": self._config,
        "displayName": self._display_name,
        "inputs": self.inputs,
        "outputs": self.outputs,
    }
    # Nested nodes and connections serialize themselves
    result["instances"] = [node.to_dict() for node in self._instances.values()]
    result["connections"] = [link.to_dict() for link in self._connections]
    return result

InstanceArgs dataclass

Arguments to pass to the instance factory.

Source code in flyde/node.py
@dataclass
class InstanceArgs:
    """Arguments to pass to the instance factory."""

    # Instance identity and presentation
    id: str
    display_name: str
    # Event handed over to the instance; may be None
    stopped: Optional[Event]
    # Raw configuration of the instance
    config: dict[str, Any]
    # Kind of instance and where it comes from
    type: InstanceType = InstanceType.CODE
    source: Optional[InstanceSource] = None

    def to_dict(self) -> dict:
        """Convert the instance arguments to a dictionary.

        Only `id`, `display_name`, `stopped` and `config` are included.
        """
        exported = ("id", "display_name", "stopped", "config")
        return {name: getattr(self, name) for name in exported}

to_dict()

Convert the instance arguments to a dictionary.

Source code in flyde/node.py
def to_dict(self) -> dict:
    """Return `id`, `display_name`, `stopped` and `config` as a dictionary."""
    exported = ("id", "display_name", "stopped", "config")
    return {name: getattr(self, name) for name in exported}

InstanceSource dataclass

Source configuration of an instance.

Source code in flyde/node.py
@dataclass
class InstanceSource:
    """Source configuration of an instance.

    Pairs the kind of source with the source-specific data string.
    """

    # Kind of source the instance is created from (see InstanceSourceType)
    type: InstanceSourceType
    # Source-specific payload; presumably a file path, package name or module path
    # depending on `type` - TODO confirm against the instance factory
    data: str

InstanceSourceType

Bases: Enum

InstanceSourceType is the source type of an instance.

FILE: The instance is created from a file. PACKAGE: The instance is created from a built in package. CUSTOM: The instance is created from a custom module with path format.

Source code in flyde/node.py
class InstanceSourceType(Enum):
    """InstanceSourceType is the source type of an instance.

    FILE: The instance is created from a file.
    PACKAGE: The instance is created from a built in package.
    CUSTOM: The instance is created from a custom module with path format.
    """

    # The enum values are the lowercase source type strings
    FILE = "file"
    PACKAGE = "package"
    CUSTOM = "custom"

InstanceType

Bases: Enum

InstanceType is the type of an instance.

VISUAL: The instance is a visual node. CODE: The instance is a code node.

Source code in flyde/node.py
class InstanceType(Enum):
    """InstanceType is the type of an instance.

    VISUAL: The instance is a visual node.
    CODE: The instance is a code node.
    """

    # The enum values are the lowercase instance type strings
    VISUAL = "visual"
    CODE = "code"

Node

Bases: ABC

Node is the main building block of an application.

Attributes: id (str): A unique identifier for the node. node_type (str): The node type identifier. config (dict): A dictionary of input pin configurations. display_name (str): A human-readable name for the node. inputs (dict[str, Input]): Node input map. outputs (dict[str, Output]): Node output map.

Source code in flyde/node.py
class Node(ABC):
    """Node is the main building block of an application.

    Attributes:
        id (str): A unique identifier for the node.
        node_type (str): The node type identifier.
        config (dict): A dictionary of input pin configurations.
        display_name (str): A human-readable name for the node.
        inputs (dict[str, Input]): Node input map.
        outputs (dict[str, Output]): Node output map.
    """

    # Class-level port definitions; they are deep-copied per instance in __init__
    # when no explicit ports are passed.
    inputs: dict[str, Input] = {}
    outputs: dict[str, Output] = {}

    def __init__(
        self,
        /,
        id: str,
        node_type: str = "",
        display_name: str = "",
        inputs: Optional[dict[str, Input]] = None,
        outputs: Optional[dict[str, Output]] = None,
        stopped: Optional[Event] = None,
        config: Optional[dict[str, InputConfig]] = None,
    ):
        """Initialize the node metadata, ports and stopped event.

        Args:
            id: Node identifier; a new one is generated when empty.
            node_type: Node type identifier; defaults to the class name.
            display_name: Human-readable name; defaults to the node type.
            inputs: Input ports to use instead of the class-level definition.
            outputs: Output ports to use instead of the class-level definition.
            stopped: Event set by `finish()`; a new event is created when omitted or `None`,
                so nodes never share one event through a default argument.
            config: Raw input configuration, parsed with `parse_config()`.
        """
        node_type = node_type if node_type else self.__class__.__name__
        self._node_type = node_type
        self._id = id if id else create_instance_id(node_type)
        self._display_name = display_name if display_name else node_type
        self._config_raw = config or {}
        self._config = self.parse_config(self._config_raw)

        if inputs:
            self.inputs = inputs
        elif hasattr(self.__class__, "inputs") and len(self.__class__.inputs) > 0:
            # Copy from class definition, but instance will have own connections
            self.inputs = deepcopy(self.__class__.inputs)
        else:
            self.inputs = {}

        for k, v in self.inputs.items():
            v.id = f"{self._id}.{k}"
            if k in self._config:
                v.apply_config(self._config[k])

        if outputs:
            self.outputs = outputs
        elif hasattr(self.__class__, "outputs") and len(self.__class__.outputs) > 0:
            # Copy from class definition, but instance will have own connections
            self.outputs = deepcopy(self.__class__.outputs)
        else:
            self.outputs = {}

        for k, vv in self.outputs.items():
            vv.id = f"{self._id}.{k}"

        self._stopped = stopped if stopped is not None else Event()

    def parse_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Parse the raw config into a typed config dictionary.

        Entries that are dictionaries with a known "type" become `InputConfig` objects;
        every other entry is copied unchanged.
        """
        valid_types = [item.value for item in InputType]
        result = {}
        for key, value in config.items():
            if isinstance(value, dict) and "type" in value and value["type"] in valid_types:
                config_value = value.get("value", None)
                # If config_value is `{{key}}`, reset it to None, as this is the editor default for dynamic inputs
                if config_value == f"{{{{{key}}}}}":
                    config_value = None

                result[key] = InputConfig(
                    type=InputType(value["type"]),
                    value=config_value,
                )
            else:
                result[key] = value
        return result

    @abstractmethod
    def run(self):
        """Run the node. This method should be overridden by subclasses."""
        pass

    @abstractmethod
    def stop(self):
        """Stop the node. This method should be overridden by subclasses."""
        pass

    def finish(self):
        """Finish the component execution gracefully by closing all its outputs and notifying others."""
        logger.debug(f"Sending EOF to all outputs of {self._id}")
        for output in self.outputs.values():
            if output.connected:
                output.send(EOF)
        logger.debug(f"Node {self._id} finished, sending stopped event")
        self._stopped.set()
        logger.debug(f"Stop event set for node {self._id}")

    @property
    def stopped(self) -> Event:
        """Return the stopped event which is set when the node has finished."""
        return self._stopped

    def shutdown(self):
        """Shutdown the component. This method is optional and can be overridden by subclasses."""
        pass

    def send(self, output_id: str, value: Any):
        """Send a value to an output.

        Raises:
            ValueError: If the node has no output named `output_id`.
        """
        if output_id not in self.outputs:
            raise ValueError(f"Output {output_id} not found in node {self._id}")
        self.outputs[output_id].send(value)

    def receive(self, input_id: str) -> Any:
        """Receive a value from an input.

        Raises:
            ValueError: If the node has no input named `input_id`.

        A received EOF value is raised instead of being returned.
        """
        if input_id not in self.inputs:
            raise ValueError(f"Input {input_id} not found in node {self._id}")
        value = self.inputs[input_id].get()
        if is_EOF(value):
            raise value
        return value

    @classmethod
    def from_yaml(cls, create: InstanceFactory, yml: dict):
        """Create a node from a parsed YAML dictionary.

        Entries whose "nodeId" is missing or "VisualNode" are loaded as a `Graph`;
        all others are created through the `create` factory.
        """
        node_class_name = yml.get("nodeId", "VisualNode")
        if node_class_name == "VisualNode":
            return Graph.from_yaml(create, yml)

        config = yml.get("config", {})

        source = InstanceSource(
            type=InstanceSourceType(yml.get("source", {}).get("type", "file").lower()),
            data=yml.get("source", {}).get("data", ""),
        )

        args = InstanceArgs(
            id=yml["id"],
            display_name=yml.get("displayName", ""),
            stopped=yml.get("stopped", None),  # It's a hacky way to pass the stopped event to the constructor
            config=config,
            type=InstanceType(yml.get("type", "code").lower()),
            source=source,
        )
        return create(node_class_name, args)

    def to_dict(self) -> dict:
        """Return a dictionary representation of the node."""
        return {
            "id": self._id,
            "nodeId": self._node_type,
            "config": self._config,
            "displayName": self._display_name,
        }

finish()

Finish the component execution gracefully by closing all its outputs and notifying others.

Source code in flyde/node.py
def finish(self):
    """Close the node gracefully: send EOF to every connected output, then set the stopped event."""
    logger.debug(f"Sending EOF to all outputs of {self._id}")
    # Lazy filter: each output is checked right before EOF is sent to it
    connected_ports = (port for port in self.outputs.values() if port.connected)
    for port in connected_ports:
        port.send(EOF)
    logger.debug(f"Node {self._id} finished, sending stopped event")
    self._stopped.set()
    logger.debug(f"Stop event set for node {self._id}")

from_yaml(create, yml) classmethod

Create a node from a parsed YAML dictionary.

Source code in flyde/node.py
@classmethod
def from_yaml(cls, create: InstanceFactory, yml: dict):
    """Build a node from a parsed YAML dictionary.

    A missing "nodeId" or the value "VisualNode" delegates to `Graph.from_yaml`;
    any other node is produced by calling the `create` factory.
    """
    node_class_name = yml.get("nodeId", "VisualNode")
    if node_class_name == "VisualNode":
        return Graph.from_yaml(create, yml)

    config = yml.get("config", {})

    # Source description, defaulting to a file source with empty data
    source_spec = yml.get("source", {})
    source_type = InstanceSourceType(source_spec.get("type", "file").lower())
    source = InstanceSource(type=source_type, data=source_spec.get("data", ""))

    return create(
        node_class_name,
        InstanceArgs(
            id=yml["id"],
            display_name=yml.get("displayName", ""),
            stopped=yml.get("stopped", None),  # It's a hacky way to pass the stopped event to the constructor
            config=config,
            type=InstanceType(yml.get("type", "code").lower()),
            source=source,
        ),
    )

parse_config(config)

Parse the raw config into a typed config dictionary.

Source code in flyde/node.py
def parse_config(self, config: dict[str, Any]) -> dict[str, Any]:
    """Turn the raw config into a typed config dictionary.

    Dictionary entries with a known "type" become `InputConfig` objects; all other
    entries are passed through unchanged.
    """
    parsed = {}
    for name, raw in config.items():
        is_typed = isinstance(raw, dict) and "type" in raw and raw["type"] in [member.value for member in InputType]
        if not is_typed:
            parsed[name] = raw
            continue

        inner = raw.get("value", None)
        # If the value is `{{name}}`, reset it to None, as this is the editor default for dynamic inputs
        if inner == f"{{{{{name}}}}}":
            inner = None
        parsed[name] = InputConfig(type=InputType(raw["type"]), value=inner)
    return parsed

receive(input_id)

Receive a value from an input.

Source code in flyde/node.py
def receive(self, input_id: str) -> Any:
    """Read one value from the named input.

    Raises ValueError for an unknown input; a received EOF value is raised instead of returned.
    """
    if input_id in self.inputs:
        received = self.inputs[input_id].get()
        if is_EOF(received):
            raise received
        return received
    raise ValueError(f"Input {input_id} not found in node {self._id}")

run() abstractmethod

Run the node. This method should be overridden by subclasses.

Source code in flyde/node.py
@abstractmethod
def run(self):
    """Run the node. Subclasses are expected to override this method."""

send(output_id, value)

Send a value to an output.

Source code in flyde/node.py
def send(self, output_id: str, value: Any):
    """Forward a value to the named output; raises ValueError for an unknown output."""
    if output_id in self.outputs:
        self.outputs[output_id].send(value)
        return
    raise ValueError(f"Output {output_id} not found in node {self._id}")

shutdown()

Shutdown the component. This method is optional and can be overridden by subclasses.

Source code in flyde/node.py
def shutdown(self):
    """Shutdown the component. Optional hook that does nothing by default; subclasses may override it."""

stop() abstractmethod

Stop the node. This method should be overridden by subclasses.

Source code in flyde/node.py
@abstractmethod
def stop(self):
    """Stop the node. Subclasses are expected to override this method."""

create_instance_id(node_type)

Create a unique instance ID.

Source code in flyde/node.py
def create_instance_id(node_type: str) -> str:
    """Build a unique instance ID: the node type, a dash and a random UUID4."""
    unique_part = uuid4()
    return "{}-{}".format(node_type, unique_part)