
API Reference

This section documents the API of p2p_copy. Only the functions run_relay, send and receive are intended for end users; the remaining entries are mainly of interest to developers. For the module structure, see Module Layout. Generated from source code docstrings.

 

p2p_copy_server

run_relay async

run_relay(host, port, use_tls=True, certfile=None, keyfile=None)

Run the WebSocket relay server for pairing and forwarding client connections.

This coroutine starts a relay server that listens on the specified host/interface and port, optionally secured with TLS (recommended for production). The server pairs sender and receiver clients based on matching passphrase hashes, then forwards the bidirectional data stream between them without storing any content. It accepts exactly one sender and one receiver per code hash and rejects duplicates. Running it with TLS on port 443 makes P2P transfers secure and firewall-friendly.
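The pairing rule (one sender and one receiver per code hash, duplicates rejected) can be modelled as a small in-memory table. This is a simplified, hypothetical model that ignores networking and cleanup after a transfer; `PairingTable`, `Slot`, `join` and the returned status strings are illustrative names, not part of p2p_copy_server.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Slot:
    """Connections waiting under one code hash (hypothetical model)."""
    sender: Optional[str] = None
    receiver: Optional[str] = None


@dataclass
class PairingTable:
    slots: Dict[str, Slot] = field(default_factory=dict)

    def join(self, code_hash_hex: str, role: str, conn_id: str) -> str:
        """Register a client and return 'waiting', 'paired' or 'rejected'."""
        if role not in ("sender", "receiver"):
            return "rejected"
        slot = self.slots.setdefault(code_hash_hex, Slot())
        if getattr(slot, role) is not None:
            # A second client with the same role and code hash is refused
            return "rejected"
        setattr(slot, role, conn_id)
        if slot.sender is not None and slot.receiver is not None:
            return "paired"
        return "waiting"


table = PairingTable()
print(table.join("ab12", "sender", "conn-1"))    # waiting
print(table.join("ab12", "sender", "conn-2"))    # rejected
print(table.join("ab12", "receiver", "conn-3"))  # paired
```

A real relay would additionally forward frames between the two paired connections and free the slot once either side disconnects.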

Parameters:

Name      Type   Description                                                 Default
host      str    Host to bind to.                                            required
port      int    Port to bind to.                                            required
use_tls   bool   Whether to use TLS.                                         True
certfile  str    Path to TLS certificate file; required if use_tls is True.  None
keyfile   str    Path to TLS key file; required if use_tls is True.          None

Raises:

Type          Description
RuntimeError  If TLS is requested but certfile or keyfile is missing.

Source code in src/p2p_copy_server/relay.py
async def run_relay(host: str, port: int,
                    use_tls: bool = True,
                    certfile: Optional[str] = None,
                    keyfile: Optional[str] = None) -> None:
    """
    Run the WebSocket relay server for pairing and forwarding client connections.

    This command starts a relay server that listens on the specified host/interface and port,
    optionally secured with TLS (recommended for production). The server pairs
    sender and receiver clients based on matching passphrase hashes, then forwards
    bidirectional data streams without storing content. It handles exactly one
    sender and one receiver per code hash, rejecting duplicates. Use for secure,
    firewall-friendly (port 443) P2P transfers.

    Parameters
    ----------
    host : str
        Host to bind to.
    port : int
        Port to bind to.
    use_tls : bool, optional
        Whether to use TLS. Default is True.
    certfile : str, optional
        Path to TLS certificate file.
    keyfile : str, optional
        Path to TLS key file.

    Raises
    ------
    RuntimeError
        If TLS is requested but certfile or keyfile is missing.
    """
    ssl_ctx = None
    if use_tls:
        if not certfile or not keyfile:
            raise RuntimeError("TLS requested but certfile/keyfile missing")
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_ctx.load_cert_chain(certfile, keyfile)

    scheme = "wss" if ssl_ctx else "ws"
    print(f"\nRelay listening on {scheme}://{host}:{port}")

    if host != "localhost":
        use_production_logger()

    async with serve(_handle, host, port, max_size=2**21, ssl=ssl_ctx, compression=None):
        await asyncio.Future()  # run forever

 

p2p_copy.api

send async

send(server, code, files, *, encrypt=False, compress=CompressMode.auto, resume=False)

Send one or more files or directories to a paired receiver via the relay server.

This coroutine connects to the specified WebSocket relay server, pairs with the receiver using a hash of the shared passphrase, and streams the given files and directories to the receiver in chunks. Directories are included recursively, with their files sent in alphabetical order. End-to-end encryption (AES-GCM) and compression (Zstandard, decided per file in auto mode) are optional. If resume is enabled, the sender coordinates with the receiver to skip files that are already complete and to append to partial ones, based on chained checksum verification.
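The directory handling can be pictured as follows: each argument is either a file, kept as-is, or a directory, walked recursively with its files visited in sorted order. `expand_paths` is a hypothetical stand-in for the internal `iter_manifest_entries` helper, whose exact rules (how relative paths are rooted, how symlinks are treated) are not shown in this reference. It yields `(absolute_path, relative_path, size)` tuples, the `Tuple[Path, Path, int]` shape that `send` uses; rooting relative paths at the directory's parent is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Iterable, Iterator, Tuple


def expand_paths(args: Iterable[str]) -> Iterator[Tuple[Path, Path, int]]:
    """Yield (absolute path, relative path, size) for every file to send.

    Hypothetical helper: directories are walked recursively in sorted order,
    and relative paths keep the directory's own name as the first component.
    """
    for arg in args:
        p = Path(arg).resolve()
        if p.is_file():
            yield p, Path(p.name), p.stat().st_size
        elif p.is_dir():
            # sorted() over rglob gives a deterministic, alphabetical order
            for f in sorted(p.rglob("*")):
                if f.is_file():
                    yield f, f.relative_to(p.parent), f.stat().st_size
        # Anything else (missing paths, sockets, ...) is skipped


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "docs"
    (root / "sub").mkdir(parents=True)
    (root / "b.txt").write_bytes(b"bb")
    (root / "a.txt").write_bytes(b"a")
    (root / "sub" / "c.txt").write_bytes(b"ccc")
    listing = [(rel.as_posix(), size) for _, rel, size in expand_paths([str(root)])]

print(listing)  # [('docs/a.txt', 1), ('docs/b.txt', 2), ('docs/sub/c.txt', 3)]
```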

Parameters:

Name      Type          Description                                                                    Default
server    str           The WebSocket server URL (ws:// or wss://).                                    required
code      str           The shared passphrase/code for pairing.                                        required
files     List[str]     List of files and/or directories to send.                                      required
encrypt   bool          Enable end-to-end encryption. The receiver must use the same setting.          False
compress  CompressMode  Compression mode (auto, on or off).                                            auto
resume    bool          Enable resuming of partial transfers. If True, skip files the receiver         False
                        already has in full and append to incomplete ones, based on receiver feedback.

Returns:

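Type  Description
int   Exit code: 0 on success; 3 if no valid files were given or the handshake with the receiver failed (timeout, unexpected or unreadable frame). Other failures, such as connection errors, are raised as exceptions.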

Notes
  • Supports resuming by comparing checksums of partial files.
  • Uses chunked streaming for large files.
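`send` is meant to overlap work: while one frame is written to the socket, the next frame is compressed and encrypted in a worker thread via `asyncio.to_thread`. Note that calling `asyncio.to_thread(...)` only creates a coroutine; the thread work starts once that coroutine is scheduled (for example with `asyncio.create_task`) or awaited. The sketch isolates the pattern with a fake in-memory `send` coroutine and `bytes.upper` standing in for compression and encryption; `pipelined_send` and `fake_send` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def pipelined_send(chunks: Iterable[bytes],
                         prepare: Callable[[bytes], bytes],
                         send: Callable[[bytes], Awaitable[None]]) -> None:
    """Send prepare(chunk) for every chunk, preparing frame n+1 while frame n is sent."""
    it = iter(chunks)
    first = next(it, None)
    if first is None:
        return
    frame = prepare(first)
    for chunk in it:
        # Schedule preparation of the next frame in a worker thread right away ...
        next_frame = asyncio.create_task(asyncio.to_thread(prepare, chunk))
        # ... so it runs while the current frame is being sent
        await send(frame)
        frame = await next_frame
    await send(frame)  # the last prepared frame


sent: List[bytes] = []


async def fake_send(frame: bytes) -> None:
    await asyncio.sleep(0)  # yield to the event loop like a real socket write
    sent.append(frame)


asyncio.run(pipelined_send([b"a", b"b", b"c"], bytes.upper, fake_send))
print(sent)  # [b'A', b'B', b'C']
```

Frame order is preserved because each prepared frame is awaited before the following chunk is scheduled.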
Source code in src/p2p_copy/api.py
async def send(server: str, code: str, files: List[str],
               *, encrypt: bool = False,
               compress: CompressMode = CompressMode.auto,
               resume: bool = False) -> int:
    """
    Send one or more files or directories to a paired receiver via the relay server.

    This command connects to the specified WebSocket relay server, authenticates using
    the shared passphrase (hashed for pairing), and streams the provided files/directories
    in chunks to the receiver. Supports directories by recursively including all files
    in alphabetical order. Optional end-to-end encryption (AES-GCM) and compression
    (Zstandard, auto-detected per file) can be enabled. If resume is enabled, it
    coordinates with the receiver to skip complete files or append to partial ones
    based on chained checksum verification.

    Parameters
    ----------
    server : str
        The WebSocket server URL (ws:// or wss://).
    code : str
        The shared passphrase/code for pairing.
    files : List[str]
        List of files and/or directories to send.
    encrypt : bool, optional
        Enable end-to-end encryption. Default is False.
        Receiver needs to use the same setting.
    compress : CompressMode, optional
        Compression mode. Default is 'auto'.
    resume : bool, optional
        Enable resume of partial transfers. Default is False.
        If True, attempt to skip identical files and append
        incomplete files based on receiver feedback.

    Returns
    -------
    int
        Exit code: 0 on success, non-zero on error.

    Notes
    -----
    - Supports resuming by comparing checksums of partial files.
    - Uses chunked streaming for large files.
    """

    # Closures to break up functions for readability

    async def wait_for_receiver_ready():
        try:
            ready_frame = await asyncio.wait_for(ws.recv(), timeout=300)  # 300s Timeout
            if isinstance(ready_frame, str):
                ready = loads(ready_frame)
                if ready.get("type") != "ready":
                    print("[p2p_copy] send(): unexpected frame after hello")
                    return 3
            else:
                print("[p2p_copy] send(): expected text frame after hello")
                return 3
        except asyncio.TimeoutError:
            print("[p2p_copy] send(): timeout waiting for ready")
            return 3

    async def wait_for_receiver_resume_manifest():
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=30)
        except asyncio.TimeoutError:
            print("[p2p_copy] send(): timeout waiting for receiver_manifest")
            return 3
        if isinstance(raw, str):
            o = loads(raw)
            t = o.get("type")
            if t == "enc_receiver_manifest" and encrypt:
                try:
                    hidden = bytes.fromhex(o["hidden_manifest"])
                    m_str = secure.decrypt_chunk(hidden).decode()
                    o = loads(m_str)
                    t = o.get("type")
                except Exception:
                    print("[p2p_copy] send(): failed to decrypt encrypted receiver manifest")
                    return 3

            if t == "receiver_manifest":
                for e in o.get("entries", []):
                    try:
                        p = e["path"]
                        sz = int(e["size"])
                        ch = bytes.fromhex(e["chain_hex"])
                        resume_map[p] = (sz, ch)
                    except Exception:
                        print("[p2p_copy] send(): failed to read receiver manifest")
                        return 3

    async def pairing_with_receiver():
        await ws.send(hello)
        if receiver_not_ready := await wait_for_receiver_ready():
            return receiver_not_ready

        # Send file infos to receiver
        await ws.send(manifest)

        # wait for receiver resume manifest (optionally encrypted)
        if resume and (no_response_manifest := await wait_for_receiver_resume_manifest()):
            return no_response_manifest

    async def determine_file_resume_point():
        hint = resume_map.get(rel_p.as_posix())
        if hint is not None:
            recv_size, recv_chain = hint
            if 0 < recv_size <= size:
                hashed, local_chain = await compute_chain_up_to(abs_p, limit=recv_size)
                if hashed == recv_size and local_chain == recv_chain:
                    return recv_size
                else:
                    # mismatch -> overwrite from scratch
                    return 0
        return 0

    async def send_file():
        append_from = 0
        # Determine resume point (optional)
        if resume and (append_from := await determine_file_resume_point()) == size:
            return  # Receiver already has identical file -> skip

        # Open file and optionally seek resume point
        with abs_p.open("rb") as fp:
            if append_from:
                await asyncio.to_thread(fp.seek, append_from, 0)

            # Initialize per-transfer chain and sequence
            chained_checksum = ChainedChecksum()
            seq = 0

            # Determine whether to use compression by compressing the first chunk
            chunk = await asyncio.to_thread(fp.read, CHUNK_SIZE)
            chunk = await Compressor.determine_compression(compressor, chunk)

            # Build the complete file info header
            file_info = file_begin(rel_p.as_posix(), size, compressor.compression_type, append_from=append_from)

            # Optionally encrypt the file info
            if encrypt:
                enc_file_info = secure.encrypt_chunk(file_info.encode())
                file_info = encrypted_file_begin(enc_file_info)

            # Send file info header
            await ws.send(file_info)

            # Prepare the first frame, first chunk is optionally compressed and then encrypted
            frame: bytes = pack_chunk(seq, chained_checksum.next_hash(chunk), secure.encrypt_chunk(chunk))
            seq += 1

            def next_frame():
                """prepares the next frame of a file to send, optionally compresses and encrypts"""
                compressed_chunk = compressor.compress(chunk)
                enc_chunk = secure.encrypt_chunk(compressed_chunk)
                return pack_chunk(seq, chained_checksum.next_hash(compressed_chunk), enc_chunk)

            # Send remaining chunks
            async for chunk in read_in_chunks(fp):
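                # Start preparing the next frame in a worker thread; wrapping it in a
                # task schedules it now (a bare to_thread() coroutine would not start
                # until awaited, so nothing would overlap with the send below)
                next_frame_task = asyncio.create_task(asyncio.to_thread(next_frame))
                # Send the current frame while the next frame gets prepared
                await ws.send(frame)
                # Complete the next frame
                frame: bytes = await next_frame_task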
                seq += 1

        # Send the last frame
        await ws.send(frame)
        await ws.send(FILE_EOF)

    # End of Closures

    # Build manifest entries from given file list
    resolved_file_list: List[Tuple[Path, Path, int]] = list(iter_manifest_entries(files))
    if not resolved_file_list:
        print("[p2p_copy] send(): no legal files were passed")
        return 3

    entries: List[ManifestEntry] = [ManifestEntry(path=rel.as_posix(), size=size) for (_, rel, size) in
                                    resolved_file_list]

    # Initialize security-handler, compressor
    secure = SecurityHandler(code, encrypt)
    compressor = Compressor(mode=compress)

    hello = Hello(type="hello", code_hash_hex=secure.code_hash.hex(), role="sender").to_json()
    manifest = Manifest(type="manifest", resume=resume, entries=entries).to_json()
    if encrypt:  # Optionally encrypt the manifest
        manifest = secure.build_encrypted_manifest(manifest)

    # Connect to relay (disable WebSocket internal compression)
    async with connect(server, max_size=2**21, compression=None) as ws:
        # Stores info returned by the receiver about what files are already present
        resume_map: Dict[str, Tuple[int, bytes]] = {}
        # Attempt to connect and optionally exchange info with receiver
        if pairing_failed := await pairing_with_receiver():
            return pairing_failed

        # Transfer each file
        for abs_p, rel_p, size in resolved_file_list:
            await send_file()

        # All done, send message to confirm the end of the copying process
        await ws.send(EOF)
        # Return non-error code
        return 0

receive async

receive(server, code, *, encrypt=False, out=None)

Receive files from a paired sender via the relay server and write to the output directory.

This coroutine connects to the relay server, pairs using the shared passphrase hash, and receives a manifest of the incoming files. Files are written to the output directory, preserving the relative paths from the manifest. Decryption (which must match the sender's encryption setting) and decompression are applied as needed. If the sender requests resume, the receiver reports the state of files it already has (size and chained checksum) so that the sender can skip them or append to them.
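The resume handshake compares chained checksums of file prefixes: the receiver reports the size of a partially present file and a chain hash over its contents, and the sender recomputes the chain over the same number of bytes of its own copy, appending only if both match. This sketch assumes a fixed 64 KiB chunking for the chain (the real `compute_chain_up_to` helper and its chunk size are not shown in this reference) and applies the `ChainedChecksum` rule sha256(prev_chain || payload); `chain_up_to`, `resume_offset` and `CHAIN_CHUNK` are hypothetical names.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional, Tuple

CHAIN_CHUNK = 64 * 1024  # assumed chunk size; both sides must agree on it


def chain_up_to(path: Path, limit: Optional[int] = None) -> Tuple[int, bytes]:
    """Return (bytes hashed, chained SHA-256) over the first `limit` bytes of path."""
    prev, hashed = b"", 0
    with path.open("rb") as fp:
        while limit is None or hashed < limit:
            want = CHAIN_CHUNK if limit is None else min(CHAIN_CHUNK, limit - hashed)
            block = fp.read(want)
            if not block:
                break
            prev = hashlib.sha256(prev + block).digest()  # sha256(prev_chain || payload)
            hashed += len(block)
    return hashed, prev


def resume_offset(local: Path, recv_size: int, recv_chain: bytes) -> int:
    """Sender side: byte offset to continue from (0 means resend everything)."""
    size = local.stat().st_size
    if 0 < recv_size <= size:
        hashed, chain = chain_up_to(local, limit=recv_size)
        if hashed == recv_size and chain == recv_chain:
            return recv_size  # equal to size means the receiver already has the file
    return 0


with tempfile.TemporaryDirectory() as tmp:
    src, partial = Path(tmp) / "src.bin", Path(tmp) / "partial.bin"
    data = bytes(range(256)) * 1000  # 256,000 bytes
    src.write_bytes(data)
    partial.write_bytes(data[:100_000])
    offset = resume_offset(src, *chain_up_to(partial))      # matching prefix
    partial.write_bytes(b"x" + data[1:100_000])             # corrupt the first byte
    bad_offset = resume_offset(src, *chain_up_to(partial))  # mismatch -> start over

print(offset, bad_offset)  # 100000 0
```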

Parameters:

Name     Type  Description                                                                 Default
server   str   The WebSocket server URL (ws:// or wss://).                                 required
code     str   The shared passphrase/code for pairing.                                     required
encrypt  bool  Enable end-to-end encryption. The sender must use the same setting.         False
out      str   Output directory (created if missing). Defaults to the current directory.   None

Returns:

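Type  Description
int   Exit code: 0 on success; 4 if the stream is malformed (unexpected frame, sequence, checksum or size mismatch) or ends while a file is still open.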

Notes
  • Supports resume if sender requests it.
  • Writes files to the output directory, preserving relative paths.
  • Whether to resume and compress is decided by the sender and announced to the receiver.
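Each binary frame is checked twice before its payload is written: the sequence number must be the next expected one, and the chained checksum carried in the frame must match the receiver's own chain over the (decrypted, still compressed) payload. The frame layout used here (8-byte big-endian sequence number, 32-byte chain hash, then payload) is an assumption for illustration only; the real `pack_chunk`/`unpack_chunk` format is not documented in this reference, and `FrameChecker`, `pack`, `unpack` and `make_frames` are hypothetical.

```python
import hashlib
import struct
from typing import List, Tuple

HEADER = struct.Struct(">Q32s")  # assumed layout: uint64 seq + 32-byte chain hash


def pack(seq: int, chain: bytes, payload: bytes) -> bytes:
    return HEADER.pack(seq, chain) + payload


def unpack(frame: bytes) -> Tuple[int, bytes, bytes]:
    seq, chain = HEADER.unpack_from(frame)
    return seq, chain, frame[HEADER.size:]


class FrameChecker:
    """Validate sequence numbers and chained checksums for one file."""

    def __init__(self) -> None:
        self.expected_seq = 0
        self.prev_chain = b""

    def accept(self, frame: bytes) -> bytes:
        seq, chain, payload = unpack(frame)
        if seq != self.expected_seq:
            raise ValueError(f"Sequence mismatch: {seq} != {self.expected_seq}")
        self.prev_chain = hashlib.sha256(self.prev_chain + payload).digest()
        if self.prev_chain != chain:
            raise ValueError("Chained checksum mismatch")
        self.expected_seq += 1
        return payload


def make_frames(chunks: List[bytes]) -> List[bytes]:
    """Sender side: number the chunks and attach the running chain hash."""
    frames, prev = [], b""
    for i, c in enumerate(chunks):
        prev = hashlib.sha256(prev + c).digest()
        frames.append(pack(i, prev, c))
    return frames


frames = make_frames([b"hello ", b"world"])
checker = FrameChecker()
received = b"".join(checker.accept(f) for f in frames)
print(received)  # b'hello world'

try:
    FrameChecker().accept(frames[1])  # frame 0 was skipped
except ValueError as exc:
    error = str(exc)
print(error)  # Sequence mismatch: 1 != 0
```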
Source code in src/p2p_copy/api.py
async def receive(server: str, code: str,
                  *, encrypt: bool = False,
                  out: Optional[str] = None) -> int:
    """
    Receive files from a paired sender via the relay server and write to the output directory.

    This command connects to the relay server, pairs using the shared passphrase hash,
    and receives a manifest of incoming files/directories. Files are written to the
    output directory, preserving relative paths from the manifest. Supports optional
    end-to-end decryption (matching sender's encryption) and decompression. If the
    sender requests resume, this receiver reports existing file states (via checksums)
    to enable skipping or appending.

    Parameters
    ----------
    server : str
        The WebSocket server URL (ws:// or wss://).
    code : str
        The shared passphrase/code for pairing.
    encrypt : bool, optional
        Enable end-to-end encryption. Default is False.
        Sender needs to use the same setting.
    out : str, optional
        Output directory. Default is current directory.

    Returns
    -------
    int
        Exit code: 0 on success, non-zero on error.

    Notes
    -----
    - Supports resume if sender requests it.
    - Writes files to the output directory, preserving relative paths.
    - Info on whether to resume and compress is received from the sender.
    """

    # Closures to break up functions for readability

    def return_with_error_code(msg: str = ""):
        if cur_fp is not None:
            cur_fp.close()
        if msg:
            print(f"[p2p_copy] receive(): {msg}")
        return 4

    async def handle_enc_manifest(o: dict):
        try:
            nonce_hex = o.get("nonce")
            secure.nonce_hasher.next_hash(bytes.fromhex(nonce_hex))
            hidden = bytes.fromhex(o["hidden_manifest"])
            manifest_str = secure.decrypt_chunk(hidden).decode()
            o = loads(manifest_str)
            await handle_manifest(o)  # Delegate to plain handler
        except Exception as e:
            raise ValueError(f"Failed to decrypt manifest: {e}")

    async def handle_manifest(o: dict):
        resume = o.get("resume", False)
        if resume:
            entries = o.get("entries", [])
            reply_entries: List[ReceiverManifestEntry] = []

            for e in entries:
                try:
                    rel = Path(e["path"])
                    local_path = (out_dir / rel).resolve()
                    if local_path.is_file():
                        local_size = local_path.stat().st_size
                        if local_size > 0:
                            hashed, chain_b = await compute_chain_up_to(local_path)
                            resume_known[rel.as_posix()] = (hashed, chain_b)
                            reply_entries.append(
                                ReceiverManifestEntry(
                                    path=rel.as_posix(),
                                    size=hashed,
                                    chain_hex=chain_b.hex(),
                                )
                            )
                except Exception:
                    continue  # Skip bad entries

            if encrypt:
                clear = ReceiverManifest(type="receiver_manifest", entries=reply_entries).to_json().encode()
                hidden = secure.encrypt_chunk(clear)
                reply = EncryptedReceiverManifest(
                    type="enc_receiver_manifest",
                    hidden_manifest=hidden.hex()
                ).to_json()
                await ws.send(reply)
            else:
                await ws.send(ReceiverManifest(type="receiver_manifest", entries=reply_entries).to_json())

    async def handle_enc_file(o: dict):
        try:
            hidden = bytes.fromhex(o["hidden_file"])
            file_str = secure.decrypt_chunk(hidden).decode()
            o = loads(file_str)
            await handle_file(o)
        except Exception as e:
            raise ValueError(f"Failed to decrypt file info: {e}")

    async def handle_file(o: dict):
        nonlocal cur_fp, cur_expected_size, cur_seq_expected, bytes_written, compressor, chained_checksum
        if cur_fp is not None:
            raise ValueError("Got new file while previous still open")
        try:
            rel_path = o["path"]
            total_size: int = o.get("size")
            compression = o.get("compression", "none")
            append_from: int = o.get("append_from", 0)
        except Exception:
            raise ValueError(f"Bad file header: {o}")

        dest = (out_dir / Path(rel_path)).resolve()
        ensure_dir(dest.parent)

        open_mode = "wb"
        expected_remaining = total_size
        if append_from > 0 and dest.exists() and dest.is_file():
            local_size = dest.stat().st_size
            if 0 <= append_from <= total_size and local_size == append_from:
                open_mode = "ab"
                expected_remaining = total_size - append_from
            else:
                expected_remaining = total_size

        cur_fp = dest.open(open_mode)
        cur_expected_size = expected_remaining
        cur_seq_expected = 0
        bytes_written = 0
        compressor.set_decompression(compression)
        chained_checksum = ChainedChecksum()

    async def handle_file_eof(o: dict):
        nonlocal cur_fp
        if cur_fp is None:
            raise ValueError("Got file_eof without open file")
        if cur_expected_size is not None and bytes_written != cur_expected_size:
            raise ValueError(f"Size mismatch: {bytes_written} != {cur_expected_size}")
        cur_fp.close()
        cur_fp = None

    async def handle_chunk():
        nonlocal bytes_written, cur_seq_expected
        if cur_fp is None:
            raise ValueError("Unexpected binary data without open file")
        seq, chain, payload = unpack_chunk(frame)
        if seq != cur_seq_expected:
            raise ValueError(f"Sequence mismatch: {seq} != {cur_seq_expected}")

        raw_payload = secure.decrypt_chunk(payload) if encrypt else payload
        if chained_checksum.next_hash(raw_payload) != chain:
            raise ValueError("Chained checksum mismatch")

        chunk = compressor.decompress(raw_payload)
        await asyncio.to_thread(cur_fp.write, chunk)

        bytes_written += len(chunk)
        cur_seq_expected += 1

    async def handle_eof(o: dict):
        raise StopAsyncIteration  # Break the loop cleanly

    # Frame type dispatcher
    async def dispatch_frame():
        if isinstance(frame, (bytes, bytearray)):
            await handle_chunk()

        elif not isinstance(frame, str):
            raise ValueError("Unknown frame type")

        else:
            o = loads(frame)
            t = o.get("type")

            handlers = {
                "enc_manifest": handle_enc_manifest if encrypt else None,
                "manifest": handle_manifest if not encrypt else None,
                "enc_file": handle_enc_file if encrypt else None,
                "file": handle_file if not encrypt else None,
                "file_eof": handle_file_eof,
                "eof": handle_eof,
            }
            handler = handlers.get(t)
            if handler is None:
                raise ValueError(f"Unexpected control: {o}")
            await handler(o)

    # End of Closures

    out_dir = Path(out or ".")
    ensure_dir(out_dir)

    secure = SecurityHandler(code, encrypt)
    hello = Hello(type="hello", code_hash_hex=secure.code_hash.hex(), role="receiver").to_json()

    # Receiver state
    cur_fp: Optional[BinaryIO] = None
    cur_expected_size: Optional[int] = None
    cur_seq_expected = 0
    bytes_written = 0
    chained_checksum = ChainedChecksum()
    compressor = Compressor()
    resume_known: Dict[str, Tuple[int, bytes]] = {}

    async with connect(server, max_size=2**21, compression=None) as ws:
        await ws.send(hello)
        try:
            async for frame in ws:
                await dispatch_frame()
        except StopAsyncIteration:
            pass  # Normal EOF
        except ValueError as e:
            return return_with_error_code(str(e))

    if cur_fp is not None:
        return return_with_error_code("Stream ended while file open")
    return 0

 

p2p_copy.security

SecurityHandler

Handle security operations like hashing, encryption, and decryption for transfers.

Parameters:

Name     Type  Description                                Default
code     str   The shared passphrase/code.                required
encrypt  bool  Whether to enable end-to-end encryption.   required
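Without encryption, the value the relay uses for pairing is simply the SHA-256 digest of the passphrase, sent hex-encoded in the `hello` message together with the client's role; with encryption enabled, an Argon2 hash of the code with a fixed context string is used instead (see `__init__`). The sketch rebuilds such a message with `json`; the key order and exact serialization of `Hello.to_json()` are assumptions, and `hello_message` is a hypothetical helper.

```python
import hashlib
import json


def hello_message(code: str, role: str) -> str:
    """Build a hello frame as send()/receive() do with encrypt=False (assumed JSON shape)."""
    code_hash = hashlib.sha256(code.encode()).digest()
    return json.dumps({"type": "hello", "code_hash_hex": code_hash.hex(), "role": role})


sender_hello = json.loads(hello_message("correct horse", "sender"))
receiver_hello = json.loads(hello_message("correct horse", "receiver"))
# The same code yields the same hash on both sides, so the relay can pair them
print(sender_hello["code_hash_hex"] == receiver_hello["code_hash_hex"])  # True
```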
Source code in src/p2p_copy/security.py
class SecurityHandler:
    """
    Handle security operations like hashing, encryption, and decryption for transfers.

    Parameters
    ----------
    code : str
        The shared passphrase/code.
    encrypt : bool
        Whether to enable end-to-end encryption.
    """

    def __init__(self, code: str, encrypt: bool):
        self.encrypt = encrypt
        if self.encrypt:
            import_optional_security_libs()
            self.code_hash = _get_argon2_hash(code, b"code_hash used for hello-match")
            self.nonce_hasher = ChainedChecksum()
            self.cipher = AESGCM(_get_argon2_hash(code, b"cipher used for E2E-encryption"))
        else:
            self.code_hash = hashlib.sha256(code.encode()).digest()

    def encrypt_chunk(self, chunk: bytes) -> bytes:
        """
        Encrypt a chunk if encryption is enabled.

        Parameters
        ----------
        chunk : bytes
            The chunk to encrypt.

        Returns
        -------
        bytes
            The encrypted chunk, or original if not encrypted.
        """
        if self.encrypt:
            return self.cipher.encrypt(self.nonce_hasher.next_hash(), chunk, None)
        return chunk

    def decrypt_chunk(self, chunk: bytes) -> bytes:
        """
        Decrypt a chunk if encryption is enabled.

        Parameters
        ----------
        chunk : bytes
            The chunk to decrypt.

        Returns
        -------
        bytes
            The decrypted chunk, or original if not encrypted.
        """
        if self.encrypt:
            return self.cipher.decrypt(self.nonce_hasher.next_hash(), chunk, None)
        return chunk

    def build_encrypted_manifest(self, manifest: str) -> str:
        """
        Build an encrypted manifest for secure transmission.

        Parameters
        ----------
        manifest : str
            The plaintext manifest JSON.

        Returns
        -------
        str
            The JSON-serialized EncryptedManifest.
        """
        start_nonce = os.urandom(32)
        self.nonce_hasher.next_hash(start_nonce)
        enc_manifest = self.encrypt_chunk(manifest.encode())
        return EncryptedManifest(
            type="enc_manifest",
            nonce=start_nonce.hex(),
            hidden_manifest=enc_manifest.hex()
        ).to_json()

encrypt_chunk

encrypt_chunk(chunk)

Encrypt a chunk if encryption is enabled.

Parameters:

Name   Type   Description            Default
chunk  bytes  The chunk to encrypt.  required

Returns:

Type   Description
bytes  The encrypted chunk, or the original chunk if encryption is disabled.

Source code in src/p2p_copy/security.py
def encrypt_chunk(self, chunk: bytes) -> bytes:
    """
    Encrypt a chunk if encryption is enabled.

    Parameters
    ----------
    chunk : bytes
        The chunk to encrypt.

    Returns
    -------
    bytes
        The encrypted chunk, or original if not encrypted.
    """
    if self.encrypt:
        return self.cipher.encrypt(self.nonce_hasher.next_hash(), chunk, None)
    return chunk

decrypt_chunk

decrypt_chunk(chunk)

Decrypt a chunk if encryption is enabled.

Parameters:

Name   Type   Description            Default
chunk  bytes  The chunk to decrypt.  required

Returns:

Type   Description
bytes  The decrypted chunk, or the original chunk if encryption is disabled.

Source code in src/p2p_copy/security.py
def decrypt_chunk(self, chunk: bytes) -> bytes:
    """
    Decrypt a chunk if encryption is enabled.

    Parameters
    ----------
    chunk : bytes
        The chunk to decrypt.

    Returns
    -------
    bytes
        The decrypted chunk, or original if not encrypted.
    """
    if self.encrypt:
        return self.cipher.decrypt(self.nonce_hasher.next_hash(), chunk, None)
    return chunk

build_encrypted_manifest

build_encrypted_manifest(manifest)

Build an encrypted manifest for secure transmission.

Parameters:

Name      Type  Description                   Default
manifest  str   The plaintext manifest JSON.  required

Returns:

Type  Description
str   The JSON-serialized EncryptedManifest.
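`build_encrypted_manifest` seeds the nonce chain with a random 32-byte `start_nonce` and sends it in clear in the `nonce` field; the receiver feeds the same value into its own chain before decrypting (see `handle_enc_manifest` in `receive`). From then on, every `encrypt_chunk`/`decrypt_chunk` call draws the next nonce with `next_hash()`, so both sides stay in sync only if they make the same calls in the same order. This stdlib-only sketch replays that nonce sequence without any AES-GCM; `nonce_stream` is a hypothetical helper.

```python
import hashlib
import os
from typing import List


def nonce_stream(start_nonce: bytes, count: int) -> List[bytes]:
    """Nonces drawn by `count` encrypt/decrypt calls after seeding with start_nonce."""
    prev = hashlib.sha256(b"" + start_nonce).digest()  # next_hash(start_nonce), empty seed
    nonces = []
    for _ in range(count):
        prev = hashlib.sha256(prev).digest()  # next_hash() with an empty payload
        nonces.append(prev)
    return nonces


start_nonce = os.urandom(32)
sender_nonces = nonce_stream(start_nonce, 3)  # e.g. manifest, file header, first chunk
receiver_nonces = nonce_stream(bytes.fromhex(start_nonce.hex()), 3)  # seed parsed from JSON
print(sender_nonces == receiver_nonces)  # True
print(len(sender_nonces[0]))             # 32
```

If one side skips a single call, every later nonce is shifted and AES-GCM authentication fails for all following chunks.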

Source code in src/p2p_copy/security.py
def build_encrypted_manifest(self, manifest: str) -> str:
    """
    Build an encrypted manifest for secure transmission.

    Parameters
    ----------
    manifest : str
        The plaintext manifest JSON.

    Returns
    -------
    str
        The JSON-serialized EncryptedManifest.
    """
    start_nonce = os.urandom(32)
    self.nonce_hasher.next_hash(start_nonce)
    enc_manifest = self.encrypt_chunk(manifest.encode())
    return EncryptedManifest(
        type="enc_manifest",
        nonce=start_nonce.hex(),
        hidden_manifest=enc_manifest.hex()
    ).to_json()

ChainedChecksum

Generate chained SHA-256 checksums over sequential payloads.

Parameters:

Name  Type   Description                                           Default
seed  bytes  Initial seed for the chain. Default is empty bytes.   b''
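The chain satisfies h_0 = seed and h_n = sha256(h_{n-1} || p_n), so the final value depends on every payload and on their order; this is what lets the receiver detect dropped, duplicated or reordered chunks. The helper below is a compact functional equivalent of repeated `next_hash` calls, written for illustration (`chain` is a hypothetical name).

```python
import hashlib
from typing import Iterable


def chain(payloads: Iterable[bytes], seed: bytes = b"") -> bytes:
    """Final h_n with h_n = sha256(h_{n-1} || p_n) and h_0 = seed."""
    h = seed
    for p in payloads:
        h = hashlib.sha256(h + p).digest()
    return h


in_order = chain([b"chunk-0", b"chunk-1", b"chunk-2"])
swapped = chain([b"chunk-1", b"chunk-0", b"chunk-2"])
dropped = chain([b"chunk-0", b"chunk-2"])
print(in_order != swapped, in_order != dropped)  # True True
```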
Source code in src/p2p_copy/security.py
class ChainedChecksum:
    """
    Generate chained SHA-256 checksums over sequential payloads.

    Parameters
    ----------
    seed : bytes, optional
        Initial seed for the chain. Default is empty bytes.
    """

    def __init__(self, seed: bytes = b"") -> None:
        self.prev_chain = seed

    def next_hash(self, payload: bytes = b"") -> bytes:
        """
        Compute the next hash in the chain: sha256(prev_chain || payload).

        Parameters
        ----------
        payload : bytes, optional
            Data to include in this hash. Default is empty.

        Returns
        -------
        bytes
            The 32-byte hash, which becomes the new prev_chain.
        """
        h = hashlib.sha256()
        h.update(self.prev_chain)
        h.update(payload)
        self.prev_chain = h.digest()
        return self.prev_chain

next_hash

next_hash(payload=b'')

Compute the next hash in the chain: sha256(prev_chain || payload).

Parameters:

Name     Type   Description                                        Default
payload  bytes  Data to include in this hash. Default is empty.    b''

Returns:

Type   Description
bytes  The 32-byte hash, which becomes the new prev_chain.

Source code in src/p2p_copy/security.py
def next_hash(self, payload: bytes = b"") -> bytes:
    """
    Compute the next hash in the chain: sha256(prev_chain || payload).

    Parameters
    ----------
    payload : bytes, optional
        Data to include in this hash. Default is empty.

    Returns
    -------
    bytes
        The 32-byte hash, which becomes the new prev_chain.
    """
    h = hashlib.sha256()
    h.update(self.prev_chain)
    h.update(payload)
    self.prev_chain = h.digest()
    return self.prev_chain
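
The chaining rule can be checked with the standard library alone. The sketch below re-declares a minimal stand-in for ChainedChecksum (it is not imported from p2p_copy). It confirms that two calls produce sha256(sha256(seed || p1) || p2), so each digest commits to every earlier payload and to their order.

```python
import hashlib


class ChainedChecksum:
    """Minimal stand-in mirroring the class documented above."""

    def __init__(self, seed: bytes = b"") -> None:
        self.prev_chain = seed

    def next_hash(self, payload: bytes = b"") -> bytes:
        h = hashlib.sha256()
        h.update(self.prev_chain)
        h.update(payload)
        self.prev_chain = h.digest()
        return self.prev_chain


c = ChainedChecksum(seed=b"seed")
first = c.next_hash(b"chunk-1")
second = c.next_hash(b"chunk-2")

# Recompute the same chain by hand: each step hashes the previous digest
# followed by the new payload.
manual_first = hashlib.sha256(b"seed" + b"chunk-1").digest()
manual_second = hashlib.sha256(manual_first + b"chunk-2").digest()

print(first == manual_first, second == manual_second)  # True True
print(len(second))  # 32

# Feeding the same payloads in a different order yields a different digest.
swapped = ChainedChecksum(seed=b"seed")
swapped.next_hash(b"chunk-2")
print(swapped.next_hash(b"chunk-1") != second)  # True
```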

import_optional_security_libs

import_optional_security_libs()

Import optional security libraries (argon2-cffi, cryptography) if encryption is used.

Source code in src/p2p_copy/security.py
def import_optional_security_libs():
    """
    Import optional security libraries (argon2-cffi, cryptography) if encryption is used.
    """
    global hash_secret_raw, Type, AESGCM
    try:
        # security libs are needed if encryption is used
        from argon2.low_level import hash_secret_raw, Type
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ModuleNotFoundError as e:
        # Re-raise with an installation hint, keeping the original error as the cause.
        raise ModuleNotFoundError(
            e.msg + "\nTo use encryption, the optional security libraries are needed "
                    "(pip install p2p-copy[security])") from e

 

p2p_copy.compressor

CompressMode

Bases: str, Enum

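Enumeration of compression modes: on always compresses, off never compresses, and auto decides based on how well the first chunk compresses.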

Source code in src/p2p_copy/compressor.py
class CompressMode(str, Enum):
    """
    Enumeration of compression modes.
    """

    auto = "auto"
    on = "on"
    off = "off"

Compressor

Handle compression and decompression of chunks using Zstandard.

Parameters:

Name | Type | Description | Default
mode | CompressMode | Compression mode. Default is 'auto'. | auto

Source code in src/p2p_copy/compressor.py
class Compressor:
    """
    Handle compression and decompression of chunks using Zstandard.

    Parameters
    ----------
    mode : CompressMode, optional
        Compression mode. Default is 'auto'.
    """

    def __init__(self, mode: CompressMode = CompressMode.auto):
        self.mode = mode
        self.cctx: Optional[zstd.ZstdCompressor] = zstd.ZstdCompressor(level=3) if mode != CompressMode.off else None
        self.dctx: Optional[zstd.ZstdDecompressor] = None
        self.use_compression: bool = mode == CompressMode.on
        self.compression_type: str = "zstd" if mode == CompressMode.on else "none"

    async def determine_compression(self, first_chunk: bytes) -> bytes:
        """
        Determine if compression should be used based on the first chunk (auto mode).

        Parameters
        ----------
        first_chunk : bytes
            The first chunk of data.

        Returns
        -------
        bytes
            The (possibly compressed) first chunk.
        """

        if self.mode == CompressMode.off:
            return first_chunk

        else:
            compressed = self.cctx.compress(first_chunk)
            if self.mode == CompressMode.on:
                return compressed

            elif self.mode == CompressMode.auto:
                # Auto mode: test first chunk
                compression_ratio = len(compressed) / len(first_chunk) if first_chunk else 1.0
                self.use_compression = compression_ratio < 0.95  # Enable if compressed size < 95% of original
                self.compression_type = "zstd" if self.use_compression else "none"
                return compressed if self.use_compression else first_chunk

    def compress(self, chunk: bytes) -> bytes:
        """
        Compress a chunk if compression is enabled.

        Parameters
        ----------
        chunk : bytes
            The chunk to compress.

        Returns
        -------
        bytes
            The compressed or original chunk.
        """
        if self.use_compression and self.cctx:
            return self.cctx.compress(chunk)
        return chunk

    def decompress(self, chunk: bytes) -> bytes:
        """
        Decompress a chunk if decompression is set up.

        Parameters
        ----------
        chunk : bytes
            The chunk to decompress.

        Returns
        -------
        bytes
            The decompressed or original chunk.
        """

        if self.dctx:
            return self.dctx.decompress(chunk)
        return chunk

    def set_decompression(self, compression_type: str):
        """
        Set up the decompressor based on the compression type.

        Parameters
        ----------
        compression_type : str
            The type of compression ('zstd' or 'none').
        """

        self.dctx = zstd.ZstdDecompressor() if compression_type == "zstd" else None

determine_compression async

determine_compression(first_chunk)

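Determine whether compression should be used, based on the first chunk. In auto mode, compression is enabled only if the compressed first chunk is smaller than 95% of its original size. In on and off modes, the decision is fixed.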

Parameters:

Name | Type | Description | Default
first_chunk | bytes | The first chunk of data. | required

Returns:

Type | Description
bytes | The (possibly compressed) first chunk.

Source code in src/p2p_copy/compressor.py
async def determine_compression(self, first_chunk: bytes) -> bytes:
    """
    Determine if compression should be used based on the first chunk (auto mode).

    Parameters
    ----------
    first_chunk : bytes
        The first chunk of data.

    Returns
    -------
    bytes
        The (possibly compressed) first chunk.
    """

    if self.mode == CompressMode.off:
        return first_chunk

    else:
        compressed = self.cctx.compress(first_chunk)
        if self.mode == CompressMode.on:
            return compressed

        elif self.mode == CompressMode.auto:
            # Auto mode: test first chunk
            compression_ratio = len(compressed) / len(first_chunk) if first_chunk else 1.0
            self.use_compression = compression_ratio < 0.95  # Enable if compressed size < 95% of original
            self.compression_type = "zstd" if self.use_compression else "none"
            return compressed if self.use_compression else first_chunk
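
The auto-mode rule (compress only if the first chunk shrinks below 95% of its size) can be illustrated without the zstandard package. This sketch uses the standard-library zlib module as a stand-in for Zstandard, and the function name decide_compression is hypothetical. Only the 0.95 threshold, the empty-chunk fallback, and the "zstd"/"none" labels come from the source above.

```python
import os
import zlib


def decide_compression(first_chunk: bytes, threshold: float = 0.95) -> tuple[bool, bytes]:
    """Hypothetical helper mirroring the auto branch of determine_compression.

    zlib stands in for Zstandard here. Returns (use_compression, chunk_to_send).
    """
    compressed = zlib.compress(first_chunk, 3)
    # An empty chunk counts as ratio 1.0, i.e. "not worth compressing".
    ratio = len(compressed) / len(first_chunk) if first_chunk else 1.0
    use_compression = ratio < threshold
    return use_compression, (compressed if use_compression else first_chunk)


text_like = b"the quick brown fox jumps over the lazy dog\n" * 1000
random_like = os.urandom(64 * 1024)  # high-entropy data, like already-compressed files

for label, chunk in [("text", text_like), ("random", random_like), ("empty", b"")]:
    use, sent = decide_compression(chunk)
    print(label, "zstd" if use else "none", len(chunk), "->", len(sent))
```

Sampling only the first chunk keeps the decision cheap. The trade-off is that a file whose content changes character after the first chunk may be misjudged.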

compress

compress(chunk)

Compress a chunk if compression is enabled.

Parameters:

Name | Type | Description | Default
chunk | bytes | The chunk to compress. | required

Returns:

Type | Description
bytes | The compressed or original chunk.

Source code in src/p2p_copy/compressor.py
def compress(self, chunk: bytes) -> bytes:
    """
    Compress a chunk if compression is enabled.

    Parameters
    ----------
    chunk : bytes
        The chunk to compress.

    Returns
    -------
    bytes
        The compressed or original chunk.
    """
    if self.use_compression and self.cctx:
        return self.cctx.compress(chunk)
    return chunk

decompress

decompress(chunk)

Decompress a chunk if decompression is set up.

Parameters:

Name | Type | Description | Default
chunk | bytes | The chunk to decompress. | required

Returns:

Type | Description
bytes | The decompressed or original chunk.

Source code in src/p2p_copy/compressor.py
def decompress(self, chunk: bytes) -> bytes:
    """
    Decompress a chunk if decompression is set up.

    Parameters
    ----------
    chunk : bytes
        The chunk to decompress.

    Returns
    -------
    bytes
        The decompressed or original chunk.
    """

    if self.dctx:
        return self.dctx.decompress(chunk)
    return chunk

set_decompression

set_decompression(compression_type)

Set up the decompressor based on the compression type.

Parameters:

Name | Type | Description | Default
compression_type | str | The type of compression ('zstd' or 'none'). | required

Source code in src/p2p_copy/compressor.py
def set_decompression(self, compression_type: str):
    """
    Set up the decompressor based on the compression type.

    Parameters
    ----------
    compression_type : str
        The type of compression ('zstd' or 'none').
    """

    self.dctx = zstd.ZstdDecompressor() if compression_type == "zstd" else None
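
compress() calls cctx.compress on each chunk separately, so every chunk becomes a self-contained compressed frame. The receiver can decompress each frame on its own once set_decompression has been given the type announced by the sender (for example, via the compression field of file_begin). The sketch below imitates that sender/receiver split, with zlib standing in for Zstandard. The class names SenderSide and ReceiverSide are hypothetical.

```python
import zlib


class SenderSide:
    """Hypothetical stand-in for the sender's Compressor after the mode is decided."""

    def __init__(self, use_compression: bool) -> None:
        self.use_compression = use_compression
        self.compression_type = "zstd" if use_compression else "none"

    def compress(self, chunk: bytes) -> bytes:
        # Each chunk is compressed independently, producing a standalone frame.
        return zlib.compress(chunk) if self.use_compression else chunk


class ReceiverSide:
    """Hypothetical stand-in for the receiver's Compressor."""

    def __init__(self) -> None:
        self.enabled = False

    def set_decompression(self, compression_type: str) -> None:
        # The label arrives with the file-begin control message.
        self.enabled = compression_type == "zstd"

    def decompress(self, chunk: bytes) -> bytes:
        return zlib.decompress(chunk) if self.enabled else chunk


chunks = [b"alpha " * 500, b"beta " * 500, b"gamma " * 500]

sender = SenderSide(use_compression=True)
wire = [sender.compress(c) for c in chunks]

receiver = ReceiverSide()
receiver.set_decompression(sender.compression_type)

# Frames are decoded one at a time, in arrival order, with no shared state.
restored = [receiver.decompress(frame) for frame in wire]
print(restored == chunks)  # True
```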

 

p2p_copy.protocol

Hello dataclass

Hello message for connection initiation.

Parameters:

Name | Type | Description | Default
type | Literal['hello'] | Message type. | required
code_hash_hex | str | Hex-encoded hash of the shared code. | required
role | Literal['sender', 'receiver'] | The role of this client. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class Hello:
    """
    Hello message for connection initiation.

    Parameters
    ----------
    type : Literal["hello"]
        Message type.
    code_hash_hex : str
        Hex-encoded hash of the shared code.
    role : Literal["sender", "receiver"]
        The role of this client.
    """
    type: Literal["hello"]
    code_hash_hex: str
    role: Literal["sender", "receiver"]

    def to_json(self) -> str:
        return dumps({"type": "hello", "code_hash_hex": self.code_hash_hex, "role": self.role})

ManifestEntry dataclass

Entry in a file manifest.

Parameters:

Name | Type | Description | Default
path | str | Relative path of the file. | required
size | int | File size in bytes. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class ManifestEntry:
    """
    Entry in a file manifest.

    Parameters
    ----------
    path : str
        Relative path of the file.
    size : int
        File size in bytes.
    """
    path: str
    size: int

Manifest dataclass

Manifest of files to send.

Parameters:

Name | Type | Description | Default
type | Literal['manifest'] | Message type. | required
entries | Sequence[ManifestEntry] | List of file entries. | required
resume | bool | Whether to enable resume. Default is False. | False

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class Manifest:
    """
    Manifest of files to send.

    Parameters
    ----------
    type : Literal["manifest"]
        Message type.
    entries : Sequence[ManifestEntry]
        List of file entries.
    resume : bool, optional
        Whether to enable resume. Default is False.
    """
    type: Literal["manifest"]
    entries: Sequence[ManifestEntry]
    resume: bool = False

    def to_json(self) -> str:
        return dumps({
            "type": "manifest",
            "resume": self.resume,
            "entries": [asdict(e) for e in self.entries]
        })
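
The sketch below shows what Manifest.to_json() puts on the wire. It re-declares ManifestEntry and Manifest as documented above, together with a local copy of dumps (compact separators, ensure_ascii=False). It is a standalone mirror for illustration, not an import from p2p_copy.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Literal, Sequence


def dumps(msg: Dict[str, Any]) -> str:
    # Same settings as p2p_copy.protocol.dumps: no spaces after "," or ":".
    return json.dumps(msg, separators=(",", ":"), ensure_ascii=False)


@dataclass(frozen=True)
class ManifestEntry:
    path: str
    size: int


@dataclass(frozen=True)
class Manifest:
    type: Literal["manifest"]
    entries: Sequence[ManifestEntry]
    resume: bool = False

    def to_json(self) -> str:
        return dumps({
            "type": "manifest",
            "resume": self.resume,
            "entries": [asdict(e) for e in self.entries],
        })


m = Manifest(
    type="manifest",
    entries=[ManifestEntry("photos/a.jpg", 1024), ManifestEntry("photos/b.jpg", 2048)],
    resume=True,
)
print(m.to_json())
# {"type":"manifest","resume":true,"entries":[{"path":"photos/a.jpg","size":1024},{"path":"photos/b.jpg","size":2048}]}
```

to_json() always writes "type": "manifest" itself rather than reading the type field.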

EncryptedManifest dataclass

Encrypted manifest for secure transmission.

Parameters:

Name | Type | Description | Default
type | Literal['enc_manifest'] | Message type. | required
nonce | str | Hex-encoded random nonce that serves as the seed from which the nonces used for encryption are derived. It must be shared with the receiver so that it can decrypt accordingly. | required
hidden_manifest | str | Hex-encoded encrypted manifest. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class EncryptedManifest:
    """
    Encrypted manifest for secure transmission.

    Parameters
    ----------
    type : Literal["enc_manifest"]
        Message type.
    nonce : str
        Hex-encoded random nonce that serves as the seed from which
        the nonces used for encryption are derived. It must be shared
        with the receiver so that it can decrypt accordingly.
    hidden_manifest : str
        Hex-encoded encrypted manifest.
    """
    type: Literal["enc_manifest"]
    nonce: str
    hidden_manifest: str

    def to_json(self) -> str:
        return dumps({
            "type": "enc_manifest",
            "nonce": self.nonce,
            "hidden_manifest": self.hidden_manifest
        })

ReceiverManifestEntry dataclass

Receiver's report of existing file state for resume.

Parameters:

Name | Type | Description | Default
path | str | Relative path. | required
size | int | Bytes already present. | required
chain_hex | str | Hex-encoded chained checksum up to 'size'. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class ReceiverManifestEntry:
    """
    Receiver's report of existing file state for resume.

    Parameters
    ----------
    path : str
        Relative path.
    size : int
        Bytes already present.
    chain_hex : str
        Hex-encoded chained checksum up to 'size'.
    """
    path: str
    size: int
    chain_hex: str
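
The fields of ReceiverManifestEntry suggest how resume can be verified. The receiver reports how many bytes it already has and the chained checksum over them. The sender can then recompute the chain over its own first size bytes and compare the two values. The sketch below shows that comparison on in-memory data. Several details are illustrative assumptions, not taken from the p2p_copy source: the helper names chain_over and choose_append_from, the tiny 4-byte chunk size, and the "restart from 0 on mismatch" policy.

```python
import hashlib


def chain_over(data: bytes, chunk_size: int) -> bytes:
    """Chained SHA-256 over data split into chunk_size pieces (empty seed)."""
    chain = b""
    for i in range(0, len(data), chunk_size):
        chain = hashlib.sha256(chain + data[i:i + chunk_size]).digest()
    return chain


def choose_append_from(local: bytes, reported_size: int, reported_chain_hex: str,
                       chunk_size: int) -> int:
    """Hypothetical sender-side check: resume only if the receiver's prefix matches."""
    if reported_size <= 0 or reported_size > len(local):
        return 0
    expected = chain_over(local[:reported_size], chunk_size)
    return reported_size if expected.hex() == reported_chain_hex else 0


CHUNK = 4  # tiny chunk size for the demo; the real code uses CHUNK_SIZE
source = b"hello resumable world"

# The receiver holds an intact 10-byte prefix.
partial = source[:10]
entry = {"path": "greeting.txt", "size": len(partial),
         "chain_hex": chain_over(partial, CHUNK).hex()}
print(choose_append_from(source, entry["size"], entry["chain_hex"], CHUNK))  # 10

# The receiver's prefix is corrupted, so the sender restarts from byte 0.
bad = b"HELLO" + partial[5:]
print(choose_append_from(source, 10, chain_over(bad, CHUNK).hex(), CHUNK))  # 0
```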

ReceiverManifest dataclass

Manifest from receiver reporting existing files.

Parameters:

Name | Type | Description | Default
type | Literal['receiver_manifest'] | Message type. | required
entries | Sequence[ReceiverManifestEntry] | List of entries. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class ReceiverManifest:
    """
    Manifest from receiver reporting existing files.

    Parameters
    ----------
    type : Literal["receiver_manifest"]
        Message type.
    entries : Sequence[ReceiverManifestEntry]
        List of entries.
    """
    type: Literal["receiver_manifest"]
    entries: Sequence[ReceiverManifestEntry]

    def to_json(self) -> str:
        return dumps({
            "type": "receiver_manifest",
            "entries": [asdict(e) for e in self.entries]
        })

EncryptedReceiverManifest dataclass

Encrypted receiver manifest.

Parameters:

Name | Type | Description | Default
type | Literal['enc_receiver_manifest'] | Message type. | required
hidden_manifest | str | Hex-encoded encrypted manifest. | required

Source code in src/p2p_copy/protocol.py
@dataclass(frozen=True)
class EncryptedReceiverManifest:
    """
    Encrypted receiver manifest.

    Parameters
    ----------
    type : Literal["enc_receiver_manifest"]
        Message type.
    hidden_manifest : str
        Hex-encoded encrypted manifest.
    """
    type: Literal["enc_receiver_manifest"]
    hidden_manifest: str

    def to_json(self) -> str:
        return dumps({
            "type": "enc_receiver_manifest",
            "hidden_manifest": self.hidden_manifest
        })

dumps

dumps(msg)

JSON-dump a message with compact separators.

Parameters:

Name | Type | Description | Default
msg | Dict[str, Any] | The message to serialize. | required

Returns:

Type | Description
str | Compact JSON string.

Source code in src/p2p_copy/protocol.py
def dumps(msg: Dict[str, Any]) -> str:
    """
    JSON-dump a message with compact separators.

    Parameters
    ----------
    msg : Dict[str, Any]
        The message to serialize.

    Returns
    -------
    str
        Compact JSON string.
    """
    return json.dumps(msg, separators=(",", ":"), ensure_ascii=False)

loads

loads(s)

JSON-load a string into a dict.

Parameters:

Name | Type | Description | Default
s | str | JSON string. | required

Returns:

Type | Description
Dict[str, Any] | Parsed dictionary.

Source code in src/p2p_copy/protocol.py
def loads(s: str) -> Dict[str, Any]:
    """
    JSON-load a string into a dict.

    Parameters
    ----------
    s : str
        JSON string.

    Returns
    -------
    Dict[str, Any]
        Parsed dictionary.
    """
    return json.loads(s)
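
dumps and loads are thin wrappers around the standard json module. The sketch below copies their bodies to show the two settings that matter on the wire. Compact separators remove the spaces. ensure_ascii=False keeps non-ASCII file names as UTF-8 text instead of \uXXXX escapes. The message content is only an example.

```python
import json
from typing import Any, Dict


def dumps(msg: Dict[str, Any]) -> str:
    return json.dumps(msg, separators=(",", ":"), ensure_ascii=False)


def loads(s: str) -> Dict[str, Any]:
    return json.loads(s)


msg = {"type": "file", "path": "Übersicht/café.txt", "size": 3}
wire = dumps(msg)
print(wire)  # {"type":"file","path":"Übersicht/café.txt","size":3}

# Plain json.dumps would add spaces and escape the non-ASCII characters.
print(json.dumps(msg))  # {"type": "file", "path": "\u00dcbersicht/caf\u00e9.txt", "size": 3}

print(loads(wire) == msg)  # True: the round trip is lossless
```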

file_begin

file_begin(path, size, compression='none', append_from=0)

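Create the control message that announces the start of a file stream. If append_from is greater than 0, the sender only sends bytes [append_from, size) and the receiver should open the file in append mode ('ab').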

Parameters:

Name | Type | Description | Default
path | str | Relative path. | required
size | int | Total file size. | required
compression | str | Compression type. Default is 'none'. | 'none'
append_from | int | Byte offset to append from. Default is 0. | 0

Returns:

Type | Description
str | JSON string of the message.

Source code in src/p2p_copy/protocol.py
def file_begin(path: str, size: int, compression: str = "none", append_from: int = 0) -> str:
    """
    Create a file begin control message.

    Parameters
    ----------
    path : str
        Relative path.
    size : int
        Total file size.
    compression : str, optional
        Compression type. Default is 'none'.
    append_from : int, optional
        Byte offset to append from. Default is 0.

    Returns
    -------
    str
        JSON string of the message.

    Notes
    -----
    Marks the start of a file stream. If append_from is greater than 0, the
    sender only sends bytes [append_from, size) and the receiver should open
    the file in append mode ('ab').
    """
    msg: Dict[str, Any] = {
        "type": "file",
        "path": path,
        "size": int(size),
        "compression": compression,
        "append_from": append_from
    }

    return dumps(msg)
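
The docstring of file_begin describes how a non-zero append_from works: only bytes [append_from, size) follow, and the receiver appends to the existing file. The sketch below copies file_begin locally and pairs it with a hypothetical receiver helper, open_mode_for, to show how the two fit together. The helper is an illustration, not part of p2p_copy.

```python
import json
from typing import Any, Dict


def file_begin(path: str, size: int, compression: str = "none", append_from: int = 0) -> str:
    # Local copy of p2p_copy.protocol.file_begin (compact JSON).
    msg: Dict[str, Any] = {
        "type": "file",
        "path": path,
        "size": int(size),
        "compression": compression,
        "append_from": append_from,
    }
    return json.dumps(msg, separators=(",", ":"), ensure_ascii=False)


def open_mode_for(header: Dict[str, Any]) -> str:
    """Hypothetical receiver helper: append when resuming, otherwise overwrite."""
    return "ab" if header["append_from"] > 0 else "wb"


fresh = file_begin("data/log.txt", 4096)
resumed = file_begin("data/log.txt", 4096, compression="zstd", append_from=1024)
print(resumed)
# {"type":"file","path":"data/log.txt","size":4096,"compression":"zstd","append_from":1024}

for raw in (fresh, resumed):
    header = json.loads(raw)
    remaining = header["size"] - header["append_from"]
    print(open_mode_for(header), remaining)  # "wb 4096", then "ab 3072"
```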

encrypted_file_begin

encrypted_file_begin(hidden_file_info)

Wrap encrypted file info in a control message.

Parameters:

Name | Type | Description | Default
hidden_file_info | bytes | Encrypted file begin data. | required

Returns:

Type | Description
str | JSON string of the enc_file message.

Source code in src/p2p_copy/protocol.py
def encrypted_file_begin(hidden_file_info: bytes) -> str:
    """
    Wrap encrypted file info in a control message.

    Parameters
    ----------
    hidden_file_info : bytes
        Encrypted file begin data.

    Returns
    -------
    str
        JSON string of the enc_file message.
    """
    payload = {
        "type": "enc_file",
        "hidden_file": hidden_file_info.hex()
    }
    return dumps(payload)

pack_chunk

pack_chunk(seq, chain, payload)

Pack a chunk into a binary frame.

Parameters:

Name | Type | Description | Default
seq | int | Sequence number. | required
chain | bytes | 32-byte chain checksum. | required
payload | bytes | The data payload. | required

Returns:

Type | Description
bytes | Packed frame.

Source code in src/p2p_copy/protocol.py
def pack_chunk(seq: int, chain: bytes, payload: bytes) -> bytes:
    """
    Pack a chunk into a binary frame.

    Parameters
    ----------
    seq : int
        Sequence number.
    chain : bytes
        32-byte chain checksum.
    payload : bytes
        The data payload.

    Returns
    -------
    bytes
        Packed frame.
    """
    return CHUNK_HEADER.pack(seq, chain) + payload

unpack_chunk

unpack_chunk(frame)

Unpack a binary chunk frame.

Parameters:

Name | Type | Description | Default
frame | bytes | The binary frame. | required

Returns:

Type | Description
Tuple[int, bytes, bytes] | (seq, chain, payload)

Raises:

Type | Description
ValueError | If frame is too short.

Source code in src/p2p_copy/protocol.py
def unpack_chunk(frame: bytes) -> Tuple[int, bytes, bytes]:
    """
    Unpack a binary chunk frame.

    Parameters
    ----------
    frame : bytes
        The binary frame.

    Returns
    -------
    Tuple[int, bytes, bytes]
        (seq, chain, payload)

    Raises
    ------
    ValueError
        If frame is too short.
    """
    if len(frame) < CHUNK_HEADER.size:
        raise ValueError("short chunk frame")
    seq, chain = CHUNK_HEADER.unpack(frame[:CHUNK_HEADER.size])
    payload = frame[CHUNK_HEADER.size:]
    return seq, chain, payload
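
CHUNK_HEADER itself is not documented on this page. The sketch below assumes it is a struct.Struct holding an unsigned sequence number followed by the 32-byte chain. The format string "!I32s" is a guess made only for illustration; the real layout may differ. Under that assumption, a frame round trip works as follows.

```python
import hashlib
import struct
from typing import Tuple

# ASSUMPTION: the real CHUNK_HEADER layout may differ; "!I32s" is illustrative only
# (network byte order, 4-byte unsigned seq, 32-byte chain digest).
CHUNK_HEADER = struct.Struct("!I32s")


def pack_chunk(seq: int, chain: bytes, payload: bytes) -> bytes:
    return CHUNK_HEADER.pack(seq, chain) + payload


def unpack_chunk(frame: bytes) -> Tuple[int, bytes, bytes]:
    if len(frame) < CHUNK_HEADER.size:
        raise ValueError("short chunk frame")
    seq, chain = CHUNK_HEADER.unpack(frame[:CHUNK_HEADER.size])
    return seq, chain, frame[CHUNK_HEADER.size:]


payload = b"some file bytes"
chain = hashlib.sha256(payload).digest()  # 32 bytes, like a ChainedChecksum value
frame = pack_chunk(7, chain, payload)

print(CHUNK_HEADER.size, len(frame))  # 36 51
print(unpack_chunk(frame) == (7, chain, payload))  # True

try:
    unpack_chunk(frame[:10])
except ValueError as exc:
    print("rejected:", exc)  # rejected: short chunk frame
```

struct's "32s" code silently pads or truncates its argument. An implementation built this way may therefore want to check len(chain) == 32 before packing.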

 

p2p_copy.io_utils

read_in_chunks async

read_in_chunks(fp, *, chunk_size=CHUNK_SIZE)

Asynchronously read bytes from a file in chunks.

Parameters:

Name | Type | Description | Default
fp | BinaryIO | The file pointer to read from. | required
chunk_size | int | Size of each chunk in bytes. Default is 1 MiB. | CHUNK_SIZE

Yields:

Type | Description
bytes | The next chunk of data.

Source code in src/p2p_copy/io_utils.py
async def read_in_chunks(fp: BinaryIO, *, chunk_size: int = CHUNK_SIZE) -> AsyncIterable[bytes]:
    """
    Asynchronously read bytes from a file in chunks.

    Parameters
    ----------
    fp : BinaryIO
        The file pointer to read from.
    chunk_size : int, optional
        Size of each chunk in bytes. Default is 1 MiB.

    Yields
    ------
    bytes
        The next chunk of data.
    """

    while True:
        # Read from disk without blocking the event-loop
        chunk = await asyncio.to_thread(fp.read, chunk_size)
        if not chunk:
            break
        yield chunk
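
read_in_chunks is an async generator, so it is consumed with async for. The standalone copy below reads from an in-memory io.BytesIO with a 3-byte chunk_size, purely to make the chunk boundaries visible. asyncio.to_thread (Python 3.9+) keeps each blocking read off the event loop.

```python
import asyncio
import io
from typing import AsyncIterator, BinaryIO


async def read_in_chunks(fp: BinaryIO, *, chunk_size: int = 1024 * 1024) -> AsyncIterator[bytes]:
    # Local copy of p2p_copy.io_utils.read_in_chunks; 1 MiB mirrors the documented default.
    while True:
        chunk = await asyncio.to_thread(fp.read, chunk_size)
        if not chunk:
            break
        yield chunk


async def collect(data: bytes, chunk_size: int) -> list[bytes]:
    out = []
    async for chunk in read_in_chunks(io.BytesIO(data), chunk_size=chunk_size):
        out.append(chunk)
    return out


print(asyncio.run(collect(b"abcdefg", chunk_size=3)))  # [b'abc', b'def', b'g']
```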

compute_chain_up_to async

compute_chain_up_to(path, limit=None)

Compute chained checksum over the raw bytes of a file up to a limit.

Parameters:

Name | Type | Description | Default
path | Path | Path to the file. | required
limit | int | Maximum bytes to hash. If None, hash the entire file. | None

Returns:

Type | Description
tuple[int, bytes] | (bytes_hashed, final_chain_bytes)

Source code in src/p2p_copy/io_utils.py
async def compute_chain_up_to(path: Path, limit: int | None = None) -> Tuple[int, bytes]:
    """
    Compute chained checksum over the raw bytes of a file up to a limit.

    Parameters
    ----------
    path : Path
        Path to the file.
    limit : int, optional
        Maximum bytes to hash. If None, hash the entire file.

    Returns
    -------
    tuple[int, bytes]
        (bytes_hashed, final_chain_bytes)
    """

    c = ChainedChecksum()
    hashed = 0
    with path.open("rb") as fp:
        if limit is None:
            while True:
                chunk = await asyncio.to_thread(fp.read, CHUNK_SIZE)
                if not chunk:
                    break
                hashed += len(chunk)
                c.next_hash(chunk)
        else:
            remaining = int(limit)
            while remaining > 0:
                to_read = min(remaining, CHUNK_SIZE)
                chunk = await asyncio.to_thread(fp.read, to_read)
                if not chunk:
                    break
                hashed += len(chunk)
                remaining -= len(chunk)
                c.next_hash(chunk)
    return hashed, c.prev_chain
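
compute_chain_up_to feeds the file into ChainedChecksum one CHUNK_SIZE read at a time. As a result, the value depends on where the chunk boundaries fall, not only on the bytes themselves. Two parties can therefore only compare chains if they split the data identically, which is presumably why the same CHUNK_SIZE is used throughout. The sketch below is a simplified, synchronous stand-in with a configurable chunk_size (a hypothetical parameter), run against a temporary file to demonstrate this.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def chain_up_to(path: Path, limit: Optional[int], chunk_size: int) -> Tuple[int, bytes]:
    """Synchronous stand-in for compute_chain_up_to with an explicit chunk_size."""
    chain, hashed = b"", 0
    remaining = limit
    with path.open("rb") as fp:
        while remaining is None or remaining > 0:
            to_read = chunk_size if remaining is None else min(remaining, chunk_size)
            chunk = fp.read(to_read)
            if not chunk:
                break
            hashed += len(chunk)
            if remaining is not None:
                remaining -= len(chunk)
            chain = hashlib.sha256(chain + chunk).digest()
    return hashed, chain


with tempfile.TemporaryDirectory() as tmp:
    f = Path(tmp) / "sample.bin"
    f.write_bytes(b"0123456789" * 3)  # 30 bytes

    n_a, chain_a = chain_up_to(f, None, chunk_size=8)
    n_b, chain_b = chain_up_to(f, None, chunk_size=10)
    print(n_a, n_b, chain_a == chain_b)  # 30 30 False

    # A limit larger than the file simply stops at end of file.
    print(chain_up_to(f, 100, chunk_size=8) == (n_a, chain_a))  # True
```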

iter_manifest_entries

iter_manifest_entries(paths)

Yield manifest entries for files in the given paths (files or directories).

Parameters:

Name | Type | Description | Default
paths | List[str] | List of file or directory paths. | required

Yields:

Type | Description
Tuple[Path, Path, int] | (absolute_path, relative_path, size)

Notes
  • Yields files in sorted order for directories.
  • Skips non-existent or invalid paths.

Source code in src/p2p_copy/io_utils.py
def iter_manifest_entries(paths: List[str]) -> Iterator[Tuple[Path, Path, int]]:
    """
    Yield manifest entries for files in the given paths (files or directories).

    Parameters
    ----------
    paths : List[str]
        List of file or directory paths.

    Yields
    ------
    Tuple[Path, Path, int]
        (absolute_path, relative_path, size)

    Notes
    -----
    - Yields files in sorted order for directories.
    - Skips non-existent or invalid paths.
    """

    if not isinstance(paths, list):
        print("[p2p_copy] send(): files or dirs must be passed as list")
        return
    elif not paths:
        return

    for raw in paths:
        if len(raw) == 1:
            print("[p2p_copy] send(): probably not a file:", raw)
            continue
        p = Path(raw).expanduser()
        if not p.exists():
            print("[p2p_copy] send(): file does not exist:", p)
            continue
        if p.is_file():
            yield p.resolve(), Path(p.name), p.stat().st_size
        else:
            root = p.resolve()
            for sub in sorted(root.rglob("*")):
                if sub.is_file():
                    rel = Path(p.name) / sub.relative_to(root)
                    yield sub.resolve(), rel, sub.stat().st_size
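
For a directory argument, each file's relative path starts with the directory's own name, so the receiver recreates the top-level folder. The sketch below reproduces only the directory branch of iter_manifest_entries on a throwaway tree, showing the resulting relative paths and their sorted order. The helper name dir_entries is hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Iterator, Tuple


def dir_entries(p: Path) -> Iterator[Tuple[Path, Path, int]]:
    # Mirrors the directory branch above: sorted rglob, files only,
    # relative paths rooted at the directory's own name.
    root = p.resolve()
    for sub in sorted(root.rglob("*")):
        if sub.is_file():
            yield sub.resolve(), Path(p.name) / sub.relative_to(root), sub.stat().st_size


with tempfile.TemporaryDirectory() as tmp:
    proj = Path(tmp) / "proj"
    (proj / "sub").mkdir(parents=True)
    (proj / "b.txt").write_bytes(b"bb")
    (proj / "a.txt").write_bytes(b"a")
    (proj / "sub" / "c.txt").write_bytes(b"ccc")

    rows = [(rel.as_posix(), size) for _, rel, size in dir_entries(proj)]
    print(rows)  # [('proj/a.txt', 1), ('proj/b.txt', 2), ('proj/sub/c.txt', 3)]
```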

ensure_dir

ensure_dir(p)

Ensure the directory exists, creating parents if needed.

Parameters:

Name | Type | Description | Default
p | Path | The path to ensure is a directory. | required

Source code in src/p2p_copy/io_utils.py
def ensure_dir(p: Path) -> None:
    """
    Ensure the directory exists, creating parents if needed.

    Parameters
    ----------
    p : Path
        The path to ensure is a directory.
    """
    p.mkdir(parents=True, exist_ok=True)

 

For API usage examples, see APIExamples. For feature details, see Features.