CCSDS Package

This package contains all CCSDS-related components.

class spacepackets.ccsds.AbstractSpacePacket

Bases: ABC

property apid: int
abstract property ccsds_version: int
abstract pack() bytearray
abstract property packet_id: PacketId
abstract property packet_seq_control: PacketSeqCtrl
property packet_type: PacketType
property sec_header_flag: bool
property seq_count: int
property seq_flags: SequenceFlags
class spacepackets.ccsds.PacketId(ptype: PacketType, sec_header_flag: bool, apid: int)

Bases: object

The packet ID forms the last thirteen bits of the first two bytes of the space packet header.

classmethod empty() PacketId
classmethod from_raw(raw: int) PacketId
raw() int
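The bit layout described above can be illustrated with plain integer arithmetic. This is a minimal standalone sketch, not the library's implementation: the helper names pack_packet_id and unpack_packet_id are hypothetical, and the layout assumed is packet type in bit 12, secondary header flag in bit 11, and the APID in the lower 11 bits.

```python
def pack_packet_id(packet_type: int, sec_header_flag: bool, apid: int) -> int:
    """Combine the three fields into the 13-bit packet ID (hypothetical helper)."""
    if not 0 <= apid <= 0x7FF:
        raise ValueError("APID must fit in 11 bits")
    return ((packet_type & 0x1) << 12) | (int(sec_header_flag) << 11) | apid


def unpack_packet_id(raw: int) -> tuple[int, bool, int]:
    """Split a 13-bit packet ID into (packet_type, sec_header_flag, apid)."""
    return (raw >> 12) & 0x1, bool((raw >> 11) & 0x1), raw & 0x7FF


# Telecommand (type bit 1), no secondary header, APID 0x42
raw_id = pack_packet_id(packet_type=1, sec_header_flag=False, apid=0x42)
```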
class spacepackets.ccsds.PacketSeqCtrl(seq_flags: SequenceFlags, seq_count: int)

Bases: object

The packet sequence control occupies the third and fourth bytes of the space packet header. It contains the 2-bit sequence flags and the 14-bit sequence count.

classmethod empty() PacketSeqCtrl
classmethod from_raw(raw: int) PacketSeqCtrl
raw() int
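As a rough illustration of the packet sequence control layout, the following standalone sketch places the sequence flags in the upper two bits and the sequence count in the lower 14 bits of a 16-bit value. The helper names pack_seq_ctrl and unpack_seq_ctrl are hypothetical and only mirror the description above.

```python
def pack_seq_ctrl(seq_flags: int, seq_count: int) -> int:
    """Build the 16-bit packet sequence control field (hypothetical helper)."""
    if not 0 <= seq_flags <= 0b11:
        raise ValueError("sequence flags must fit in 2 bits")
    if not 0 <= seq_count <= 0x3FFF:
        raise ValueError("sequence count must fit in 14 bits")
    return (seq_flags << 14) | seq_count


def unpack_seq_ctrl(raw: int) -> tuple[int, int]:
    """Split the 16-bit field into (seq_flags, seq_count)."""
    return (raw >> 14) & 0b11, raw & 0x3FFF


# Unsegmented packet (flags 0b11) with sequence count 22
raw_psc = pack_seq_ctrl(seq_flags=3, seq_count=22)
```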
class spacepackets.ccsds.PacketType(value)

Bases: IntEnum

An enumeration.

TC = 1
TM = 0
class spacepackets.ccsds.ParserResult(tm_list: list[bytes], skipped_ranges: list[range], scanned_bytes: int)

Bases: object

classmethod empty() ParserResult
num_of_found_packets() int
scanned_bytes: int

Number of bytes scanned. Incomplete packets will not increment this number. Therefore, this can be smaller than the total size of the provided buffer. Furthermore, the packet parser will not scan fragments at the buffer end which are smaller than the minimum CCSDS space packet size.

skipped_ranges: list[range]

Range of bytes which were skipped during parsing. This can happen if there are broken/invalid spacepackets or packets/bytes which are not CCSDS spacepackets in the datastream. This context information can be used for debugging.

tm_list: list[bytes]

List of parsed space packets.

class spacepackets.ccsds.SequenceFlags(value)

Bases: IntEnum

An enumeration.

CONTINUATION_SEGMENT = 0
FIRST_SEGMENT = 1
LAST_SEGMENT = 2
UNSEGMENTED = 3
spacepackets.ccsds.SpHeader

alias of SpacePacketHeader

class spacepackets.ccsds.SpacePacket(sp_header: SpacePacketHeader, sec_header: bytes | bytearray | None, user_data: bytes | bytearray | None)

Bases: object

Generic CCSDS space packet which consists of the primary header and can optionally include a secondary header and a user data field.

If the secondary header flag in the primary header is set, the secondary header is mandatory. If it is not set, the user data is mandatory.

property apid: int
pack() bytearray

Pack the raw byte representation of the space packet.

Raises:

ValueError – Mandatory fields were not supplied properly

property sec_header_flag: bool
property seq_count: int
class spacepackets.ccsds.SpacePacketHeader(packet_type: PacketType, apid: int, seq_count: int, data_len: int, sec_header_flag: bool = False, seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, ccsds_version: int = 0)

Bases: AbstractSpacePacket

This class encapsulates the space packet header. Packet reference: Blue Book CCSDS 133.0-B-2

Create a space packet header with the given field parameters.

The data length field can also be set from the total packet length by using the set_data_len_from_packet_len() method after construction of the space packet header object.

>>> sph = SpacePacketHeader(packet_type=PacketType.TC, apid=0x42, seq_count=0, data_len=12)
>>> hex(sph.apid)
'0x42'
>>> sph.packet_type
<PacketType.TC: 1>
>>> sph.data_len
12
>>> sph.packet_len
19
>>> sph.packet_id
PacketId(ptype=<PacketType.TC: 1>, sec_header_flag=False, apid=66)
>>> sph.packet_seq_control
PacketSeqCtrl(seq_flags=<SequenceFlags.UNSEGMENTED: 3>, seq_count=0)
Parameters:
  • packet_type (PacketType) – 0 for Telemetry, 1 for Telecommands

  • apid (int) – Application Process ID, should not be larger than 11 bits, decimal 2047 or hex 0x7ff

  • seq_count (int) – Source sequence counter, should not be larger than 0x3fff or decimal 16383

  • data_len (int) – Contains a length count C that equals one fewer than the length of the packet data field. Should not be larger than 65535 bytes

  • sec_header_flag (bool) – Secondary header flag, False by default.

  • seq_flags – Sequence flags, defaults to unsegmented.

  • ccsds_version (int) – Version of the CCSDS packet. Defaults to 0b000

Raises:

ValueError – On invalid parameters

property apid: int
property ccsds_version: int
classmethod from_composite_fields(packet_id: PacketId, psc: PacketSeqCtrl, data_length: int, packet_version: int = 0) SpacePacketHeader
property header_len: int
pack() bytearray

Serialize raw space packet header into a bytearray, using big endian for each 2 octet field of the space packet header.
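The big-endian serialization of the three 2-octet fields can be sketched with the standard struct module. This is an illustrative assumption about the layout, not the library's code: the function pack_header is hypothetical, and it takes the already combined 13-bit packet ID and 16-bit packet sequence control as plain integers.

```python
import struct


def pack_header(version: int, packet_id: int, psc: int, data_len: int) -> bytearray:
    """Serialize a 6-byte primary header in network byte order (hypothetical helper)."""
    # The 3-bit version number sits above the 13-bit packet ID in the first word.
    first_word = ((version & 0b111) << 13) | (packet_id & 0x1FFF)
    return bytearray(struct.pack("!HHH", first_word, psc & 0xFFFF, data_len & 0xFFFF))


# TC packet, APID 0x42, unsegmented, sequence count 0, data length field 12
header = pack_header(version=0, packet_id=0x1042, psc=0xC000, data_len=12)
```

The values correspond to the constructor example above, where data_len=12 yields a total packet length of 19 bytes.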

property packet_id: PacketId
property packet_len: int

Retrieve the full space packet size when packed.

The full packet size is the data length field plus the CCSDS_HEADER_LEN of 6 bytes plus one.

Returns:

Size of the space packet based on the space packet header data length field.

property packet_seq_control: PacketSeqCtrl
property packet_type: PacketType
property sec_header_flag: bool
property seq_count: int
property seq_flags: SequenceFlags
set_data_len_from_packet_len(packet_len: int) None

Sets the data length field from the given total packet length. The total packet length must be at least 7 bytes.

Raises:

ValueError – The passed packet length is smaller than the minimum expected 7 bytes.
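The conversion from a total packet length to the data length field can be sketched as follows. The function data_len_from_packet_len is a hypothetical standalone helper that only mirrors the documented rule (header size of 6 bytes, minimum total length of 7 bytes); it is not taken from the library source.

```python
CCSDS_HEADER_LEN = 6  # size of the primary header in bytes


def data_len_from_packet_len(packet_len: int) -> int:
    """Derive the data length field from the total packet length (hypothetical helper)."""
    if packet_len < CCSDS_HEADER_LEN + 1:
        raise ValueError("packet length must be at least 7 bytes")
    # The length field stores the data field size minus one.
    return packet_len - CCSDS_HEADER_LEN - 1


data_len = data_len_from_packet_len(19)
```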

classmethod tc(apid: int, seq_count: int, data_len: int, sec_header_flag: bool = False, seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, ccsds_version: int = 0) SpacePacketHeader

Create a space packet header with the given field parameters for a telecommand packet. Calls the default constructor SpacePacketHeader() with the packet type set to PacketType.TC.

classmethod tm(apid: int, seq_count: int, data_len: int, sec_header_flag: bool = False, seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, ccsds_version: int = 0) SpacePacketHeader

Create a space packet header with the given field parameters for a telemetry packet. Calls the default constructor SpacePacketHeader() with the packet type set to PacketType.TM.

classmethod unpack(data: bytes | bytearray) SpacePacketHeader

Unpack a raw space packet into the space packet header instance.

Raises:

ValueError – Raw packet length invalid

spacepackets.ccsds.get_total_space_packet_len_from_len_field(len_field: int) int

Definition of length field is: C = (Octets in data field - 1). Therefore, the number of octets in the data field is len_field plus one. The total space packet length is therefore len_field plus one plus the space packet header size (6).
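A few worked values may make the formula concrete. The function total_packet_len below is a hypothetical restatement of the arithmetic described above, written without importing the library.

```python
def total_packet_len(len_field: int) -> int:
    """Total packet size: data field octets (len_field + 1) plus the 6-byte header."""
    return len_field + 1 + 6


smallest = total_packet_len(0)       # one data octet plus the header
example = total_packet_len(12)       # matches the packet_len of the constructor example
largest = total_packet_len(0xFFFF)   # maximum value of the 16-bit length field
```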

spacepackets.ccsds.parse_space_packets(buf: bytearray | bytes, packet_ids: Sequence[PacketId]) ParserResult

Given a byte buffer, parse for space packets which start with the provided list of packet IDs.

If there are broken/invalid spacepackets or packets/bytes which are not CCSDS spacepackets in the datastream, those bytes will be skipped, and the range of skipped bytes is returned as part of the ParserResult. This context information can be used for debugging or management of invalid data.

In case of incomplete packets where a partial packet header is found at the end of the buffer, the scanned bytes number will be the start of the incomplete packet. The number of scanned bytes returned might also be smaller than the buffer size for invalid data because the packet parser will not scan fragments at the end of the buffer which are smaller than the minimum space packet header length.
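The scanning behaviour described above can be approximated by the following standalone sketch. It is a simplified illustration and not the library's implementation: the function scan_for_packets is hypothetical, it takes packet IDs as raw 13-bit integers, and it masks out the version bits before comparing, which is an assumption made here for simplicity.

```python
import struct

CCSDS_HEADER_LEN = 6


def scan_for_packets(buf: bytes, packet_ids: list[int]):
    """Return (packets, skipped_ranges, scanned_bytes) for a raw buffer."""
    packets = []
    skipped = []
    idx = 0
    skip_start = None
    # Stop once fewer than six bytes remain, since no full header can fit.
    while idx + CCSDS_HEADER_LEN <= len(buf):
        first_word, _, len_field = struct.unpack_from("!HHH", buf, idx)
        if (first_word & 0x1FFF) not in packet_ids:
            # Unknown packet ID: remember where the skipped run began, advance one byte.
            if skip_start is None:
                skip_start = idx
            idx += 1
            continue
        if skip_start is not None:
            skipped.append(range(skip_start, idx))
            skip_start = None
        total_len = len_field + 1 + CCSDS_HEADER_LEN
        if idx + total_len > len(buf):
            # Incomplete packet: its start index becomes the scanned byte count.
            break
        packets.append(bytes(buf[idx:idx + total_len]))
        idx += total_len
    if skip_start is not None:
        skipped.append(range(skip_start, idx))
    return packets, skipped, idx


packet = bytes([0x10, 0x42, 0xC0, 0x00, 0x00, 0x01, 0xAA, 0xBB])
# Three junk bytes, one complete packet, then a packet missing its last byte.
stream = b"\xff\xff\xff" + packet + packet[:7]
found, skipped_ranges, scanned = scan_for_packets(stream, [0x1042])
```

In this sketch, the truncated packet at the end is left unscanned, so a caller could keep the bytes from index scanned onward and prepend them to the next chunk of received data.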

spacepackets.ccsds.parse_space_packets_from_deque(analysis_queue: deque[bytearray | bytes], packet_ids: Sequence[PacketId]) ParserResult

Given a deque of bytearrays, parse for space packets which start with the provided list of packet IDs.

This function expects the deque to be filled on the right side, for example with collections.deque.append(). This function only reads the given deque. It fills the provided queue content into a regular byte buffer and then calls parse_space_packets().

The user needs to take care to clear the analysis queue depending on how this API is used. The number of bytes scanned is returned as part of the ParserResult and can be used to clear the deque or only keep relevant portions for the next parse call.
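One possible way to manage the analysis queue between parse calls is sketched below using only collections.deque. The helpers join_deque and drop_scanned are hypothetical and not part of the package: the first shows how chunks appended on the right can be flattened without modifying the deque, and the second removes a given number of scanned bytes from the left while keeping any unscanned remainder.

```python
from collections import deque


def join_deque(analysis_queue: deque) -> bytearray:
    """Concatenate all chunks from left to right without modifying the deque."""
    buf = bytearray()
    for chunk in analysis_queue:
        buf.extend(chunk)
    return buf


def drop_scanned(analysis_queue: deque, scanned_bytes: int) -> None:
    """Remove the first scanned_bytes bytes from the left side of the deque."""
    remaining = scanned_bytes
    while remaining > 0 and analysis_queue:
        chunk = analysis_queue.popleft()
        if len(chunk) > remaining:
            # Keep the unscanned tail of this chunk for the next parse call.
            analysis_queue.appendleft(chunk[remaining:])
            remaining = 0
        else:
            remaining -= len(chunk)


queue = deque()
queue.append(b"\x10\x42\xc0")                  # first received chunk
queue.append(b"\x00\x00\x01\xaa\xbb\x10\x42")  # rest of the packet plus a partial header
flat = join_deque(queue)
drop_scanned(queue, 8)  # assume a parse call reported 8 scanned bytes
```

After the call, only the two bytes of the partial header remain in the queue, ready to be completed by the next appended chunk.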