flyde.io

EOF = Exception('__EOF__') module-attribute

EOF is a signal to indicate the end of data.

Connection

Connection is a connection between two nodes in a graph.

Source code in flyde/io.py
class Connection:
    """Connection is a connection between two nodes in a graph."""

    def __init__(
        self,
        from_node: ConnectionNode,
        to_node: ConnectionNode,
        delayed: bool = False,
        hidden: bool = False,
    ):
        self.from_node = from_node
        self.to_node = to_node
        self.delayed = delayed
        self.hidden = hidden

    @classmethod
    def from_yaml(cls, yml: dict):
        """Create a connection from a parsed YAML dictionary."""
        return cls(
            ConnectionNode(yml["from"]["insId"], yml["from"]["pinId"]),
            ConnectionNode(yml["to"]["insId"], yml["to"]["pinId"]),
            yml.get("delayed", False),
            yml.get("hidden", False),
        )

    def to_dict(self) -> dict:
        return {
            "from": {"insId": self.from_node.ins_id, "pinId": self.from_node.pin_id},
            "to": {"insId": self.to_node.ins_id, "pinId": self.to_node.pin_id},
            "delayed": self.delayed,
            "hidden": self.hidden,
        }

from_yaml(yml) classmethod

Create a connection from a parsed YAML dictionary.

Source code in flyde/io.py
@classmethod
def from_yaml(cls, yml: dict):
    """Create a connection from a parsed YAML dictionary."""
    return cls(
        ConnectionNode(yml["from"]["insId"], yml["from"]["pinId"]),
        ConnectionNode(yml["to"]["insId"], yml["to"]["pinId"]),
        yml.get("delayed", False),
        yml.get("hidden", False),
    )

ConnectionNode

ConnectionNode is a combination of a node and an input/output pin.

It is used as a source or destination of a connection.

Source code in flyde/io.py
class ConnectionNode:
    """ConnectionNode is a combination of a node and an input/output pin.

    It is used as a source or destination of a connection."""

    def __init__(self, ins_id: str, pin_id: str):
        self.ins_id = ins_id
        self.pin_id = pin_id

GraphPort

Bases: Input, Output

GraphPort is an interface between inside and outside of the graph used for input/output.

It combines Input and Output, because Graph Input acts as an Input for outside world, but outputs values inside the graph. Similarly, Graph Output acts as an Output for outside world, but receives values from inside the graph.

Source code in flyde/io.py
class GraphPort(Input, Output):
    """GraphPort is an interface between inside and outside of the graph used for input/output.

    It combines Input and Output, because Graph Input acts as an Input for outside world,
    but outputs values inside the graph. Similarly, Graph Output acts as an Output for outside world,
    but receives values from inside the graph."""

    def __init__(
        self,
        id: str = "",
        description: str = "",
        type: Optional[type] = None,
        value: Any = None,
        required: Requiredness = Requiredness.REQUIRED,
        output_mode: OutputMode = OutputMode.REF,
        delayed: bool = False,
    ):
        input_mode = InputMode.QUEUE
        Input.__init__(
            self,
            id=id,
            description=description,
            type=type,
            mode=input_mode,
            value=value,
            required=required,
        )
        Output.__init__(
            self,
            id=id,
            description=description,
            type=type,
            mode=output_mode,
            delayed=delayed,
        )

        # Use RedriveQueue instead of the normal input queue
        self._queue = RedirectQueue(self)  # type: ignore

    def inc_ref_count(self):
        # Need to increase ref count of the RedriveQueue
        self._queue.inc_ref_count()  # type: ignore
        return super().inc_ref_count()

    def dec_ref_count(self):
        self._queue.dec_ref_count()  # type: ignore
        return super().dec_ref_count()

Input

Input is an interface for getting input/output data for a node.

