angelos.archive7.operations

Data stream operations.

class angelos.archive7.operations.AsyncDecryptor(secret: bytes, public: bytes)

Bases: angelos.archive7.operations.DecryptorBase

Asymmetric (public-key) NaCl decryption using a secret/public key pair.

decrypt(data: Union[bytes, bytearray]) → bytes

Asymmetrically decrypt data.

class angelos.archive7.operations.AsyncEncryptor(secret: bytes, public: bytes)

Bases: angelos.archive7.operations.EncryptorBase

Asymmetric (public-key) NaCl encryption using a secret/public key pair.

encrypt(data: Union[bytes, bytearray]) → bytes

Asymmetrically encrypt data.

class angelos.archive7.operations.BlockIndexerFilter

Bases: angelos.archive7.operations.DataFilter

Filter for indexing blocks for chosen streams.

NAME = 'block_indexer'
analyze(block: importlib._bootstrap.BlockTuple, pos: int)

Analyze a block and index it if it belongs to one of the chosen streams.

class angelos.archive7.operations.BlockProcessor(fileobj: _io.FileIO, decryptor: angelos.archive7.operations.DecryptorBase, generator: Generator = None)

Bases: abc.ABC

Base class for operations on stream managers, streams and blocks.

property filter

Expose filters.

abstract process(position: int, block: importlib._bootstrap.BlockTuple, result: tuple)

Process result of filters.

run()

Run operation on archive.

class angelos.archive7.operations.CorruptDataFilter

Bases: angelos.archive7.operations.DataFilter

Filter each block for corrupt data.

NAME = 'corrupt_data'
analyze(block: importlib._bootstrap.BlockTuple, pos: int)

Analyze a block for data corruption.

class angelos.archive7.operations.DataFilter(config: dict = None)

Bases: abc.ABC

Abstract data filter class.

abstract analyze(block: importlib._bootstrap.BlockTuple, pos: int)

Analyze a block.

property data
class angelos.archive7.operations.DecryptorBase(box)

Bases: abc.ABC

Decryptor base class.

abstract decrypt(data: Union[bytes, bytearray]) → bytes

Decrypt a piece of data

Parameters

data (Union[bytes, bytearray]) – The data to decrypt

Returns (bytes):

Decrypted result

class angelos.archive7.operations.EncryptorBase(box)

Bases: abc.ABC

Encryptor base class.

abstract encrypt(data: Union[bytes, bytearray]) → bytes

Encrypt a piece of data

Parameters

data (Union[bytes, bytearray]) – The data to encrypt

Returns (bytes):

Encrypted result

class angelos.archive7.operations.InvalidMetaFilter

Bases: angelos.archive7.operations.DataFilter

Filter each block and test meta information.

NAME = 'invalid_meta'
analyze(block: importlib._bootstrap.BlockTuple, pos: int)

Analyze a block for invalid meta information.

class angelos.archive7.operations.ReEncryptOperation

Bases: angelos.archive7.operations.StreamOperation

Re-encrypts an archive with a new key.

class angelos.archive7.operations.ShredOperation

Bases: angelos.archive7.operations.ReEncryptOperation

Generates a new key and re-encrypts, then throws the key away.

class angelos.archive7.operations.StreamIndexerFilter

Bases: angelos.archive7.operations.DataFilter

Filter for indexing all streams.

NAME = 'stream_indexer'
analyze(block: importlib._bootstrap.BlockTuple, pos: int)

Analyze a block in order to index all streams.

class angelos.archive7.operations.StreamIterator(fileobj: _io.FileIO, generator: Generator = None)

Bases: collections.abc.Iterator

Iterate over an Archive7 file.

property position

Current position

class angelos.archive7.operations.StreamOperation

Bases: abc.ABC

class angelos.archive7.operations.SyncDecryptor(secret: bytes)

Bases: angelos.archive7.operations.DecryptorBase

Symmetric (secret-key) NaCl decryption.

decrypt(data: Union[bytes, bytearray]) → bytes

Symmetrically decrypt data.

class angelos.archive7.operations.SyncEncryptor(secret: bytes)

Bases: angelos.archive7.operations.EncryptorBase

Symmetric (secret-key) NaCl encryption.

encrypt(data: Union[bytes, bytearray]) → bytes

Symmetrically encrypt data.

class angelos.archive7.operations.VacuumOperation

Bases: angelos.archive7.operations.StreamOperation

Vacuums an archive and removes the trash.

class angelos.archive7.operations.ZipOperation

Bases: angelos.archive7.operations.StreamOperation

Zip streams.