py_hamt

 1from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
 2from .hamt import HAMT, blake3_hashfn
 3from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
 4from .zarr_hamt_store import ZarrHAMTStore
 5
 6__all__ = [
 7    "blake3_hashfn",
 8    "HAMT",
 9    "ContentAddressedStore",
10    "InMemoryCAS",
11    "KuboCAS",
12    "ZarrHAMTStore",
13    "SimpleEncryptedZarrHAMTStore",
14]
15
16print("Running py-hamt from source!")
def blake3_hashfn(input_bytes: bytes) -> bytes:
52def blake3_hashfn(input_bytes: bytes) -> bytes:
53    """
54    This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
55
56    """
57    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
58    digest: bytes = b3.digest(input_bytes, size=32)
59    raw_bytes: bytes = b3.unwrap(digest)
60    return raw_bytes

This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.

class HAMT:
288class HAMT:
289    """
290    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
291
292    Use this to store arbitrarily large key-value mappings in your CAS of choice.
293
294    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
295
296    When in read-only mode, the HAMT is both async and thread safe.
297
298    #### A note about memory management, read+write and read-only modes
299    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.
300
301    Note that in read+write, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read only mode and then read `root_node_id`.
302
303    These optimizations also trade off performance for memory use. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.
304
305    #### IPFS HAMT Sample Code
306    ```python
307    kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
308    hamt = await HAMT.build(cas=kubo_cas)
309    await hamt.set("foo", "bar")
310    assert (await hamt.get("foo")) == "bar"
311    await hamt.make_read_only()
312    cid = hamt.root_node_id # our root node CID
313    print(cid)
314    ```
315    """
316
317    def __init__(
318        self,
319        cas: ContentAddressedStore,
320        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
321        root_node_id: IPLDKind | None = None,
322        read_only: bool = False,
323        max_bucket_size: int = 4,
324        values_are_bytes: bool = False,
325    ):
326        """
327        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.
328        """
329
330        self.cas: ContentAddressedStore = cas
331        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""
332
333        self.hash_fn: Callable[[bytes], bytes] = hash_fn
334        """
335        This is the hash function used to place a key-value within the HAMT.
336
337        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
338
339        It's important to note that the resulting hash must must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.
340
341        Theoretically your hash size must only be a minimum of 1 byte, and there can be less than or the same number of hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
342        """
343
344        self.lock: asyncio.Lock = asyncio.Lock()
345        """@private"""
346
347        self.values_are_bytes: bool = values_are_bytes
348        """Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.
349
350        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
351        """
352
353        if max_bucket_size < 1:
354            raise ValueError("Bucket size maximum must be a positive integer")
355        self.max_bucket_size: int = max_bucket_size
356        """
357        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.
358
359        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes footprint is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.
360
361        This must be a positive integer with a minimum of 1.
362        """
363
364        self.root_node_id: IPLDKind = root_node_id
365        """
366        This is type IPLDKind but the documentation generator pdoc mangles it a bit.
367
368        Read from this only when in read mode to get something valid!
369        """
370
371        self.read_only: bool = read_only
372        """Clients should NOT modify this.
373
374        This is here for checking whether the HAMT is in read only or read/write mode.
375
376        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
377        """
378        self.node_store: NodeStore
379        """@private"""
380        if read_only:
381            self.node_store = ReadCacheStore(self)
382        else:
383            self.node_store = InMemoryTreeStore(self)
384
385    @classmethod
386    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
387        """
388        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this will have no difference than creating a HAMT instance with __init__.
389
390        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
391        """
392        hamt = cls(*args, **kwargs)
393        if hamt.root_node_id is None:
394            hamt.root_node_id = await hamt.node_store.save(None, Node())
395        return hamt
396
397    # This is typically a massive blocking operation, you dont want to be running this concurrently with a bunch of other operations, so it's ok to have it not be async
398    async def make_read_only(self) -> None:
399        """
400        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
401
402        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
403        """
404        async with self.lock:
405            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
406            await inmemory_tree.vacate()
407
408            self.read_only = True
409            self.node_store = ReadCacheStore(self)
410
411    async def enable_write(self) -> None:
412        """
413        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid, in order to read that at the end of your operations you must first use `make_read_only`.
414        """
415        async with self.lock:
416            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
417            self.read_only = False
418            self.node_store = InMemoryTreeStore(self)
419
420    async def cache_size(self) -> int:
421        """
422        Returns the memory used by some internal performance optimization tools in bytes.
423
424        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
425
426        Be warned that this may take a while to run for large HAMTs.
427
428        For more on memory management, see the `HAMT` class documentation.
429        """
430        if self.read_only:
431            return self.node_store.size()
432        async with self.lock:
433            return self.node_store.size()
434
435    async def cache_vacate(self) -> None:
436        """
437        Vacate and completely empty out the internal read/write cache.
438
439        Be warned that this may take a while if there have been a lot of write operations.
440
441        For more on memory management, see the `HAMT` class documentation.
442        """
443        if self.read_only:
444            await self.node_store.vacate()
445        else:
446            async with self.lock:
447                await self.node_store.vacate()
448
449    async def _reserialize_and_link(
450        self, node_stack: list[tuple[IPLDKind, Node]]
451    ) -> None:
452        """
453        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store
454        Takes a stack of nodes, we represent a stack with a list where the first element is the root element and the last element is the top of the stack
455        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python
456        If a node ends up being empty, then it is deleted entirely, unless it is the root node
457        Modifies in place
458        """
459        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
460        for stack_index in range(len(node_stack) - 1, -1, -1):
461            old_id, node = node_stack[stack_index]
462
463            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
464            is_root: bool = stack_index == 0
465            if node.is_empty() and not is_root:
466                # Unlink from the rest of the tree
467                _, prev_node = node_stack[stack_index - 1]
468                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
469                for link_index in prev_node.iter_link_indices():
470                    link = prev_node.get_link(link_index)
471                    if link == old_id:
472                        # Delete the link by making it an empty bucket
473                        prev_node.data[link_index] = {}
474                        break
475
476                # Remove from our stack, continue reserializing up the tree
477                node_stack.pop(stack_index)
478                continue
479
480            # If not an empty node, just reserialize like normal and replace this one
481            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
482            node_stack[stack_index] = (new_store_id, node)
483
484            # If this is not the last i.e. root node, we need to change the linking of the node prior in the list since we just reserialized
485            if not is_root:
486                _, prev_node = node_stack[stack_index - 1]
487                prev_node.replace_link(old_id, new_store_id)
488
489    # automatically skip encoding if the value provided is of the bytes variety
490    async def set(self, key: str, val: IPLDKind) -> None:
491        """Write a key-value mapping."""
492        if self.read_only:
493            raise Exception("Cannot call set on a read only HAMT")
494
495        data: bytes
496        if self.values_are_bytes:
497            data = cast(
498                bytes, val
499            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
500        else:
501            data = dag_cbor.encode(val)
502
503        pointer: IPLDKind = await self.cas.save(data, codec="raw")
504        await self._set_pointer(key, pointer)
505
506    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
507        async with self.lock:
508            node_stack: list[tuple[IPLDKind, Node]] = []
509            root_node: Node = await self.node_store.load(self.root_node_id)
510            node_stack.append((self.root_node_id, root_node))
511
512            # FIFO queue to keep track of all the KVs we need to insert
513            # This is needed if any buckets overflow and so we need to reinsert all those KVs
514            kvs_queue: list[tuple[str, IPLDKind]] = []
515            kvs_queue.append((key, val_ptr))
516
517            while len(kvs_queue) > 0:
518                _, top_node = node_stack[-1]
519                curr_key, curr_val_ptr = kvs_queue[0]
520
521                raw_hash: bytes = self.hash_fn(curr_key.encode())
522                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)
523
524                item = top_node.data[map_key]
525                if isinstance(item, list):
526                    next_node_id: IPLDKind = item[0]
527                    next_node: Node = await self.node_store.load(next_node_id)
528                    node_stack.append((next_node_id, next_node))
529                elif isinstance(item, dict):
530                    bucket: dict[str, IPLDKind] = item
531
532                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
533                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
534                        bucket[curr_key] = curr_val_ptr
535                        kvs_queue.pop(0)
536                        continue
537
538                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
539                    for k in bucket:
540                        v_ptr = bucket[k]
541                        kvs_queue.append((k, v_ptr))
542
543                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
544                    new_node = Node()
545                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
546                    link: list[IPLDKind] = [new_node_id]
547                    top_node.data[map_key] = link
548
549            # Finally, reserialize and fix all links, deleting empty nodes as needed
550            await self._reserialize_and_link(node_stack)
551            self.root_node_id = node_stack[0][0]
552
553    async def delete(self, key: str) -> None:
554        """Delete a key-value mapping."""
555
556        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
557        if self.read_only:
558            raise Exception("Cannot call delete on a read only HAMT")
559
560        async with self.lock:
561            raw_hash: bytes = self.hash_fn(key.encode())
562
563            node_stack: list[tuple[IPLDKind, Node]] = []
564            root_node: Node = await self.node_store.load(self.root_node_id)
565            node_stack.append((self.root_node_id, root_node))
566
567            created_change: bool = False
568            while True:
569                _, top_node = node_stack[-1]
570                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)
571
572                item = top_node.data[map_key]
573                if isinstance(item, dict):
574                    bucket = item
575                    if key in bucket:
576                        del bucket[key]
577                        created_change = True
578                    # Break out since whether or not the key is in the bucket, it should have been here so either now reserialize or raise a KeyError
579                    break
580                elif isinstance(item, list):
581                    link: IPLDKind = item[0]
582                    next_node: Node = await self.node_store.load(link)
583                    node_stack.append((link, next_node))
584
585            # Finally, reserialize and fix all links, deleting empty nodes as needed
586            if created_change:
587                await self._reserialize_and_link(node_stack)
588                self.root_node_id = node_stack[0][0]
589            else:
590                # If we didn't make a change, then this key must not exist within the HAMT
591                raise KeyError
592
593    async def get(self, key: str) -> IPLDKind:
594        """Get a value."""
595        pointer: IPLDKind = await self.get_pointer(key)
596        data: bytes = await self.cas.load(pointer)
597        if self.values_are_bytes:
598            return data
599        else:
600            return dag_cbor.decode(data)
601
602    async def get_pointer(self, key: str) -> IPLDKind:
603        """
604        Get a store ID that points to the value for this key.
605
606        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
607        """
608        # If read only, no need to acquire a lock
609        pointer: IPLDKind
610        if self.read_only:
611            pointer = await self._get_pointer(key)
612        else:
613            async with self.lock:
614                pointer = await self._get_pointer(key)
615
616        return pointer
617
618    # Callers MUST handle acquiring a lock
619    async def _get_pointer(self, key: str) -> IPLDKind:
620        raw_hash: bytes = self.hash_fn(key.encode())
621
622        current_id: IPLDKind = self.root_node_id
623        current_depth: int = 0
624
625        # Don't check if result is none but use a boolean to indicate finding something, this is because None is a possible value of IPLDKind
626        result_ptr: IPLDKind = None
627        found_a_result: bool = False
628        while True:
629            top_id: IPLDKind = current_id
630            top_node: Node = await self.node_store.load(top_id)
631            map_key: int = extract_bits(raw_hash, current_depth, 8)
632
633            # Check if this key is in one of the buckets
634            item = top_node.data[map_key]
635            if isinstance(item, dict):
636                bucket = item
637                if key in bucket:
638                    result_ptr = bucket[key]
639                    found_a_result = True
640                    break
641
642            if isinstance(item, list):
643                link: IPLDKind = item[0]
644                current_id = link
645                current_depth += 1
646                continue
647
648            # Nowhere left to go, stop walking down the tree
649            break
650
651        if not found_a_result:
652            raise KeyError
653
654        return result_ptr
655
656    # Callers MUST handle locking or not on their own
657    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
658        node_id_stack: list[IPLDKind] = [self.root_node_id]
659        while len(node_id_stack) > 0:
660            top_id: IPLDKind = node_id_stack.pop()
661            node: Node = await self.node_store.load(top_id)
662            yield (top_id, node)
663            node_id_stack.extend(list(node.iter_links()))
664
665    async def keys(self) -> AsyncIterator[str]:
666        """
667        AsyncIterator returning all keys in the HAMT.
668
669        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
670
671        When the HAMT is in read only mode however, this can be run concurrently with get operations.
672        """
673        if self.read_only:
674            async for k in self._keys_no_locking():
675                yield k
676        else:
677            async with self.lock:
678                async for k in self._keys_no_locking():
679                    yield k
680
681    async def _keys_no_locking(self) -> AsyncIterator[str]:
682        async for _, node in self._iter_nodes():
683            for bucket in node.iter_buckets():
684                for key in bucket:
685                    yield key
686
687    async def len(self) -> int:
688        """
689        Return the number of key value mappings in this HAMT.
690
691        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
692        """
693        count: int = 0
694        async for _ in self.keys():
695            count += 1
696
697        return count