Source code in flyde/io.py
class Input:
    """Input is an interface for getting input/output data for a node."""

    def __init__(
        self,
        /,
        id: str = "",
        description: str = "",
        mode: InputMode = InputMode.QUEUE,
        type: Optional[type] = None,
        value: Any = None,
        required: Requiredness = Requiredness.REQUIRED,
    ):
        """Create a new input object.

        Args:
            id (str): The ID of the input
            description (str): The description of the input
            mode (InputMode): The mode of the input
            typ (type): The type of the input
            value (Any): The value of the input for InputMode = InputMode.STATIC or InputMode = InputMode.STICKY
            required (Required): The requiredness of the input
        """
        self.id = id
        self.description = description
        self.type = type
        self._input_mode = mode
        self._value = None
        if value is not None:
            if self.type is not None and not isinstance(value, type):  # type: ignore
                raise ValueError(f"Value {value} is not of type {self.type}")
            self._value = value
        self.required = required
        self._ref_count = 0

    @property
    def queue(self) -> Queue:
        """Get the queue of the input."""
        # Lazy initialization of the queue because initializing it in constructor prevents pickling
        if not hasattr(self, "_queue"):
            self._queue: Queue = Queue()
        return self._queue

    @property
    def is_connected(self) -> bool:
        """Check if the input is connected to a queue."""
        return hasattr(self, "_queue") and self._queue is not None

    @property
    def value(self) -> Any:
        """Get the static value associated with the input."""
        return self._value

    @value.setter
    def value(self, value: Any):
        """Set the static value of the input."""
        # Can be set to EOF to indicate end of data
        if self.type is not None and not is_EOF(value) and not isinstance(value, self.type):  # type: ignore
            raise ValueError(f"Value {value} is not of type {self.type}")
        self._value = value

    def get(self) -> Any:
        """Get the value of the input from either the queue or static value."""
        if not self.is_connected and (
            self.required == Requiredness.OPTIONAL or self.required == Requiredness.REQUIRED_IF_CONNECTED
        ):
            return self._value
        if self._input_mode == InputMode.QUEUE:
            return self.queue.get()
        elif self._input_mode == InputMode.STICKY:
            if not self.queue.empty() or self._value is None:
                value = self.queue.get()
                if not is_EOF(value):
                    # Ignore EOFs on sticky inputs, only queue inputs matter for termination
                    self._value = value
        return self._value

    def empty(self) -> bool:
        """Check if the input queue is empty."""
        if self._input_mode == InputMode.QUEUE:
            return self._queue.empty()
        return self._value is None

    def count(self) -> int:
        """Get the number of elements in the input queue."""
        if self._input_mode == InputMode.QUEUE:
            return self._queue.qsize()
        return 0 if self._value is None else 1

    def inc_ref_count(self):
        """Increment the reference count of the input."""
        self._ref_count += 1

    def dec_ref_count(self):
        """Decrement the reference count of the input."""
        self._ref_count -= 1

    @property
    def ref_count(self) -> int:
        """Get the reference count of the input."""
        return self._ref_count

    def apply_config(self, config: InputConfig):
        """Apply config from the flyde flow to the input."""
        self._value = config.value
        # If input mode is STICKY already, stick to it
        # as it may be important for the node to function correctly
        if self._input_mode != InputMode.STICKY:
            if config.type == InputType.DYNAMIC:
                self._input_mode = InputMode.QUEUE
            else:
                self._input_mode = InputMode.STICKY

        # Apply Python type hint based on supported config type
        if config.type != InputType.DYNAMIC and self.type is None:
            self.type = {
                InputType.NUMBER: int,
                InputType.BOOLEAN: bool,
                InputType.JSON: dict,
                InputType.STRING: str,
            }[config.type]

is_connected: bool property

Check if the input is connected to a queue.

queue: Queue property

Get the queue of the input.

ref_count: int property

Get the reference count of the input.

value: Any property writable

Get the static value associated with the input.

__init__(id='', description='', mode=InputMode.QUEUE, type=None, value=None, required=Requiredness.REQUIRED)

