reqivo.utils packageΒΆ

SubmodulesΒΆ

reqivo.utils.serialization moduleΒΆ

src/reqivo/utils/serialization.py

Serialization utilities for Reqivo (JSON, form-urlencode, etc.).

class reqivo.utils.serialization.Any(*args, **kwargs)[source]ΒΆ

Bases: object

Special type indicating an unconstrained type.

  • Any is compatible with every type.

  • Any assumed to have all methods.

  • All values assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.

reqivo.utils.serialization.to_json(data: Any) str[source]ΒΆ

Serializes data to a JSON string.

reqivo.utils.timing moduleΒΆ

src/reqivo/utils/timing.py

Timeouts configuration.

class reqivo.utils.timing.Timeout(connect: float | None = None, read: float | None = None, total: float | None = None)[source]ΒΆ

Bases: object

Timeout configuration.

connectΒΆ

Maximum time to wait for connection establishment (socket connect).

Type:

float | None

readΒΆ

Maximum time to wait for data to be received (socket recv).

Type:

float | None

totalΒΆ

Maximum total time for the operation.

Type:

float | None

classmethod from_float(timeout: float | None) Timeout[source]ΒΆ

Create a Timeout instance from a single float (total timeout fallback).

reqivo.utils.validators moduleΒΆ

src/reqivo/utils/validators.py

Validation utilities for Reqivo.

reqivo.utils.validators.validate_url(url: str) bool[source]ΒΆ

Simple URL validation.

reqivo.utils.websocket_utils moduleΒΆ

src/reqivo/utils/websocket_utils.py

WebSocket utilities for Reqivo.

exception reqivo.utils.websocket_utils.WebSocketError[source]ΒΆ

Bases: Exception

Exception raised for WebSocket-related errors.

reqivo.utils.websocket_utils.apply_mask(data: bytes, mask: bytes) bytes[source]ΒΆ

Apply XOR mask to data.

reqivo.utils.websocket_utils.create_frame(payload: bytes, opcode: int, mask: bool = True) bytes[source]ΒΆ

Create a WebSocket frame.

reqivo.utils.websocket_utils.parse_frame_header(data: bytes) Tuple[int, int, bool, int, int, int, int, bool] | None[source]ΒΆ

Parse WebSocket frame header. Returns: (header_len, payload_len, fin, rsv1, rsv2, rsv3, opcode, masked)