An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

Use this to store arbitrarily large key-value mappings in your CAS of choice.

For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

When in read-only mode, the HAMT is both async and thread safe.

A note about memory management, read+write and read-only modes

The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

Note that in read+write, the real root node id IS NOT VALID. You should call make_read_only() to convert to read only mode and then read root_node_id.

These optimizations also trade off performance for memory use. Use cache_size to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use cache_vacate if you are over your memory limits.

IPFS HAMT Sample Code

kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id # our root node CID
print(cid)
HAMT( cas: ContentAddressedStore, hash_fn: Callable[[bytes], bytes] = <function blake3_hashfn>, root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]] = None, read_only: bool = False, max_bucket_size: int = 4, values_are_bytes: bool = False)
317    def __init__(
318        self,
319        cas: ContentAddressedStore,
320        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
321        root_node_id: IPLDKind | None = None,
322        read_only: bool = False,
323        max_bucket_size: int = 4,
324        values_are_bytes: bool = False,
325    ):
326        """
327        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.
328        """
329
330        self.cas: ContentAddressedStore = cas
331        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""
332
333        self.hash_fn: Callable[[bytes], bytes] = hash_fn
334        """
335        This is the hash function used to place a key-value within the HAMT.
336
337        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
338
339        It's important to note that the resulting hash must must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.
340
341        Theoretically your hash size must only be a minimum of 1 byte, and there can be less than or the same number of hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
342        """
343
344        self.lock: asyncio.Lock = asyncio.Lock()
345        """@private"""
346
347        self.values_are_bytes: bool = values_are_bytes
348        """Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.
349
350        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
351        """
352
353        if max_bucket_size < 1:
354            raise ValueError("Bucket size maximum must be a positive integer")
355        self.max_bucket_size: int = max_bucket_size
356        """
357        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.
358
359        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes footprint is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.
360
361        This must be a positive integer with a minimum of 1.
362        """
363
364        self.root_node_id: IPLDKind = root_node_id
365        """
366        This is type IPLDKind but the documentation generator pdoc mangles it a bit.
367
368        Read from this only when in read mode to get something valid!
369        """
370
371        self.read_only: bool = read_only
372        """Clients should NOT modify this.
373
374        This is here for checking whether the HAMT is in read only or read/write mode.
375
376        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
377        """
378        self.node_store: NodeStore
379        """@private"""
380        if read_only:
381            self.node_store = ReadCacheStore(self)
382        else:
383            self.node_store = InMemoryTreeStore(self)

Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor's input variables refers to, see the documentation with the matching names below.

cas: ContentAddressedStore

The backing storage system. py-hamt provides an implementation, KuboCAS, for IPFS.

hash_fn: Callable[[bytes], bytes]