Create a new input object.

Args: id (str): The ID of the input description (str): The description of the input mode (InputMode): The mode of the input typ (type): The type of the input value (Any): The value of the input for InputMode = InputMode.STATIC or InputMode = InputMode.STICKY required (Required): The requiredness of the input

Source code in flyde/io.py
def __init__(
    self,
    /,
    id: str = "",
    description: str = "",
    mode: InputMode = InputMode.QUEUE,
    type: Optional[type] = None,
    value: Any = None,
    required: Requiredness = Requiredness.REQUIRED,
):
    """Create a new input object.

    Args:
        id (str): The ID of the input
        description (str): The description of the input
        mode (InputMode): The mode of the input
        typ (type): The type of the input
        value (Any): The value of the input for InputMode = InputMode.STATIC or InputMode = InputMode.STICKY
        required (Required): The requiredness of the input
    """
    self.id = id
    self.description = description
    self.type = type
    self._input_mode = mode
    self._value = None
    if value is not None:
        if self.type is not None and not isinstance(value, type):  # type: ignore
            raise ValueError(f"Value {value} is not of type {self.type}")
        self._value = value
    self.required = required
    self._ref_count = 0

apply_config(config)

Apply config from the flyde flow to the input.

Source code in flyde/io.py
def apply_config(self, config: InputConfig):
    """Apply config from the flyde flow to the input."""
    self._value = config.value
    # If input mode is STICKY already, stick to it
    # as it may be important for the node to function correctly
    if self._input_mode != InputMode.STICKY:
        if config.type == InputType.DYNAMIC:
            self._input_mode = InputMode.QUEUE
        else:
            self._input_mode = InputMode.STICKY

    # Apply Python type hint based on supported config type
    if config.type != InputType.DYNAMIC and self.type is None:
        self.type = {
            InputType.NUMBER: int,
            InputType.BOOLEAN: bool,
            InputType.JSON: dict,
            InputType.STRING: str,
        }[config.type]

count()

Get the number of elements in the input queue.

Source code in flyde/io.py
def count(self) -> int:
    """Get the number of elements in the input queue."""
    if self._input_mode == InputMode.QUEUE:
        return self._queue.qsize()
    return 0 if self._value is None else 1

dec_ref_count()

Decrement the reference count of the input.

Source code in flyde/io.py
def dec_ref_count(self):
    """Decrement the reference count of the input."""
    self._ref_count -= 1

empty()

Check if the input queue is empty.

Source code in flyde/io.py
def empty(self) -> bool:
    """Check if the input queue is empty."""
    if self._input_mode == InputMode.QUEUE:
        return self._queue.empty()
    return self._value is None

get()

Get the value of the input from either the queue or static value.

Source code in flyde/io.py
def get(self) -> Any:
    """Get the value of the input from either the queue or static value."""
    if not self.is_connected and (
        self.required == Requiredness.OPTIONAL or self.required == Requiredness.REQUIRED_IF_CONNECTED
    ):
        return self._value
    if self._input_mode == InputMode.QUEUE:
        return self.queue.get()
    elif self._input_mode == InputMode.STICKY:
        if not self.queue.empty() or self._value is None:
            value = self.queue.get()
            if not is_EOF(value):
                # Ignore EOFs on sticky inputs, only queue inputs matter for termination
                self._value = value
    return self._value

inc_ref_count()

Increment the reference count of the input.

Source code in flyde/io.py
def inc_ref_count(self):
    """Increment the reference count of the input."""
    self._ref_count += 1

InputConfig dataclass

Configuration of an input in a Flyde flow.

Source code in flyde/io.py
@dataclass
class InputConfig:
    """Configuration of an input in a Flyde flow."""

    type: InputType
    value: Optional[Any] = None

InputMode

Bases: Enum

InputMode is the mode of an input.

QUEUE: The input is connected to a queue. On each node invocation, a new value is taken from the queue. If the queue is empty, the node invocation is blocked. STICKY: The input has a sticky value. It has a queue attached to it, but the last received value is returned in absence of new values in the queue. Thus sticky inputs are non-blocking. STATIC: The input has a static value that does not change.

Source code in flyde/io.py
class InputMode(Enum):
    """InputMode is the mode of an input.

    QUEUE: The input is connected to a queue. On each node invocation, a new value is taken from the queue.
    If the queue is empty, the node invocation is blocked.
    STICKY: The input has a sticky value. It has a queue attached to it, but the last received value is returned in
    absence of new values in the queue. Thus sticky inputs are non-blocking.
    STATIC: The input has a static value that does not change."""

    QUEUE = "queue"
    STICKY = "sticky"
    STATIC = "static"

InputType

Bases: Enum

Input type contains all input types supported by Flyde.

Source code in flyde/io.py
class InputType(Enum):
    """Input type contains all input types supported by Flyde."""

    DYNAMIC = "dynamic"
    NUMBER = "number"
    BOOLEAN = "boolean"
    JSON = "json"
    STRING = "string"

Output

Output is an interface for setting output data for a component.

Source code in flyde/io.py
class Output:
    """Output is an interface for setting output data for a component."""

    def __init__(
        self,
        /,
        id: str = "",
        description: str = "",
        mode: OutputMode = OutputMode.REF,
        type: Optional[type] = None,
        delayed: bool = False,
    ):
        """Create a new output object.

        Args:
            id (str): The ID of the output
            description (str): The description of the output
            type (type): The type of the output
            delayed (bool): If the output is delayed [not implemented yet]
        """
        self.id = id
        self.description = description
        self._output_mode = mode
        self.type = type
        self.delayed = delayed
        self._queues: list[Queue] = []
        self._circle_index = 0

    def connect(self, queue: Queue):
        """Connect a queue to the output.

        This method can be called multiple times to connect multiple queues to the same output.
        """
        self._queues.append(queue)

    @property
    def connected(self) -> bool:
        """Check if the output is connected to a queue."""
        return len(self._queues) > 0

    def send(self, value: Any):
        """Put a value in the output queue."""
        if self.type is not None and not is_EOF(value) and not isinstance(value, self.type):  # type: ignore
            raise ValueError(f'Output "{self.id}": value {value} is not of type {self.type}')
        if len(self._queues) == 0:
            raise ValueError(f'Output "{self.id}": has no connected queues')

        if len(self._queues) == 1:
            self._queues[0].put(value)
            return

        if self._output_mode == OutputMode.CIRCLE:
            # Round-robin output queue selection
            self._queues[self._circle_index].put(value)
            self._circle_index = (self._circle_index + 1) % len(self._queues)
            return

        for i, queue in enumerate(self._queues):
            if self._output_mode == OutputMode.REF:
                queue.put(value)
            elif self._output_mode == OutputMode.VALUE:
                if i == 0:
                    # Send the original value to the first queue
                    queue.put(value)
                else:
                    # Send a deep copy of the value to the rest of the queues
                    queue.put(deepcopy(value))

connected: bool property

Check if the output is connected to a queue.

__init__(id='', description='', mode=OutputMode.REF, type=None, delayed=False)

Create a new output object.

Args: id (str): The ID of the output description (str): The description of the output type (type): The type of the output delayed (bool): If the output is delayed [not implemented yet]

Source code in flyde/io.py
def __init__(
    self,
    /,
    id: str = "",
    description: str = "",
    mode: OutputMode = OutputMode.REF,
    type: Optional[type] = None,
    delayed: bool = False,
):
    """Create a new output object.

    Args:
        id (str): The ID of the output
        description (str): The description of the output
        type (type): The type of the output
        delayed (bool): If the output is delayed [not implemented yet]
    """
    self.id = id
    self.description = description
    self._output_mode = mode
    self.type = type
    self.delayed = delayed
    self._queues: list[Queue] = []
    self._circle_index = 0