This is the hash function used to place a key-value within the HAMT.

To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

It's important to note that the resulting hash must always be a multiple of 8 bits, since a Python bytes object can only represent data in whole bytes.

In theory, your hash only needs to be at least 1 byte long, and there may be at most as many hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.

values_are_bytes: bool

Set this to True if you are only going to be storing Python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.

max_bucket_size: int

This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

This must be a positive integer with a minimum of 1.

root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]

This is type IPLDKind but the documentation generator pdoc mangles it a bit.

Read from this only when the HAMT is in read-only mode to get something valid!

read_only: bool

Clients should NOT modify this.

This is here for checking whether the HAMT is in read only or read/write mode.

The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.

@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> HAMT:
385    @classmethod
386    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
387        """
388        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this will have no difference than creating a HAMT instance with __init__.
389
390        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
391        """
392        hamt = cls(*args, **kwargs)
393        if hamt.root_node_id is None:
394            hamt.root_node_id = await hamt.node_store.save(None, Node())
395        return hamt

Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are exactly the same as for __init__. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.

This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.

async def make_read_only(self) -> None:
398    async def make_read_only(self) -> None:
399        """
400        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
401
402        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
403        """
404        async with self.lock:
405            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
406            await inmemory_tree.vacate()
407
408            self.read_only = True
409            self.node_store = ReadCacheStore(self)

Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.

async def enable_write(self) -> None:
411    async def enable_write(self) -> None:
412        """
413        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid, in order to read that at the end of your operations you must first use `make_read_only`.
414        """
415        async with self.lock:
416            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
417            self.read_only = False
418            self.node_store = InMemoryTreeStore(self)

Enable both reads and writes. This creates an internal structure for performance optimizations, which results in the root node ID no longer being valid. To read the root node ID at the end of your operations, you must first call make_read_only.

async def cache_size(self) -> int:
420    async def cache_size(self) -> int:
421        """
422        Returns the memory used by some internal performance optimization tools in bytes.
423
424        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
425
426        Be warned that this may take a while to run for large HAMTs.
427
428        For more on memory management, see the `HAMT` class documentation.
429        """
430        if self.read_only:
431            return self.node_store.size()
432        async with self.lock:
433            return self.node_store.size()

Returns the memory used by some internal performance optimization tools in bytes.

This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

Be warned that this may take a while to run for large HAMTs.

For more on memory management, see the HAMT class documentation.

async def cache_vacate(self) -> None:
435    async def cache_vacate(self) -> None:
436        """
437        Vacate and completely empty out the internal read/write cache.
438
439        Be warned that this may take a while if there have been a lot of write operations.
440
441        For more on memory management, see the `HAMT` class documentation.
442        """
443        if self.read_only:
444            await self.node_store.vacate()
445        else:
446            async with self.lock:
447                await self.node_store.vacate()

Vacate and completely empty out the internal read/write cache.

Be warned that this may take a while if there have been a lot of write operations.

For more on memory management, see the HAMT class documentation.

async def set( self, key: str, val: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> None:
490    async def set(self, key: str, val: IPLDKind) -> None:
491        """Write a key-value mapping."""
492        if self.read_only:
493            raise Exception("Cannot call set on a read only HAMT")
494
495        data: bytes
496        if self.values_are_bytes:
497            data = cast(
498                bytes, val
499            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
500        else:
501            data = dag_cbor.encode(val)
502
503        pointer: IPLDKind = await self.cas.save(data, codec="raw")
504        await self._set_pointer(key, pointer)

Write a key-value mapping.

async def delete(self, key: str) -> None:
553    async def delete(self, key: str) -> None:
554        """Delete a key-value mapping."""
555
556        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
557        if self.read_only:
558            raise Exception("Cannot call delete on a read only HAMT")
559
560        async with self.lock:
561            raw_hash: bytes = self.hash_fn(key.encode())
562
563            node_stack: list[tuple[IPLDKind, Node]] = []
564            root_node: Node = await self.node_store.load(self.root_node_id)
565            node_stack.append((self.root_node_id, root_node))
566
567            created_change: bool = False
568            while True:
569                _, top_node = node_stack[-1]
570                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)
571
572                item = top_node.data[map_key]
573                if isinstance(item, dict):
574                    bucket = item
575                    if key in bucket:
576                        del bucket[key]
577                        created_change = True
578                    # Break out since whether or not the key is in the bucket, it should have been here so either now reserialize or raise a KeyError
579                    break
580                elif isinstance(item, list):
581                    link: IPLDKind = item[0]
582                    next_node: Node = await self.node_store.load(link)
583                    node_stack.append((link, next_node))
584
585            # Finally, reserialize and fix all links, deleting empty nodes as needed
586            if created_change:
587                await self._reserialize_and_link(node_stack)
588                self.root_node_id = node_stack[0][0]
589            else:
590                # If we didn't make a change, then this key must not exist within the HAMT
591                raise KeyError

Delete a key-value mapping.

async def get( self, key: str) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
593    async def get(self, key: str) -> IPLDKind:
594        """Get a value."""
595        pointer: IPLDKind = await self.get_pointer(key)
596        data: bytes = await self.cas.load(pointer)
597        if self.values_are_bytes:
598            return data
599        else:
600            return dag_cbor.decode(data)

Get a value.

async def get_pointer( self, key: str) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
602    async def get_pointer(self, key: str) -> IPLDKind:
603        """
604        Get a store ID that points to the value for this key.
605
606        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
607        """
608        # If read only, no need to acquire a lock
609        pointer: IPLDKind
610        if self.read_only:
611            pointer = await self._get_pointer(key)
612        else:
613            async with self.lock:
614                pointer = await self._get_pointer(key)
615
616        return pointer

Get a store ID that points to the value for this key.

This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore for example.

async def keys(self) -> AsyncIterator[str]:
665    async def keys(self) -> AsyncIterator[str]:
666        """
667        AsyncIterator returning all keys in the HAMT.
668
669        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
670
671        When the HAMT is in read only mode however, this can be run concurrently with get operations.
672        """
673        if self.read_only:
674            async for k in self._keys_no_locking():
675                yield k
676        else:
677            async with self.lock:
678                async for k in self._keys_no_locking():
679                    yield k

AsyncIterator returning all keys in the HAMT.

If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

When the HAMT is in read only mode however, this can be run concurrently with get operations.

async def len(self) -> int:
687    async def len(self) -> int:
688        """
689        Return the number of key value mappings in this HAMT.
690
691        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
692        """
693        count: int = 0
694        async for _ in self.keys():
695            count += 1
696
697        return count

Return the number of key value mappings in this HAMT.

When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.

class ContentAddressedStore(abc.ABC):
class ContentAddressedStore(ABC):
    """
    Abstract interface for the content addressed storage that a `HAMT` keeps its data in.

    The return type of `save` and the input type of `load` are really `IPLDKind`; the documentation generator pdoc unfortunately mangles how that is displayed.

    #### A note on the IPLDKind return types
    IDs are typed as IPLDKind rather than just CID for flexibility: anything python regards as immutable can serve as an ID. There are two exceptions:
    1. Lists and dicts cannot be used, since python does not classify these as immutable.
    2. `None` cannot be used, since HAMT's `__init__` uses it to signal that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Store `data` in the storage mechanism and return an ID for it, typed as IPLDKind.

        `codec` is "dag-cbor" when the data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(self, id: IPLDKind) -> bytes:
        """Return the bytes stored under `id`."""

Abstract class that represents a content addressed storage that the HAMT can use for keeping data.

Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.

A note on the IPLDKind return types

Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:

  1. No lists or dicts, since python does not classify these as immutable.
  2. No None values since this is used in HAMT's __init__ to indicate that an empty HAMT needs to be initialized.
CodecInput = typing.Literal['raw', 'dag-cbor']
@abstractmethod
async def save( self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
28    @abstractmethod
29    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
30        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
31
32        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
33        """

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

@abstractmethod
async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> bytes:
35    @abstractmethod
36    async def load(self, id: IPLDKind) -> bytes:
37        """Retrieve data."""

Retrieve data.

class InMemoryCAS(py_hamt.ContentAddressedStore):
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self) -> None:
        self.store = {}
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        """Keep `data` in the in-memory dict under its 32 byte digest and return that digest. `codec` is not used by this backend."""
        digest: bytes = self.hash_alg.digest(data, size=32)
        self.store[digest] = data
        return digest

    async def load(self, id: IPLDKind) -> bytes:
        """
        Return the data previously saved under the hash `id`.

        `ContentAddressedStore` allows any IPLD scalar key.  For the in-memory
        backend we *require* a bytes-like hash; anything else is rejected at run
        time with a `TypeError`. In OO type-checking, a subclass may widen (make more general)
        argument types, but it must never narrow them; otherwise callers that expect the
        base-class contract can break. Mypy enforces this contra-variance rule and emits the
        "violates Liskov substitution principle" error, which is why the parameter keeps the
        `IPLDKind` annotation and the narrowing happens here at run time.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

        Raises `KeyError` if nothing is stored under `id`.
        """
        if not isinstance(id, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
            )

        # A bytearray passes the guard but is unhashable, so normalise to bytes before
        # using it as a dict key. For bytes input this is a no-op.
        key: bytes = bytes(id)

        try:
            return self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

Used mostly for faster testing (it is nevertheless listed in the package's __all__). It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.

store: dict[bytes, bytes]
hash_alg: multiformats.multihash.Multihash
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> bytes:
50    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
51        hash: bytes = self.hash_alg.digest(data, size=32)
52        self.store[hash] = data
53        return hash

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> bytes:
55    async def load(self, id: IPLDKind) -> bytes:
56        """
57        `ContentAddressedStore` allows any IPLD scalar key.  For the in-memory
58        backend we *require* a `bytes` hash; anything else is rejected at run
59        time. In OO type-checking, a subclass may widen (make more general) argument types,
60        but it must never narrow them; otherwise callers that expect the base-class contract can break.
61        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
62        This is why we use `cast` here, to tell mypy that we know what we are doing.
63        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
64        """
65        key = cast(bytes, id)
66        if not isinstance(key, (bytes, bytearray)):  # defensive guard
67            raise TypeError(
68                f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
69            )
70
71        try:
72            return self.store[key]
73        except KeyError as exc:
74            raise KeyError("Object not found in in-memory store") from exc

ContentAddressedStore allows any IPLD scalar key. For the in-memory backend we require a bytes hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. This is why we use cast here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

class KuboCAS(py_hamt.ContentAddressedStore):
 77class KuboCAS(ContentAddressedStore):
 78    """
 79    Connects to an **IPFS Kubo** daemon.
 80
 81    The IDs in save and load are IPLD CIDs.
 82
 83    * **save()**  → RPC  (`/api/v0/add`)
 84    * **load()**  → HTTP gateway  (`/ipfs/{cid}`)
 85
 86    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
 87
 88    ### Authentication / custom headers
 89    You have two options:
 90
 91    1. **Bring your own `httpx.AsyncClient`**
 92       Pass it via `client=...` — any default headers or auth
 93       configured on that client are reused for **every** request.
 94    2. **Let `KuboCAS` build the client** but pass
 95       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
 96       internally–created `AsyncClient`.
 97
 98    ```python
 99    import httpx
100    from py_hamt import KuboCAS
101
102    # Option 1: user-supplied client
103    client = httpx.AsyncClient(
104        headers={"Authorization": "Bearer <token>"},
105        auth=("user", "pass"),
106    )
107    cas = KuboCAS(client=client)
108
109    # Option 2: let KuboCAS create the client
110    cas = KuboCAS(
111        headers={"X-My-Header": "yes"},
112        auth=("user", "pass"),
113    )
114    ```
115
116    ### Parameters
117    - **hasher** (str): multihash name (defaults to *blake3*).
118    - **client** (`httpx.AsyncClient | None`): reuse an existing
119      client; if *None* KuboCAS will create one lazily.
120    - **headers** (dict[str, str] | None): default headers for the
121      internally-created client.
122    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
123      for the internally-created client.
124    - **rpc_base_url / gateway_base_url** (str | None): override daemon
125      endpoints (defaults match the local daemon ports).
126    - **chunker** (str): chunking algorithm specification for Kubo's `add`
127      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
128      `"rabin-<min>-<avg>-<max>"`.
129
130    ...
131    """
132
133    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
134    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"
135
136    DAG_PB_MARKER: int = 0x70
137    """@private"""
138
139    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
140    def __init__(
141        self,
142        hasher: str = "blake3",
143        client: httpx.AsyncClient | None = None,
144        rpc_base_url: str | None = None,
145        gateway_base_url: str | None = None,
146        concurrency: int = 32,
147        *,
148        headers: dict[str, str] | None = None,
149        auth: Tuple[str, str] | None = None,
150        chunker: str = "size-1048576",
151        max_retries: int = 3,
152        initial_delay: float = 1.0,
153        backoff_factor: float = 2.0,
154    ):
155        """
156        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
157
158        ### `httpx.AsyncClient` Management
159        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
160        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
161
162        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
163        ```python
164        async with httpx.AsyncClient() as client, KuboCAS(
165            rpc_base_url=rpc_base_url,
166            gateway_base_url=gateway_base_url,
167            client=client,
168        ) as kubo_cas:
169            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
170            zhs = ZarrHAMTStore(hamt)
171            # Use the KuboCAS instance as needed
172            # ...
173        ```
174        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
175        ``` python
176        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
177        # Use the KuboCAS instance as needed
178        # ...
179        await cas.aclose()  # Ensure resources are cleaned up
180        ```
181
182        ### Authenticated RPC/Gateway Access
183        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
184        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
185        If you do not need authentication, you can leave these parameters as `None`.
186
187        ### RPC and HTTP Gateway Base URLs
188        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
189        """
190
191        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
192        if re.fullmatch(chunker_pattern, chunker) is None:
193            raise ValueError("Invalid chunker specification")
194        self.chunker: str = chunker
195
196        self.hasher: str = hasher
197        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
198
199        if rpc_base_url is None:
200            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
201        if gateway_base_url is None:
202            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
203
204        if "/ipfs/" in gateway_base_url:
205            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
206
207        # Standard gateway URL construction with proper path handling
208        if gateway_base_url.endswith("/"):
209            gateway_base_url = f"{gateway_base_url}ipfs/"
210        else:
211            gateway_base_url = f"{gateway_base_url}/ipfs/"
212
213        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
214        """@private"""
215        self.gateway_base_url: str = gateway_base_url
216        """@private"""
217
218        if client is not None:
219            # A client was supplied by the user. We don't own it.
220            self._owns_client = False
221            self._client_per_loop = {asyncio.get_running_loop(): client}
222        else:
223            # No client supplied. We will own any clients we create.
224            self._owns_client = True
225            self._client_per_loop = {}
226
227        # store for later use by _loop_client()
228        self._default_headers = headers
229        self._default_auth = auth
230
231        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
232        self._closed: bool = False
233
234        # Validate retry parameters
235        if max_retries < 0:
236            raise ValueError("max_retries must be non-negative")
237        if initial_delay <= 0:
238            raise ValueError("initial_delay must be positive")
239        if backoff_factor < 1.0:
240            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
241
242        self.max_retries = max_retries
243        self.initial_delay = initial_delay
244        self.backoff_factor = backoff_factor
245
246    # --------------------------------------------------------------------- #
247    # helper: get or create the client bound to the current running loop    #
248    # --------------------------------------------------------------------- #
249    def _loop_client(self) -> httpx.AsyncClient:
250        """Get or create a client for the current event loop.
251
252        If the instance was previously closed but owns its clients, a fresh
253        client mapping is lazily created on demand.  Users that supplied their
254        own ``httpx.AsyncClient`` still receive an error when the instance has
255        been closed, as we cannot safely recreate their client.
256        """
257        if self._closed:
258            if not self._owns_client:
259                raise RuntimeError("KuboCAS is closed; create a new instance")
260            # We previously closed all internally-owned clients. Reset the
261            # state so that new clients can be created lazily.
262            self._closed = False
263            self._client_per_loop = {}
264
265        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
266        try:
267            return self._client_per_loop[loop]
268        except KeyError:
269            # Create a new client
270            client = httpx.AsyncClient(
271                timeout=60.0,
272                headers=self._default_headers,
273                auth=self._default_auth,
274                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
275                # Uncomment when they finally support Robust HTTP/2 GOAWAY responses
276                # http2=True,
277            )
278            self._client_per_loop[loop] = client
279            return client
280
281    # --------------------------------------------------------------------- #
282    # graceful shutdown: close **all** clients we own                       #
283    # --------------------------------------------------------------------- #
284    async def aclose(self) -> None:
285        """
286        Closes all internally-created clients. Must be called from an async context.
287        """
288        if self._owns_client is False:  # external client → caller closes
289            return
290
291        # This method is async, so we can reliably await the async close method.
292        # The complex sync/async logic is handled by __del__.
293        for client in list(self._client_per_loop.values()):
294            if not client.is_closed:
295                try:
296                    await client.aclose()
297                except Exception:
298                    pass  # best-effort cleanup
299
300        self._client_per_loop.clear()
301        self._closed = True
302
303    # At this point, _client_per_loop should be empty or only contain
304    # clients from loops we haven't seen (which shouldn't happen in practice)
305    async def __aenter__(self) -> "KuboCAS":
306        return self
307
308    async def __aexit__(self, *exc: Any) -> None:
309        await self.aclose()
310
311    def __del__(self) -> None:
312        """Best-effort close for internally-created clients."""
313        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
314            return
315
316        if not self._owns_client or self._closed:
317            return
318
319        # Attempt proper cleanup if possible
320        try:
321            loop = asyncio.get_running_loop()
322        except RuntimeError:
323            # No running loop - can't do async cleanup
324            # Just clear the client references synchronously
325            if hasattr(self, "_client_per_loop"):
326                # We can't await client.aclose() without a loop,
327                # so just clear the references
328                self._client_per_loop.clear()
329                self._closed = True
330            return
331
332        # If we get here, we have a running loop
333        try:
334            if loop.is_running():
335                # Schedule cleanup in the existing loop
336                loop.create_task(self.aclose())
337            else:
338                # Loop exists but not running - try asyncio.run
339                coro = self.aclose()  # Create the coroutine
340                try:
341                    asyncio.run(coro)
342                except Exception:
343                    # If asyncio.run fails, we need to close the coroutine properly
344                    coro.close()  # This prevents the RuntimeWarning
345                    raise  # Re-raise to hit the outer except block
346        except Exception:
347            # If all else fails, just clear references
348            if hasattr(self, "_client_per_loop"):
349                self._client_per_loop.clear()
350                self._closed = True
351
352    # --------------------------------------------------------------------- #
353    # save() – now uses the per-loop client                                 #
354    # --------------------------------------------------------------------- #
355    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
356        async with self._sem:
357            files = {"file": data}
358            client = self._loop_client()
359            retry_count = 0
360
361            while retry_count <= self.max_retries:
362                try:
363                    response = await client.post(
364                        self.rpc_url, files=files, timeout=60.0
365                    )
366                    response.raise_for_status()
367                    cid_str: str = response.json()["Hash"]
368                    cid: CID = CID.decode(cid_str)
369                    if cid.codec.code != self.DAG_PB_MARKER:
370                        cid = cid.set(codec=codec)
371                    return cid
372
373                except (httpx.TimeoutException, httpx.RequestError) as e:
374                    retry_count += 1
375                    if retry_count > self.max_retries:
376                        raise httpx.TimeoutException(
377                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
378                            request=e.request
379                            if isinstance(e, httpx.RequestError)
380                            else None,
381                        )
382
383                    # Calculate backoff delay
384                    delay = self.initial_delay * (
385                        self.backoff_factor ** (retry_count - 1)
386                    )
387                    # Add some jitter to prevent thundering herd
388                    jitter = delay * 0.1 * (random.random() - 0.5)
389                    await asyncio.sleep(delay + jitter)
390
391                except httpx.HTTPStatusError:
392                    # Re-raise non-timeout HTTP errors immediately
393                    raise
394        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
395
396    async def load(self, id: IPLDKind) -> bytes:
397        cid = cast(CID, id)
398        url: str = f"{self.gateway_base_url + str(cid)}"
399        async with self._sem:
400            client = self._loop_client()
401            retry_count = 0
402
403            while retry_count <= self.max_retries:
404                try:
405                    response = await client.get(url, timeout=60.0)
406                    response.raise_for_status()
407                    return response.content
408
409                except (httpx.TimeoutException, httpx.RequestError) as e:
410                    retry_count += 1
411                    if retry_count > self.max_retries:
412                        raise httpx.TimeoutException(
413                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
414                            request=e.request
415                            if isinstance(e, httpx.RequestError)
416                            else None,
417                        )
418
419                    # Calculate backoff delay
420                    delay = self.initial_delay * (
421                        self.backoff_factor ** (retry_count - 1)
422                    )
423                    # Add some jitter to prevent thundering herd
424                    jitter = delay * 0.1 * (random.random() - 0.5)
425                    await asyncio.sleep(delay + jitter)
426
427                except httpx.HTTPStatusError:
428                    # Re-raise non-timeout HTTP errors immediately
429                    raise
430        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Connects to an IPFS Kubo daemon.

The IDs in save and load are IPLD CIDs.

  • save() → RPC (/api/v0/add)
  • load() → HTTP gateway (/ipfs/{cid})

save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.

Authentication / custom headers

You have two options:

  1. Bring your own httpx.AsyncClient: pass it via client=... — any default headers or auth configured on that client are reused for every request.
  2. Let KuboCAS build the client but pass headers= and/or auth= kwargs; they are forwarded to the internally–created AsyncClient.
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)