connect(queue)

Connect a queue to the output.

This method can be called multiple times to connect multiple queues to the same output.

Source code in flyde/io.py
def connect(self, queue: Queue):
    """Connect a queue to the output.

    This method can be called multiple times to connect multiple queues to the same output.
    """
    self._queues.append(queue)

send(value)

Put a value in the output queue.

Source code in flyde/io.py
def send(self, value: Any):
    """Put a value in the output queue."""
    if self.type is not None and not is_EOF(value) and not isinstance(value, self.type):  # type: ignore
        raise ValueError(f'Output "{self.id}": value {value} is not of type {self.type}')
    if len(self._queues) == 0:
        raise ValueError(f'Output "{self.id}": has no connected queues')

    if len(self._queues) == 1:
        self._queues[0].put(value)
        return

    if self._output_mode == OutputMode.CIRCLE:
        # Round-robin output queue selection
        self._queues[self._circle_index].put(value)
        self._circle_index = (self._circle_index + 1) % len(self._queues)
        return

    for i, queue in enumerate(self._queues):
        if self._output_mode == OutputMode.REF:
            queue.put(value)
        elif self._output_mode == OutputMode.VALUE:
            if i == 0:
                # Send the original value to the first queue
                queue.put(value)
            else:
                # Send a deep copy of the value to the rest of the queues
                queue.put(deepcopy(value))

OutputMode

Bases: Enum

OutputMode defines the behavior of an output if it is connected to multiple input queues.

REF: Copy-by-reference. Each connected input will receive the same object. VALUE: Copy-by-value. Each connected input will receive a deep copy of the object. CIRCLE: Circular. Each connected input will receive the object in a round-robin fashion.

Source code in flyde/io.py
class OutputMode(Enum):
    """OutputMode defines the behavior of an output if it is connected to multiple input queues.

    REF: Copy-by-reference. Each connected input will receive the same object.
    VALUE: Copy-by-value. Each connected input will receive a deep copy of the object.
    CIRCLE: Circular. Each connected input will receive the object in a round-robin fashion.
    """

    REF = "ref"
    VALUE = "value"
    CIRCLE = "circle"

RedirectQueue

RedriveQueue is a fake write-only queue that is used by GraphPort to redrive input values to the output queues.

Source code in flyde/io.py
class RedirectQueue:
    """RedriveQueue is a fake write-only queue that is used by GraphPort
    to redrive input values to the output queues."""

    def __init__(self, output: Output):
        self._output = output
        self._ref_count = 0

    @property
    def ref_count(self) -> int:
        return self._ref_count

    def inc_ref_count(self):
        self._ref_count += 1

    def dec_ref_count(self):
        self._ref_count -= 1

    def put(self, item: Any, block=True, timeout=None):
        if item is EOF:
            # Count references and only send EOF when all references are removed as we might have multiple inputs connected
            self.dec_ref_count()
            if self._ref_count <= 0:
                self._output.send(item)
        else:
            self._output.send(item)

Requiredness

Bases: Enum

Requiredness of an input.

REQUIRED: The input is required to be connected. OPTIONAL: The input is optional. REQUIRED_IF_CONNECTED: The input is required if it is connected to a queue.

Source code in flyde/io.py
class Requiredness(Enum):
    """Requiredness of an input.

    REQUIRED: The input is required to be connected.
    OPTIONAL: The input is optional.
    REQUIRED_IF_CONNECTED: The input is required if it is connected to a queue."""

    REQUIRED = "required"
    OPTIONAL = "optional"
    REQUIRED_IF_CONNECTED = "required-if-connected"

is_EOF(value)

Checks if a value is an EOF signal.

Source code in flyde/io.py
def is_EOF(value: Any) -> bool:
    """Checks if a value is an EOF signal."""
    return isinstance(value, Exception) and value.args[0] == "__EOF__"