Parameters

  • hasher (str): multihash name (defaults to blake3).
  • client (httpx.AsyncClient | None): reuse an existing client; if None KuboCAS will create one lazily.
  • headers (dict[str, str] | None): default headers for the internally-created client.
  • auth (tuple[str, str] | None): authentication tuple (username, password) for the internally-created client.
  • rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
  • chunker (str): chunking algorithm specification for Kubo's add RPC. Accepted formats are "size-<positive int>", "rabin", or "rabin-<min>-<avg>-<max>".

...

KuboCAS( hasher: str = 'blake3', client: httpx.AsyncClient | None = None, rpc_base_url: str | None = None, gateway_base_url: str | None = None, concurrency: int = 32, *, headers: dict[str, str] | None = None, auth: Optional[Tuple[str, str]] = None, chunker: str = 'size-1048576', max_retries: int = 3, initial_delay: float = 1.0, backoff_factor: float = 2.0)
140    def __init__(
141        self,
142        hasher: str = "blake3",
143        client: httpx.AsyncClient | None = None,
144        rpc_base_url: str | None = None,
145        gateway_base_url: str | None = None,
146        concurrency: int = 32,
147        *,
148        headers: dict[str, str] | None = None,
149        auth: Tuple[str, str] | None = None,
150        chunker: str = "size-1048576",
151        max_retries: int = 3,
152        initial_delay: float = 1.0,
153        backoff_factor: float = 2.0,
154    ):
155        """
156        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
157
158        ### `httpx.AsyncClient` Management
159        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
160        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
161
162        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
163        ```python
164        async with httpx.AsyncClient() as client, KuboCAS(
165            rpc_base_url=rpc_base_url,
166            gateway_base_url=gateway_base_url,
167            client=client,
168        ) as kubo_cas:
169            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
170            zhs = ZarrHAMTStore(hamt)
171            # Use the KuboCAS instance as needed
172            # ...
173        ```
174        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
175        ``` python
176        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
177        # Use the KuboCAS instance as needed
178        # ...
179        await cas.aclose()  # Ensure resources are cleaned up
180        ```
181
182        ### Authenticated RPC/Gateway Access
183        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
184        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
185        If you do not need authentication, you can leave these parameters as `None`.
186
187        ### RPC and HTTP Gateway Base URLs
188        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
189        """
190
191        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
192        if re.fullmatch(chunker_pattern, chunker) is None:
193            raise ValueError("Invalid chunker specification")
194        self.chunker: str = chunker
195
196        self.hasher: str = hasher
197        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
198
199        if rpc_base_url is None:
200            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
201        if gateway_base_url is None:
202            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
203
204        if "/ipfs/" in gateway_base_url:
205            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
206
207        # Standard gateway URL construction with proper path handling
208        if gateway_base_url.endswith("/"):
209            gateway_base_url = f"{gateway_base_url}ipfs/"
210        else:
211            gateway_base_url = f"{gateway_base_url}/ipfs/"
212
213        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
214        """@private"""
215        self.gateway_base_url: str = gateway_base_url
216        """@private"""
217
218        if client is not None:
219            # A client was supplied by the user. We don't own it.
220            self._owns_client = False
221            self._client_per_loop = {asyncio.get_running_loop(): client}
222        else:
223            # No client supplied. We will own any clients we create.
224            self._owns_client = True
225            self._client_per_loop = {}
226
227        # store for later use by _loop_client()
228        self._default_headers = headers
229        self._default_auth = auth
230
231        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
232        self._closed: bool = False
233
234        # Validate retry parameters
235        if max_retries < 0:
236            raise ValueError("max_retries must be non-negative")
237        if initial_delay <= 0:
238            raise ValueError("initial_delay must be positive")
239        if backoff_factor < 1.0:
240            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
241
242        self.max_retries = max_retries
243        self.initial_delay = initial_delay
244        self.backoff_factor = backoff_factor

If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

httpx.AsyncClient Management

If client is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using await cas.aclose() as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.

If you are using the KuboCAS instance in an async with block, it will automatically close the client when the block is exited which is what we suggest below:

async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...

As mentioned, if you do not use the async with syntax, you should call await cas.aclose() when you are done using the instance to ensure that all resources are cleaned up.

cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up

Authenticated RPC/Gateway Access

Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own httpx.AsyncClient and then passing that in. Alternatively, they can pass in headers and auth parameters to the constructor, which will be used to create a new httpx.AsyncClient if one is not provided. If you do not need authentication, you can leave these parameters as None.

RPC and HTTP Gateway Base URLs

These are the first part of each URL. The defaults match the endpoints that kubo exposes when launched on a local machine.

KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = 'http://127.0.0.1:8080'
KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = 'http://127.0.0.1:5001'
chunker: str
hasher: str

The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.

max_retries
initial_delay
backoff_factor
async def aclose(self) -> None:
284    async def aclose(self) -> None:
285        """
286        Closes all internally-created clients. Must be called from an async context.
287        """
288        if self._owns_client is False:  # external client → caller closes
289            return
290
291        # This method is async, so we can reliably await the async close method.
292        # The complex sync/async logic is handled by __del__.
293        for client in list(self._client_per_loop.values()):
294            if not client.is_closed:
295                try:
296                    await client.aclose()
297                except Exception:
298                    pass  # best-effort cleanup
299
300        self._client_per_loop.clear()
301        self._closed = True

Closes all internally-created clients. Must be called from an async context.

async def save( self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> multiformats.cid.CID:
355    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
356        async with self._sem:
357            files = {"file": data}
358            client = self._loop_client()
359            retry_count = 0
360
361            while retry_count <= self.max_retries:
362                try:
363                    response = await client.post(
364                        self.rpc_url, files=files, timeout=60.0
365                    )
366                    response.raise_for_status()
367                    cid_str: str = response.json()["Hash"]
368                    cid: CID = CID.decode(cid_str)
369                    if cid.codec.code != self.DAG_PB_MARKER:
370                        cid = cid.set(codec=codec)
371                    return cid
372
373                except (httpx.TimeoutException, httpx.RequestError) as e:
374                    retry_count += 1
375                    if retry_count > self.max_retries:
376                        raise httpx.TimeoutException(
377                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
378                            request=e.request
379                            if isinstance(e, httpx.RequestError)
380                            else None,
381                        )
382
383                    # Calculate backoff delay
384                    delay = self.initial_delay * (
385                        self.backoff_factor ** (retry_count - 1)
386                    )
387                    # Add some jitter to prevent thundering herd
388                    jitter = delay * 0.1 * (random.random() - 0.5)
389                    await asyncio.sleep(delay + jitter)
390
391                except httpx.HTTPStatusError:
392                    # Re-raise non-timeout HTTP errors immediately
393                    raise
394        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> bytes:
396    async def load(self, id: IPLDKind) -> bytes:
397        cid = cast(CID, id)
398        url: str = f"{self.gateway_base_url + str(cid)}"
399        async with self._sem:
400            client = self._loop_client()
401            retry_count = 0
402
403            while retry_count <= self.max_retries:
404                try:
405                    response = await client.get(url, timeout=60.0)
406                    response.raise_for_status()
407                    return response.content
408
409                except (httpx.TimeoutException, httpx.RequestError) as e:
410                    retry_count += 1
411                    if retry_count > self.max_retries:
412                        raise httpx.TimeoutException(
413                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
414                            request=e.request
415                            if isinstance(e, httpx.RequestError)
416                            else None,
417                        )
418
419                    # Calculate backoff delay
420                    delay = self.initial_delay * (
421                        self.backoff_factor ** (retry_count - 1)
422                    )
423                    # Add some jitter to prevent thundering herd
424                    jitter = delay * 0.1 * (random.random() - 0.5)
425                    await asyncio.sleep(delay + jitter)
426
427                except httpx.HTTPStatusError:
428                    # Re-raise non-timeout HTTP errors immediately
429                    raise
430        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Retrieve data.

class ZarrHAMTStore(zarr.abc.store.Store):
 12class ZarrHAMTStore(zarr.abc.store.Store):
 13    """
 14    Write and read Zarr v3s with a HAMT.
 15
 16    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
 17    py-hamt mapping.
 18
 19    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
 20    the value is the raw byte payload produced by Zarr.  No additional
 21    framing, compression, or encryption is applied by this class. For a zarr encryption example
 22    see where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23    For a fully encrypted zarr store, where metadata is not available, please see
 24    :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it.
 25
 26    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
 27    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
 28
 29    #### Sample Code
 30    ```python
 31    # --- Write ---
 32    ds: xarray.Dataset = # ...
 33    cas: ContentAddressedStore = # ...
 34    hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
 35    hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
 36    zhs  = ZarrHAMTStore(hamt, read_only=False)
 37    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
 38    await hamt.make_read_only() # flush + freeze
 39    root_node_id = hamt.root_node_id
 40    print(root_node_id)
 41
 42     # --- read ---
 43    hamt_ro = await HAMT.build(
 44        cas, root_node_id=root_cid, read_only=True, values_are_bytes=True
 45    )
 46    zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
 47    ds_ro = xarray.open_zarr(store=zhs_ro)
 48
 49
 50    print(ds_ro)
 51    xarray.testing.assert_identical(ds, ds_ro)
 52    ```
 53    """
 54
 55    _forced_read_only: bool | None = None  # sentinel for wrapper clones
 56
 57    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
 58        """
 59        ### `hamt` and `read_only`
 60        You need to make sure the following two things are true:
 61
 62        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
 63        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
 64
 65        ##### A note about the zarr chunk separator, "/" vs "."
 66        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
 67
 68        #### Metadata Read Cache
 69        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
 70        """
 71        super().__init__(read_only=read_only)
 72
 73        assert hamt.read_only == read_only
 74        assert hamt.values_are_bytes
 75        self.hamt: HAMT = hamt
 76        """
 77        The internal HAMT.
 78        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
 79        """
 80
 81        self.metadata_read_cache: dict[str, bytes] = {}
 82        """@private"""
 83
 84    @property
 85    def read_only(self) -> bool:  # type: ignore[override]
 86        if self._forced_read_only is not None:  # instance attr overrides
 87            return self._forced_read_only
 88        return self.hamt.read_only
 89
 90    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
 91        """
 92        Return this store (if the flag already matches) or a *shallow*
 93        clone that presents the requested read‑only status.
 94
 95        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
 96        instance; no flushing, network traffic or async work is done.
 97        """
 98        # Fast path
 99        if read_only == self.read_only:
100            return self  # Same mode, return same instance
101
102        # Create new instance with different read_only flag
103        # Creates a *bare* instance without running its __init__
104        clone = type(self).__new__(type(self))
105
106        # Copy attributes that matter
107        clone.hamt = self.hamt  # Share the HAMT
108        clone._forced_read_only = read_only
109        clone.metadata_read_cache = self.metadata_read_cache.copy()
110
111        # Re‑initialise the zarr base class so that Zarr sees the flag
112        zarr.abc.store.Store.__init__(clone, read_only=read_only)
113        return clone
114
115    def __eq__(self, other: object) -> bool:
116        """@private"""
117        if not isinstance(other, ZarrHAMTStore):
118            return False
119        return self.hamt.root_node_id == other.hamt.root_node_id
120
121    async def get(
122        self,
123        key: str,
124        prototype: zarr.core.buffer.BufferPrototype,
125        byte_range: zarr.abc.store.ByteRequest | None = None,
126    ) -> zarr.core.buffer.Buffer | None:
127        """@private"""
128        try:
129            val: bytes
130            # do len check to avoid indexing into overly short strings, 3.12 does not throw errors but we dont know if other versions will
131            is_metadata: bool = (
132                len(key) >= 9 and key[-9:] == "zarr.json"
133            )  # if path ends with zarr.json
134
135            if is_metadata and key in self.metadata_read_cache:
136                val = self.metadata_read_cache[key]
137            else:
138                val = cast(
139                    bytes, await self.hamt.get(key)
140                )  # We know values received will always be bytes since we only store bytes in the HAMT
141                if is_metadata:
142                    self.metadata_read_cache[key] = val
143
144            return prototype.buffer.from_bytes(val)
145        except KeyError:
146            # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases
147            return None
148
149    async def get_partial_values(
150        self,
151        prototype: zarr.core.buffer.BufferPrototype,
152        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
153    ) -> list[zarr.core.buffer.Buffer | None]:
154        """@private"""
155        raise NotImplementedError
156
157    async def exists(self, key: str) -> bool:
158        """@private"""
159        try:
160            await self.hamt.get(key)
161            return True
162        except KeyError:
163            return False
164
165    @property
166    def supports_writes(self) -> bool:
167        """@private"""
168        return not self.hamt.read_only
169
170    @property
171    def supports_partial_writes(self) -> bool:
172        """@private"""
173        return False
174
175    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
176        """@private"""
177        if self.read_only:
178            raise Exception("Cannot write to a read-only store.")
179
180        if key in self.metadata_read_cache:
181            self.metadata_read_cache[key] = value.to_bytes()
182        await self.hamt.set(key, value.to_bytes())
183
184    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
185        """@private"""
186        if not (await self.exists(key)):
187            await self.set(key, value)
188
189    async def set_partial_values(
190        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
191    ) -> None:
192        """@private"""
193        raise NotImplementedError
194
195    @property
196    def supports_deletes(self) -> bool:
197        """@private"""
198        return not self.hamt.read_only
199
200    async def delete(self, key: str) -> None:
201        """@private"""
202        if self.read_only:
203            raise Exception("Cannot write to a read-only store.")
204        try:
205            await self.hamt.delete(key)
206            # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo
207            # if key in self.metadata_read_cache:
208            #     del self.metadata_read_cache[key]
209        # It's fine if the key was not in the HAMT
210        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues
211        except KeyError:
212            return
213
214    @property
215    def supports_listing(self) -> bool:
216        """@private"""
217        return True
218
219    async def list(self) -> AsyncIterator[str]:
220        """@private"""
221        async for key in self.hamt.keys():
222            yield key
223
224    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
225        """@private"""
226        async for key in self.hamt.keys():
227            if key.startswith(prefix):
228                yield key
229
230    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
231        """
232        @private
233        List *immediate* children that live directly under **prefix**.
234
235        This is similar to :py:meth:`list_prefix` but collapses everything
236        below the first ``"/"`` after *prefix*.  Each child name is yielded
237        **exactly once** in the order of first appearance while scanning the
238        HAMT keys.
239
240        Parameters
241        ----------
242        prefix : str
243            Logical directory path.  *Must* end with ``"/"`` for the result to
244            make sense (e.g. ``"a/b/"``).
245
246        Yields
247        ------
248        str
249            The name of each direct child (file or sub-directory) of *prefix*.
250
251        Examples
252        --------
253        With keys ::
254
255            a/b/c/d
256            a/b/c/e
257            a/b/f
258            a/b/g/h/i
259
260        ``await list_dir("a/b/")`` produces ::
261
262            c
263            f
264            g
265
266        Notes
267        -----
268        • Internally uses a :class:`set` to deduplicate names; memory grows
269            with the number of *unique* children, not the total number of keys.
270        • Order is **not** sorted; it reflects the first encounter while
271            iterating over :py:meth:`HAMT.keys`.
272        """
273        seen_names: set[str] = set()
274        async for key in self.hamt.keys():
275            if key.startswith(prefix):
276                suffix: str = key[len(prefix) :]
277                first_slash: int = suffix.find("/")
278                if first_slash == -1:
279                    if suffix not in seen_names:
280                        seen_names.add(suffix)
281                        yield suffix
282                else:
283                    name: str = suffix[0:first_slash]
284                    if name not in seen_names:
285                        seen_names.add(name)
286                        yield name

Write and read Zarr v3s with a HAMT.

Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.

Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class. For a zarr encryption example where metadata remains available, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb. For a fully encrypted zarr store, where metadata is not available, please see SimpleEncryptedZarrHAMTStore, but we do not recommend using it.

A note about using the same ZarrHAMTStore for writing and then reading again

If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.

Sample Code

# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
zhs  = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

 # --- read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)


print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
ZarrHAMTStore(hamt: HAMT, read_only: bool = False)
57    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
58        """
59        ### `hamt` and `read_only`
60        You need to make sure the following two things are true:
61
62        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
63        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
64
65        ##### A note about the zarr chunk separator, "/" vs "."
66        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
67
68        #### Metadata Read Cache
69        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
70        """
71        super().__init__(read_only=read_only)
72
73        assert hamt.read_only == read_only
74        assert hamt.values_are_bytes
75        self.hamt: HAMT = hamt
76        """
77        The internal HAMT.
78        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
79        """
80
81        self.metadata_read_cache: dict[str, bytes] = {}
82        """@private"""

hamt and read_only

You need to make sure the following two things are true:

  1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that hamt.read_only == read_only. This is because making a HAMT read only automatically requires async operations, but __init__ cannot be async.
  2. The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."

While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

Metadata Read Cache

ZarrHAMTStore has an internal read cache for metadata. In practice metadata "zarr.json" files are requested far more often, and far more repetitively, than any other key, so caching them gives a significant speed-up. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.

hamt: HAMT

The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.

read_only: bool
84    @property
85    def read_only(self) -> bool:  # type: ignore[override]
86        if self._forced_read_only is not None:  # instance attr overrides
87            return self._forced_read_only
88        return self.hamt.read_only

Is the store read-only?

def with_read_only(self, read_only: bool = False) -> ZarrHAMTStore:
 90    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
 91        """
 92        Return this store (if the flag already matches) or a *shallow*
 93        clone that presents the requested read‑only status.
 94
 95        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
 96        instance; no flushing, network traffic or async work is done.
 97        """
 98        # Fast path
 99        if read_only == self.read_only:
100            return self  # Same mode, return same instance
101
102        # Create new instance with different read_only flag
103        # Creates a *bare* instance without running its __init__
104        clone = type(self).__new__(type(self))
105
106        # Copy attributes that matter
107        clone.hamt = self.hamt  # Share the HAMT
108        clone._forced_read_only = read_only
109        clone.metadata_read_cache = self.metadata_read_cache.copy()
110
111        # Re‑initialise the zarr base class so that Zarr sees the flag
112        zarr.abc.store.Store.__init__(clone, read_only=read_only)
113        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic or async work is done.

class SimpleEncryptedZarrHAMTStore(py_hamt.ZarrHAMTStore):
 13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
 14    """
 15    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.
 16
 17    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
 18    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
 19    and data chunks. This provides strong privacy but means the Zarr store is
 20    completely opaque and unusable without the correct encryption key and header.
 21
 22    Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23
 24    #### Encryption Details
 25    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
 26    - Requires a 32-byte encryption key and a header.
 27    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.
 28
 29    #### Important Considerations
 30    - Since metadata is encrypted, standard Zarr tools cannot inspect the
 31      dataset without prior decryption using this store class.
 32    - There is no support for partial encryption or excluding variables.
 33    - There is no metadata caching.
 34
 35    #### Sample Code
 36    ```python
 37    import xarray
 38    from py_hamt import HAMT, KuboCAS # Assuming an KuboCAS or similar
 39    from Crypto.Random import get_random_bytes
 40    import numpy as np
 41
 42    # Setup
 43    ds = xarray.Dataset(
 44        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
 45        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
 46    )
 47    cas = KuboCAS() # Example ContentAddressedStore
 48    encryption_key = get_random_bytes(32)
 49    header = b"fully-encrypted-zarr"
 50
 51    # --- Write ---
 52    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
 53    ezhs_write = SimpleEncryptedZarrHAMTStore(
 54        hamt_write, False, encryption_key, header
 55    )
 56    print("Writing fully encrypted Zarr...")
 57    ds.to_zarr(store=ezhs_write, mode="w")
 58    await hamt_write.make_read_only()
 59    root_node_id = hamt_write.root_node_id
 60    print(f"Wrote Zarr with root: {root_node_id}")
 61
 62    # --- Read ---
 63    hamt_read = await HAMT.build(
 64            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
 65        )
 66    ezhs_read = SimpleEncryptedZarrHAMTStore(
 67        hamt_read, True, encryption_key, header
 68    )
 69    print("\nReading fully encrypted Zarr...")
 70    ds_read = xarray.open_zarr(store=ezhs_read)
 71    print("Read back dataset:")
 72    print(ds_read)
 73    xarray.testing.assert_identical(ds, ds_read)
 74    print("Read successful and data verified.")
 75
 76    # --- Read with wrong key (demonstrates failure) ---
 77    wrong_key = get_random_bytes(32)
 78    hamt_bad = await HAMT.build(
 79        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 80    )
 81    ezhs_bad = SimpleEncryptedZarrHAMTStore(
 82        hamt_bad, True, wrong_key, header
 83    )
 84    print("\nAttempting to read with wrong key...")
 85    try:
 86        ds_bad = xarray.open_zarr(store=ezhs_bad)
 87        print(ds_bad)
 88    except Exception as e:
 89        print(f"Failed to read as expected: {type(e).__name__} - {e}")
 90    ```
 91    """
 92
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}
113
114    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
115        if read_only == self.read_only:
116            return self
117
118        clone = type(self).__new__(type(self))
119        clone.hamt = self.hamt
120        clone.encryption_key = self.encryption_key
121        clone.header = self.header
122        clone.metadata_read_cache = self.metadata_read_cache
123        clone._forced_read_only = read_only  # safe; attribute is declared
124        zarr.abc.store.Store.__init__(clone, read_only=read_only)
125        return clone
126
127    def _encrypt(self, val: bytes) -> bytes:
128        """Encrypts data using ChaCha20-Poly1305."""
129        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
130        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
131        cipher.update(self.header)
132        ciphertext, tag = cipher.encrypt_and_digest(val)
133        return nonce + tag + ciphertext
134
135    def _decrypt(self, val: bytes) -> bytes:
136        """Decrypts data using ChaCha20-Poly1305."""
137        try:
138            # Extract nonce (24), tag (16), and ciphertext
139            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
140            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
141            cipher.update(self.header)
142            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
143            return plaintext
144        except Exception as e:
145            # Catching a broad exception as various issues (key, tag, length) can occur.
146            raise ValueError(
147                "Decryption failed. Check key, header, or data integrity."
148            ) from e
149
150    def __eq__(self, other: object) -> bool:
151        """@private"""
152        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
153            return False
154        return (
155            self.hamt.root_node_id == other.hamt.root_node_id
156            and self.encryption_key == other.encryption_key
157            and self.header == other.header
158        )
159
160    async def get(
161        self,
162        key: str,
163        prototype: zarr.core.buffer.BufferPrototype,
164        byte_range: zarr.abc.store.ByteRequest | None = None,
165    ) -> zarr.core.buffer.Buffer | None:
166        """@private"""
167        try:
168            decrypted_val: bytes
169            is_metadata: bool = (
170                len(key) >= 9 and key[-9:] == "zarr.json"
171            )  # if path ends with zarr.json
172
173            if is_metadata and key in self.metadata_read_cache:
174                decrypted_val = self.metadata_read_cache[key]
175            else:
176                raw_val = cast(
177                    bytes, await self.hamt.get(key)
178                )  # We know values received will always be bytes since we only store bytes in the HAMT
179                decrypted_val = self._decrypt(raw_val)
180                if is_metadata:
181                    self.metadata_read_cache[key] = decrypted_val
182            return prototype.buffer.from_bytes(decrypted_val)
183        except KeyError:
184            return None
185
186    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
187        """@private"""
188        if self.read_only:
189            raise Exception("Cannot write to a read-only store.")
190
191        raw_bytes = value.to_bytes()
192        if key in self.metadata_read_cache:
193            self.metadata_read_cache[key] = raw_bytes
194        # Encrypt it
195        encrypted_bytes = self._encrypt(raw_bytes)
196        await self.hamt.set(key, encrypted_bytes)

Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.

This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.

Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.

#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
  dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- Decrypted `zarr.json` metadata is cached in memory (as plaintext) to speed up repeated reads; data chunks are never cached.

#### Sample Code


    import xarray
    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS() # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
        )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("\nReading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("\nAttempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
SimpleEncryptedZarrHAMTStore( hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes)
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}

Initializes the SimpleEncryptedZarrHAMTStore.

Args:

  - hamt: The HAMT instance for storage. Must have values_are_bytes=True. Its read_only status must match the read_only argument.
  - read_only: If True, the store is in read-only mode.
  - encryption_key: A 32-byte key for ChaCha20-Poly1305.
  - header: A header (bytes) used as associated data in encryption.

encryption_key
header
def with_read_only( self, read_only: bool = False) -> SimpleEncryptedZarrHAMTStore:
114    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
115        if read_only == self.read_only:
116            return self
117
118        clone = type(self).__new__(type(self))
119        clone.hamt = self.hamt
120        clone.encryption_key = self.encryption_key
121        clone.header = self.header
122        clone.metadata_read_cache = self.metadata_read_cache
123        clone._forced_read_only = read_only  # safe; attribute is declared
124        zarr.abc.store.Store.__init__(clone, read_only=read_only)
125        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic or async work is done.