py_hamt

from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
]

def blake3_hashfn(input_bytes: bytes) -> bytes:
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
    """
    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
    digest: bytes = b3.digest(input_bytes, size=32)
    raw_bytes: bytes = b3.unwrap(digest)
    return raw_bytes

This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.
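
Any deterministic function from bytes to a whole-byte-width digest can be swapped in through the `hash_fn` constructor parameter. A minimal sketch using the standard library's SHA-256; `my_cas` in the commented line is a placeholder for whatever `ContentAddressedStore` you use:

```python
import hashlib

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # 32-byte output, the same width as the default blake3_hashfn
    return hashlib.sha256(input_bytes).digest()

# hamt = await HAMT.build(cas=my_cas, hash_fn=sha256_hashfn)
```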

class HAMT:
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. In both modes, the HAMT applies some internal performance optimizations.

    Note that in read+write mode, the root node id IS NOT VALID. Call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

    These optimizations trade memory use for performance. Use `cache_size` to monitor the approximate memory usage; be warned that for large key-value mapping sets this may take a while to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id  # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value mapping within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes, since a python bytes object can only represent data in 8-bit segments.

        Theoretically your hash size only needs a minimum of 1 byte, and the number of hash collisions must be at most the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing python bytes objects into the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive blocking operation; don't run it concurrently with a bunch of other operations
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store.
        Takes a stack of nodes; we represent a stack with a list where the first element is the root element and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is deleted entirely, unless it is the root node.
        Modifies in place.
        """
        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the last i.e. root node, we need to change the linking of the node prior in the list since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # automatically skip encoding if the value provided is of the bytes variety
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and so we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError

    async def get(self, key: str) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(pointer)
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Don't check if the result is None; use a boolean to indicate finding something, since None is a possible value of IPLDKind
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError

        return result_ptr

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(list(node.iter_links()))

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count

An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

Use this to store arbitrarily large key-value mappings in your CAS of choice.

For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

When in read-only mode, the HAMT is both async and thread safe.

A note about memory management, read+write and read-only modes

The HAMT can be in either read+write mode or read-only mode. In both modes, the HAMT applies some internal performance optimizations.

Note that in read+write mode, the root node id IS NOT VALID. Call make_read_only() to convert to read-only mode and then read root_node_id.

These optimizations trade memory use for performance. Use cache_size to monitor the approximate memory usage; be warned that for large key-value mapping sets this may take a while to run. Use cache_vacate if you are over your memory limits.

IPFS HAMT Sample Code

```python
kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id  # our root node CID
print(cid)
```
HAMT(cas: ContentAddressedStore, hash_fn: Callable[[bytes], bytes] = blake3_hashfn, root_node_id: IPLDKind | None = None, read_only: bool = False, max_bucket_size: int = 4, values_are_bytes: bool = False)
    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value mapping within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes, since a python bytes object can only represent data in 8-bit segments.

        Theoretically your hash size only needs a minimum of 1 byte, and the number of hash collisions must be at most the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing python bytes objects into the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.

cas: ContentAddressedStore

The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.

hash_fn: Callable[[bytes], bytes]

This is the hash function used to place a key-value within the HAMT.

To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

It's important to note that the resulting hash must always be a whole number of bytes, since a python bytes object can only represent data in 8-bit segments.

Theoretically your hash size only needs a minimum of 1 byte, and the number of hash collisions must be at most the bucket size. Any more and the HAMT will most likely throw errors.

values_are_bytes: bool

Set this to true if you are only going to be storing python bytes objects into the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
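
For instance, a minimal sketch of the bytes-only fast path, using the `InMemoryCAS` backend described further below:

```python
from py_hamt import HAMT, InMemoryCAS

async def demo() -> None:
    # values_are_bytes=True skips dag-cbor encoding; only bytes values are allowed
    hamt = await HAMT.build(cas=InMemoryCAS(), values_are_bytes=True)
    await hamt.set("chunk/0.0", b"raw bytes, stored as-is")
    assert (await hamt.get("chunk/0.0")) == b"raw bytes, stored as-is"
```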

max_bucket_size: int

This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

This must be a positive integer with a minimum of 1.

root_node_id: IPLDKind

This is type IPLDKind but the documentation generator pdoc mangles it a bit.

Read from this only when in read-only mode to get something valid!

read_only: bool

Clients should NOT modify this.

This is here for checking whether the HAMT is in read only or read/write mode.

The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.

@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> HAMT:
    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as __init__. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.

This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
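
To illustrate the distinction, a short sketch; `cas` and `existing_root` are placeholders for your `ContentAddressedStore` and a previously saved root node ID:

```python
# Completely empty HAMT: the async factory is required
hamt = await HAMT.build(cas=cas)

# Reopening an existing HAMT: plain __init__ is enough
hamt = HAMT(cas=cas, root_node_id=existing_root, read_only=True)
```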

async def make_read_only(self) -> None:
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.

async def enable_write(self) -> None:
    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use make_read_only.
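
A sketch of a full mode round trip, continuing the `hamt` from the class sample code:

```python
await hamt.make_read_only()
root = hamt.root_node_id      # valid snapshot of the tree
await hamt.enable_write()     # root_node_id is no longer valid from here on
await hamt.set("baz", "qux")
await hamt.make_read_only()   # flush writes and get a fresh, valid root
print(hamt.root_node_id)
```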

async def cache_size(self) -> int:
    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

Returns the memory used by some internal performance optimization tools in bytes.

This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

Be warned that this may take a while to run for large HAMTs.

For more on memory management, see the HAMT class documentation.

async def cache_vacate(self) -> None:
    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

Vacate and completely empty out the internal read/write cache.

Be warned that this may take a while if there have been a lot of write operations.

For more on memory management, see the HAMT class documentation.
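
Putting `cache_size` and `cache_vacate` together, a sketch of a simple memory budget check; the 256 MiB threshold is an arbitrary example value:

```python
# Periodically bound the cache's memory footprint during a long write session
if (await hamt.cache_size()) > 256 * 1024 * 1024:
    await hamt.cache_vacate()
```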

async def set(self, key: str, val: IPLDKind) -> None:
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

Write a key-value mapping.
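
Values can be any IPLDKind, not just strings or bytes; with the default `values_are_bytes=False` they are dag-cbor encoded before being saved to the CAS. Continuing the sample `hamt` from above:

```python
await hamt.set("metadata", {"version": 2, "tags": ["climate", "daily"]})
assert (await hamt.get("metadata")) == {"version": 2, "tags": ["climate", "daily"]}
```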

async def delete(self, key: str) -> None:
    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError

Delete a key-value mapping.
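
Deleting an absent key raises `KeyError` rather than silently doing nothing, so a tolerant cleanup sketch looks like:

```python
try:
    await hamt.delete("metadata")
except KeyError:
    pass  # already gone
```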

async def get(self, key: str) -> IPLDKind:
    async def get(self, key: str) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(pointer)
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

Get a value.

async def get_pointer(self, key: str) -> IPLDKind:
    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

Get a store ID that points to the value for this key.

This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore for example.
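
A sketch of that read-cache pattern; `pointer_cache` is a hypothetical dict keyed by HAMT key:

```python
pointer_cache = {}  # maps HAMT key -> last seen pointer (an immutable IPLDKind value)

ptr = await hamt.get_pointer("foo")
if pointer_cache.get("foo") != ptr:
    pointer_cache["foo"] = ptr   # pointer changed, so the value changed
data = await hamt.cas.load(ptr)  # fetch the stored bytes straight from the CAS
```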

async def keys(self) -> AsyncIterator[str]:
    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

AsyncIterator returning all keys in the HAMT.

If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

When the HAMT is in read only mode however, this can be run concurrently with get operations.
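
Iteration composes with `len` as expected:

```python
async for key in hamt.keys():
    print(key)

total = await hamt.len()  # counts by iterating keys() internally
```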

async def len(self) -> int:
    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count

Return the number of key-value mappings in this HAMT.

When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read only, then this can be run concurrently with other operations.

class ContentAddressedStore(abc.ABC):
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since python does not classify these as immutable.
    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(self, id: IPLDKind) -> bytes:
        """Retrieve data."""

Abstract class that represents a content addressed storage that the HAMT can use for keeping data.

Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.

A note on the IPLDKind return types

Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:

  1. No lists or dicts, since python does not classify these as immutable.
  2. No None values since this is used in HAMT's __init__ to indicate that an empty HAMT needs to be initialized.
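
To illustrate those rules, here is a hypothetical minimal subclass (not part of py-hamt) that stores blobs on disk and returns an immutable `str` digest as the ID:

```python
import hashlib
from pathlib import Path

from py_hamt import ContentAddressedStore

class DirectoryCAS(ContentAddressedStore):
    """Hypothetical sketch: one file per blob, hex sha256 digest as the ID."""

    def __init__(self, root: str) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> str:
        name = hashlib.sha256(data).hexdigest()  # str is immutable, so it is a valid ID
        (self.root / name).write_bytes(data)
        return name

    async def load(self, id) -> bytes:  # id is really IPLDKind; a str for this backend
        return (self.root / str(id)).read_bytes()
```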
CodecInput = typing.Literal['raw', 'dag-cbor']
@abstractmethod
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> IPLDKind:
    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

@abstractmethod
async def load(self, id: IPLDKind) -> bytes:
    @abstractmethod
    async def load(self, id: IPLDKind) -> bytes:
        """Retrieve data."""

Retrieve data.

class InMemoryCAS(py_hamt.ContentAddressedStore):
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    async def load(self, id: IPLDKind) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )

        try:
            return self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.
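
A typical test-setup sketch:

```python
import asyncio

from py_hamt import HAMT, InMemoryCAS

async def main() -> None:
    # No IPFS daemon needed; everything stays in a python dict
    hamt = await HAMT.build(cas=InMemoryCAS())
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"

asyncio.run(main())
```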

store: dict[bytes, bytes]
hash_alg: multiformats.multihash.Multihash
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> bytes:
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

async def load(self, id: IPLDKind) -> bytes:
    async def load(self, id: IPLDKind) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )

        try:
            return self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

ContentAddressedStore allows any IPLD scalar key. For the in-memory backend we require a bytes hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. This is why we use cast here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

class KuboCAS(py_hamt.ContentAddressedStore):
 76class KuboCAS(ContentAddressedStore):
 77    """
 78    Connects to an **IPFS Kubo** daemon.
 79
 80    The IDs in save and load are IPLD CIDs.
 81
 82    * **save()**  → RPC  (`/api/v0/add`)
 83    * **load()**  → HTTP gateway  (`/ipfs/{cid}`)
 84
 85    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
 86
 87    ### Authentication / custom headers
 88    You have two options:
 89
 90    1. **Bring your own `httpx.AsyncClient`**
 91       Pass it via `client=...` — any default headers or auth
 92       configured on that client are reused for **every** request.
 93    2. **Let `KuboCAS` build the client** but pass
 94       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
 95       internally–created `AsyncClient`.
 96
 97    ```python
 98    import httpx
 99    from py_hamt import KuboCAS
100
101    # Option 1: user-supplied client
102    client = httpx.AsyncClient(
103        headers={"Authorization": "Bearer <token>"},
104        auth=("user", "pass"),
105    )
106    cas = KuboCAS(client=client)
107
108    # Option 2: let KuboCAS create the client
109    cas = KuboCAS(
110        headers={"X-My-Header": "yes"},
111        auth=("user", "pass"),
112    )
113    ```
114
115    ### Parameters
116    - **hasher** (str): multihash name (defaults to *blake3*).
117    - **client** (`httpx.AsyncClient | None`): reuse an existing
118      client; if *None* KuboCAS will create one lazily.
119    - **headers** (dict[str, str] | None): default headers for the
120      internally-created client.
121    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
122      for the internally-created client.
123    - **rpc_base_url / gateway_base_url** (str | None): override daemon
124      endpoints (defaults match the local daemon ports).
125    - **chunker** (str): chunking algorithm specification for Kubo's `add`
126      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
127      `"rabin-<min>-<avg>-<max>"`.
128
129    ...
130    """
131
132    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
133    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"
134
135    DAG_PB_MARKER: int = 0x70
136    """@private"""
137
138    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
139    def __init__(
140        self,
141        hasher: str = "blake3",
142        client: httpx.AsyncClient | None = None,
143        rpc_base_url: str | None = None,
144        gateway_base_url: str | None = None,
145        concurrency: int = 32,
146        *,
147        headers: dict[str, str] | None = None,
148        auth: Tuple[str, str] | None = None,
149        chunker: str = "size-1048576",
150    ):
151        """
152        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
153
154        ### `httpx.AsyncClient` Management
155        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
156        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
157
158        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
159        ```python
160        async with httpx.AsyncClient() as client, KuboCAS(
161            rpc_base_url=rpc_base_url,
162            gateway_base_url=gateway_base_url,
163            client=client,
164        ) as kubo_cas:
165            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
166            zhs = ZarrHAMTStore(hamt)
167            # Use the KuboCAS instance as needed
168            # ...
169        ```
170        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
171        ``` python
172        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
173        # Use the KuboCAS instance as needed
174        # ...
175        await cas.aclose()  # Ensure resources are cleaned up
176        ```
177
178        ### Authenticated RPC/Gateway Access
179        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
180        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
181        If you do not need authentication, you can leave these parameters as `None`.
182
183        ### RPC and HTTP Gateway Base URLs
184        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
185        """
186
187        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
188        if re.fullmatch(chunker_pattern, chunker) is None:
189            raise ValueError("Invalid chunker specification")
190        self.chunker: str = chunker
191
192        self.hasher: str = hasher
193        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
194
195        if rpc_base_url is None:
196            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
197        if gateway_base_url is None:
198            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
199
200        if "/ipfs/" in gateway_base_url:
201            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
202
203        # Standard gateway URL construction with proper path handling
204        if gateway_base_url.endswith("/"):
205            gateway_base_url = f"{gateway_base_url}ipfs/"
206        else:
207            gateway_base_url = f"{gateway_base_url}/ipfs/"
208
209        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
210        """@private"""
211        self.gateway_base_url: str = gateway_base_url
212        """@private"""
213
214        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
215
216        if client is not None:
217            # user supplied → bind it to *their* current loop
218            self._client_per_loop[asyncio.get_running_loop()] = client
219            self._owns_client: bool = False
220        else:
221            self._owns_client = True  # we'll create clients lazily
222
223        # store for later use by _loop_client()
224        self._default_headers = headers
225        self._default_auth = auth
226
227        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
228        self._closed: bool = False
229
230    # --------------------------------------------------------------------- #
231    # helper: get or create the client bound to the current running loop    #
232    # --------------------------------------------------------------------- #
233    def _loop_client(self) -> httpx.AsyncClient:
234        """Get or create a client for the current event loop."""
235        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
236        try:
237            return self._client_per_loop[loop]
238        except KeyError:
239            # Create a new client
240            client = httpx.AsyncClient(
241                timeout=60.0,
242                headers=self._default_headers,
243                auth=self._default_auth,
244                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
245                # Uncomment when they finally support robust HTTP/2 GOAWAY responses
246                # http2=True,
247            )
248            self._client_per_loop[loop] = client
249            return client
250
251    # --------------------------------------------------------------------- #
252    # graceful shutdown: close **all** clients we own                       #
253    # --------------------------------------------------------------------- #
254    async def aclose(self) -> None:
255        """Close all internally-created clients."""
256        if not self._owns_client:
257            # User supplied the client; they are responsible for closing it.
258            return
259
260        for client in list(self._client_per_loop.values()):
261            if not client.is_closed:
262                try:
263                    await client.aclose()
264                except Exception:
265                    # Best-effort cleanup; ignore errors during shutdown
266                    pass
267
268        self._client_per_loop.clear()
269        self._closed = True
270
271    # At this point, _client_per_loop should be empty or only contain
272    # clients from loops we haven't seen (which shouldn't happen in practice)
273    async def __aenter__(self) -> "KuboCAS":
274        return self
275
276    async def __aexit__(self, *exc: Any) -> None:
277        await self.aclose()
278
279    def __del__(self) -> None:
280        """Best-effort close for internally-created clients."""
281        if not self._owns_client or self._closed:
282            return
283
284        # Attempt proper cleanup if possible
285        try:
286            loop = asyncio.get_running_loop()
287        except RuntimeError:
288            loop = None
289
290        try:
291            if loop is None or not loop.is_running():
292                asyncio.run(self.aclose())
293            else:
294                loop.create_task(self.aclose())
295        except Exception:
296            # Suppress all errors during interpreter shutdown or loop teardown
297            pass
298
299    # --------------------------------------------------------------------- #
300    # save() – now uses the per-loop client                                 #
301    # --------------------------------------------------------------------- #
302    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
303        async with self._sem:  # throttle RPC
304            # Create multipart form data
305            files = {"file": data}
306
307            # Send the POST request
308            client = self._loop_client()
309            response = await client.post(self.rpc_url, files=files)
310            response.raise_for_status()
311            cid_str: str = response.json()["Hash"]
312
313        cid: CID = CID.decode(cid_str)
314        if cid.codec.code != self.DAG_PB_MARKER:
315            cid = cid.set(codec=codec)
316        return cid
317
318    async def load(self, id: IPLDKind) -> bytes:
319        """@private"""
320        cid = cast(CID, id)  # CID is definitely in the IPLDKind type
321        url: str = f"{self.gateway_base_url + str(cid)}"
322
323        async with self._sem:  # throttle gateway
324            client = self._loop_client()
325            response = await client.get(url)
326            response.raise_for_status()
327            return response.content

Connects to an IPFS Kubo daemon.

The IDs in save and load are IPLD CIDs.

  • save() → RPC (/api/v0/add)
  • load() → HTTP gateway (/ipfs/{cid})

save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
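As a minimal sketch of this split (assuming a local kubo daemon on the default ports; the gateway URL below is just the local default), the write phase needs the RPC endpoint, while the read phase constructs a gateway-only KuboCAS and never touches RPC:

```python
import asyncio

from py_hamt import HAMT, KuboCAS

async def main() -> None:
    # Write phase: save() goes through the RPC endpoint (/api/v0/add).
    async with KuboCAS() as rw_cas:
        hamt = await HAMT.build(cas=rw_cas)
        await hamt.set("foo", "bar")
        await hamt.make_read_only()
        root = hamt.root_node_id

    # Read phase: load() only hits the HTTP gateway (/ipfs/{cid}),
    # so a gateway-only KuboCAS is enough for read-only HAMTs.
    async with KuboCAS(gateway_base_url="http://127.0.0.1:8080") as ro_cas:
        hamt_ro = await HAMT.build(cas=ro_cas, root_node_id=root, read_only=True)
        assert (await hamt_ro.get("foo")) == "bar"

asyncio.run(main())
```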

Authentication / custom headers

You have two options:

  1. Bring your own httpx.AsyncClient: pass it via client=...; any default headers or auth configured on that client are reused for every request.
  2. Let KuboCAS build the client, but pass headers= and/or auth= kwargs; they are forwarded to the internally-created AsyncClient.
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)

Parameters

  • hasher (str): multihash name (defaults to blake3).
  • client (httpx.AsyncClient | None): reuse an existing client; if None KuboCAS will create one lazily.
  • headers (dict[str, str] | None): default headers for the internally-created client.
  • auth (tuple[str, str] | None): authentication tuple (username, password) for the internally-created client.
  • rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
  • concurrency (int): maximum number of in-flight requests; an internal semaphore throttles save and load calls (defaults to 32).
  • chunker (str): chunking algorithm specification for Kubo's add RPC. Accepted formats are "size-<positive int>", "rabin", or "rabin-<min>-<avg>-<max>" (see the sketch below).
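For instance, the constructor validates the chunker spec against the pattern shown in the source; a small sketch of accepted and rejected values (the daemon must also recognize the spec):

```python
from py_hamt import KuboCAS

KuboCAS(chunker="size-262144")               # fixed-size 256 KiB chunks
KuboCAS(chunker="rabin")                     # content-defined chunking, default parameters
KuboCAS(chunker="rabin-16384-65536-131072")  # explicit min-avg-max sizes

try:
    KuboCAS(chunker="size-0")                # rejected: size must be a positive integer
except ValueError as e:
    print(e)                                 # Invalid chunker specification
```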

...

KuboCAS(hasher: str = 'blake3', client: httpx.AsyncClient | None = None, rpc_base_url: str | None = None, gateway_base_url: str | None = None, concurrency: int = 32, *, headers: dict[str, str] | None = None, auth: Tuple[str, str] | None = None, chunker: str = 'size-1048576')
139    def __init__(
140        self,
141        hasher: str = "blake3",
142        client: httpx.AsyncClient | None = None,
143        rpc_base_url: str | None = None,
144        gateway_base_url: str | None = None,
145        concurrency: int = 32,
146        *,
147        headers: dict[str, str] | None = None,
148        auth: Tuple[str, str] | None = None,
149        chunker: str = "size-1048576",
150    ):
151        """
152        If None is passed for the rpc or gateway base url, the default for a local kubo daemon is used.
153
154        ### `httpx.AsyncClient` Management
155        If `client` is not provided, it will be initialized automatically. It is then the user's responsibility to close it at an appropriate time with `await cas.aclose()`,
156        since a class instance cannot know when it is no longer in use unless explicitly told.
157
158        If you are using the `KuboCAS` instance in an `async with` block, the client is closed automatically when the block exits, which is the usage we suggest:
159        ```python
160        async with httpx.AsyncClient() as client, KuboCAS(
161            rpc_base_url=rpc_base_url,
162            gateway_base_url=gateway_base_url,
163            client=client,
164        ) as kubo_cas:
165            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
166            zhs = ZarrHAMTStore(hamt)
167            # Use the KuboCAS instance as needed
168            # ...
169        ```
170        If you do not use the `async with` syntax, call `await cas.aclose()` when you are done with the instance to ensure that all resources are cleaned up.
171        ```python
172        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
173        # Use the KuboCAS instance as needed
174        # ...
175        await cas.aclose()  # Ensure resources are cleaned up
176        ```
177
178        ### Authenticated RPC/Gateway Access
179        If you are connecting to an authenticated kubo instance, configure whatever headers and auth credentials you need on your own `httpx.AsyncClient` and pass that in.
180        Alternatively, pass the `headers` and `auth` parameters to the constructor; they will be used to create a new `httpx.AsyncClient` if one is not provided.
181        If you do not need authentication, you can leave these parameters as `None`.
182
183        ### RPC and HTTP Gateway Base URLs
184        These are the first part of the request URL; the defaults match the endpoints a local kubo daemon launches with.
185        """
186
187        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
188        if re.fullmatch(chunker_pattern, chunker) is None:
189            raise ValueError("Invalid chunker specification")
190        self.chunker: str = chunker
191
192        self.hasher: str = hasher
193        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
194
195        if rpc_base_url is None:
196            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
197        if gateway_base_url is None:
198            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
199
200        if "/ipfs/" in gateway_base_url:
201            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
202
203        # Standard gateway URL construction with proper path handling
204        if gateway_base_url.endswith("/"):
205            gateway_base_url = f"{gateway_base_url}ipfs/"
206        else:
207            gateway_base_url = f"{gateway_base_url}/ipfs/"
208
209        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
210        """@private"""
211        self.gateway_base_url: str = gateway_base_url
212        """@private"""
213
214        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
215
216        if client is not None:
217            # user supplied → bind it to *their* current loop
218            self._client_per_loop[asyncio.get_running_loop()] = client
219            self._owns_client: bool = False
220        else:
221            self._owns_client = True  # we'll create clients lazily
222
223        # store for later use by _loop_client()
224        self._default_headers = headers
225        self._default_auth = auth
226
227        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
228        self._closed: bool = False

If None is passed for the rpc or gateway base url, the default for a local kubo daemon is used.

httpx.AsyncClient Management

If client is not provided, it will be initialized automatically. It is then the user's responsibility to close it at an appropriate time with await cas.aclose(), since a class instance cannot know when it is no longer in use unless explicitly told.

If you are using the KuboCAS instance in an async with block, the client is closed automatically when the block exits, which is the usage we suggest:

async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...

If you do not use the async with syntax, call await cas.aclose() when you are done with the instance to ensure that all resources are cleaned up.

cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up

Authenticated RPC/Gateway Access

If you are connecting to an authenticated kubo instance, configure whatever headers and auth credentials you need on your own httpx.AsyncClient and pass that in. Alternatively, pass the headers and auth parameters to the constructor; they will be used to create a new httpx.AsyncClient if one is not provided. If you do not need authentication, you can leave these parameters as None.

RPC and HTTP Gateway Base URLs

These are the first part of the request URL; the defaults match the endpoints a local kubo daemon launches with.
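The constructor also normalizes the gateway base url, so a trailing slash or an /ipfs/ suffix is harmless. A small sketch, using a hypothetical gateway host:

```python
from py_hamt import KuboCAS

# All three spellings normalize to the same gateway prefix.
for url in (
    "https://gw.example.com",        # hypothetical host
    "https://gw.example.com/",
    "https://gw.example.com/ipfs/",
):
    cas = KuboCAS(gateway_base_url=url)
    # gateway_base_url is an internal attribute, printed here only
    # to illustrate the normalization.
    print(cas.gateway_base_url)      # https://gw.example.com/ipfs/
```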

KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = 'http://127.0.0.1:8080'
KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = 'http://127.0.0.1:5001'
chunker: str
hasher: str

The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.

async def aclose(self) -> None:
254    async def aclose(self) -> None:
255        """Close all internally-created clients."""
256        if not self._owns_client:
257            # User supplied the client; they are responsible for closing it.
258            return
259
260        for client in list(self._client_per_loop.values()):
261            if not client.is_closed:
262                try:
263                    await client.aclose()
264                except Exception:
265                    # Best-effort cleanup; ignore errors during shutdown
266                    pass
267
268        self._client_per_loop.clear()
269        self._closed = True

Close all internally-created clients.

async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> multiformats.cid.CID:
302    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
303        async with self._sem:  # throttle RPC
304            # Create multipart form data
305            files = {"file": data}
306
307            # Send the POST request
308            client = self._loop_client()
309            response = await client.post(self.rpc_url, files=files)
310            response.raise_for_status()
311            cid_str: str = response.json()["Hash"]
312
313        cid: CID = CID.decode(cid_str)
314        if cid.codec.code != self.DAG_PB_MARKER:
315            cid = cid.set(codec=codec)
316        return cid

Save data to the storage mechanism and return an ID for the data in the IPLDKind type.

Pass codec="dag-cbor" when the data should be marked as linked data in the IPLD data model; use "raw" for plain value bytes.
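A minimal round-trip sketch (assuming a local kubo daemon on the default ports):

```python
import asyncio

from py_hamt import KuboCAS

async def main() -> None:
    async with KuboCAS() as cas:
        # Plain value bytes are stored with the "raw" codec.
        cid = await cas.save(b"hello", codec="raw")
        print(cid)
        # load() fetches the same bytes back through the HTTP gateway.
        assert (await cas.load(cid)) == b"hello"

asyncio.run(main())
```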

class ZarrHAMTStore(zarr.abc.store.Store):
 12class ZarrHAMTStore(zarr.abc.store.Store):
 13    """
 14    Write and read Zarr v3s with a HAMT.
 15
 16    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
 17    py-hamt mapping.
 18
 19    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
 20    the value is the raw byte payload produced by Zarr.  No additional
 21    framing, compression, or encryption is applied by this class. For an example of
 22    zarr encryption where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23    For a fully encrypted zarr store, where metadata is not readable, please see
 24    :class:`SimpleEncryptedZarrHAMTStore`, though we do not recommend using it.
 25
 26    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
 27    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
 28
 29    #### Sample Code
 30    ```python
 31    # --- Write ---
 32    ds: xarray.Dataset = # ...
 33    cas: ContentAddressedStore = # ...
 34    # make sure values_are_bytes is True and read_only is False to enable writes
 35    hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
 36    zhs  = ZarrHAMTStore(hamt, read_only=False)
 37    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
 38    await hamt.make_read_only() # flush + freeze
 39    root_node_id = hamt.root_node_id
 40    print(root_node_id)
 41
 42    # --- Read ---
 43    hamt_ro = await HAMT.build(
 44        cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 45    )
 46    zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
 47    ds_ro = xarray.open_zarr(store=zhs_ro)
 48
 49
 50    print(ds_ro)
 51    xarray.testing.assert_identical(ds, ds_ro)
 52    ```
 53    """
 54
 55    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
 56        """
 57        ### `hamt` and `read_only`
 58        You need to make sure the following two things are true:
 59
 60        1. The HAMT is in the same read only mode that you are passing into the Zarr store, i.e. `hamt.read_only == read_only`. This is required because the store cannot switch the HAMT's mode itself: making a HAMT read only requires async operations, and `__init__` cannot be async.
 61        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
 62
 63        ##### A note about the zarr chunk separator, "/" vs "."
 64        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
 65
 66        #### Metadata Read Cache
 67        `ZarrHAMTStore` has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more often, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are far smaller than the memory the zarr data itself requires.
 68        """
 69        super().__init__(read_only=read_only)
 70
 71        assert hamt.read_only == read_only
 72        assert hamt.values_are_bytes
 73        self.hamt: HAMT = hamt
 74        """
 75        The internal HAMT.
 76        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
 77        """
 78
 79        self.metadata_read_cache: dict[str, bytes] = {}
 80        """@private"""
 81
 82    @property
 83    def read_only(self) -> bool:
 84        """@private"""
 85        return self.hamt.read_only
 86
 87    def __eq__(self, other: object) -> bool:
 88        """@private"""
 89        if not isinstance(other, ZarrHAMTStore):
 90            return False
 91        return self.hamt.root_node_id == other.hamt.root_node_id
 92
 93    async def get(
 94        self,
 95        key: str,
 96        prototype: zarr.core.buffer.BufferPrototype,
 97        byte_range: zarr.abc.store.ByteRequest | None = None,
 98    ) -> zarr.core.buffer.Buffer | None:
 99        """@private"""
100        try:
101            val: bytes
102            # do len check to avoid indexing into overly short strings; 3.12 does not throw errors but we don't know if other versions will
103            is_metadata: bool = (
104                len(key) >= 9 and key[-9:] == "zarr.json"
105            )  # if path ends with zarr.json
106
107            if is_metadata and key in self.metadata_read_cache:
108                val = self.metadata_read_cache[key]
109            else:
110                val = cast(
111                    bytes, await self.hamt.get(key)
112                )  # We know values received will always be bytes since we only store bytes in the HAMT
113                if is_metadata:
114                    self.metadata_read_cache[key] = val
115
116            return prototype.buffer.from_bytes(val)
117        except KeyError:
118            # Sometimes zarr queries keys that don't exist anymore; just return nothing in those cases
119            return None
120
121    async def get_partial_values(
122        self,
123        prototype: zarr.core.buffer.BufferPrototype,
124        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
125    ) -> list[zarr.core.buffer.Buffer | None]:
126        """@private"""
127        raise NotImplementedError
128
129    async def exists(self, key: str) -> bool:
130        """@private"""
131        try:
132            await self.hamt.get(key)
133            return True
134        except KeyError:
135            return False
136
137    @property
138    def supports_writes(self) -> bool:
139        """@private"""
140        return not self.hamt.read_only
141
142    @property
143    def supports_partial_writes(self) -> bool:
144        """@private"""
145        return False
146
147    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
148        """@private"""
149        if key in self.metadata_read_cache:
150            self.metadata_read_cache[key] = value.to_bytes()
151        await self.hamt.set(key, value.to_bytes())
152
153    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
154        """@private"""
155        if not (await self.exists(key)):
156            await self.set(key, value)
157
158    async def set_partial_values(
159        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
160    ) -> None:
161        """@private"""
162        raise NotImplementedError
163
164    @property
165    def supports_deletes(self) -> bool:
166        """@private"""
167        return not self.hamt.read_only
168
169    async def delete(self, key: str) -> None:
170        """@private"""
171        try:
172            await self.hamt.delete(key)
173            # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo
174            # if key in self.metadata_read_cache:
175            #     del self.metadata_read_cache[key]
176        # It's fine if the key was not in the HAMT
177        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues
178        except KeyError:
179            return
180
181    @property
182    def supports_listing(self) -> bool:
183        """@private"""
184        return True
185
186    async def list(self) -> AsyncIterator[str]:
187        """@private"""
188        async for key in self.hamt.keys():
189            yield key
190
191    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
192        """@private"""
193        async for key in self.hamt.keys():
194            if key.startswith(prefix):
195                yield key
196
197    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
198        """
199        @private
200        List *immediate* children that live directly under **prefix**.
201
202        This is similar to :py:meth:`list_prefix` but collapses everything
203        below the first ``"/"`` after *prefix*.  Each child name is yielded
204        **exactly once** in the order of first appearance while scanning the
205        HAMT keys.
206
207        Parameters
208        ----------
209        prefix : str
210            Logical directory path.  *Must* end with ``"/"`` for the result to
211            make sense (e.g. ``"a/b/"``).
212
213        Yields
214        ------
215        str
216            The name of each direct child (file or sub-directory) of *prefix*.
217
218        Examples
219        --------
220        With keys ::
221
222            a/b/c/d
223            a/b/c/e
224            a/b/f
225            a/b/g/h/i
226
227        ``await list_dir("a/b/")`` produces ::
228
229            c
230            f
231            g
232
233        Notes
234        -----
235        • Internally uses a :class:`set` to deduplicate names; memory grows
236            with the number of *unique* children, not the total number of keys.
237        • Order is **not** sorted; it reflects the first encounter while
238            iterating over :py:meth:`HAMT.keys`.
239        """
240        seen_names: set[str] = set()
241        async for key in self.hamt.keys():
242            if key.startswith(prefix):
243                suffix: str = key[len(prefix) :]
244                first_slash: int = suffix.find("/")
245                if first_slash == -1:
246                    if suffix not in seen_names:
247                        seen_names.add(suffix)
248                        yield suffix
249                else:
250                    name: str = suffix[0:first_slash]
251                    if name not in seen_names:
252                        seen_names.add(name)
253                        yield name

Write and read Zarr v3s with a HAMT.

Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.

Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class. For an example of zarr encryption where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb For a fully encrypted zarr store, where metadata is not readable, please see SimpleEncryptedZarrHAMTStore, though we do not recommend using it.

A note about using the same ZarrHAMTStore for writing and then reading again

If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.

Sample Code

# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
# make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
zhs  = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

# --- Read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)


print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
ZarrHAMTStore(hamt: HAMT, read_only: bool = False)
55    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
56        """
57        ### `hamt` and `read_only`
58        You need to make sure the following two things are true:
59
 60        1. The HAMT is in the same read only mode that you are passing into the Zarr store, i.e. `hamt.read_only == read_only`. This is required because the store cannot switch the HAMT's mode itself: making a HAMT read only requires async operations, and `__init__` cannot be async.
61        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
62
63        ##### A note about the zarr chunk separator, "/" vs "."
64        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
65
66        #### Metadata Read Cache
 67        `ZarrHAMTStore` has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more often, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are far smaller than the memory the zarr data itself requires.
68        """
69        super().__init__(read_only=read_only)
70
71        assert hamt.read_only == read_only
72        assert hamt.values_are_bytes
73        self.hamt: HAMT = hamt
74        """
75        The internal HAMT.
76        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
77        """
78
79        self.metadata_read_cache: dict[str, bytes] = {}
80        """@private"""

hamt and read_only

You need to make sure the following two things are true:

  1. The HAMT is in the same read only mode that you are passing into the Zarr store, i.e. hamt.read_only == read_only. This is required because the store cannot switch the HAMT's mode itself: making a HAMT read only requires async operations, and __init__ cannot be async.
  2. The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."

While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

Metadata Read Cache

ZarrHAMTStore has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more often, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are far smaller than the memory the zarr data itself requires.

hamt: HAMT

The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.

class SimpleEncryptedZarrHAMTStore(py_hamt.ZarrHAMTStore):
 13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
 14    """
 15    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.
 16
 17    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
 18    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
 19    and data chunks. This provides strong privacy but means the Zarr store is
 20    completely opaque and unusable without the correct encryption key and header.
 21
 22    Note: For standard zarr encryption and decryption, where metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23
 24    #### Encryption Details
 25    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
 26    - Requires a 32-byte encryption key and a header.
 27    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.
 28
 29    #### Important Considerations
 30    - Since metadata is encrypted, standard Zarr tools cannot inspect the
 31      dataset without prior decryption using this store class.
 32    - There is no support for partial encryption or excluding variables.
 33    - Decrypted "zarr.json" metadata is cached in memory after first read, as in `ZarrHAMTStore`.
 34
 35    #### Sample Code
 36    ```python
 37    import xarray
 38    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
 39    from Crypto.Random import get_random_bytes
 40    import numpy as np
 41
 42    # Setup
 43    ds = xarray.Dataset(
 44        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
 45        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
 46    )
 47    cas = KuboCAS() # Example ContentAddressedStore
 48    encryption_key = get_random_bytes(32)
 49    header = b"fully-encrypted-zarr"
 50
 51    # --- Write ---
 52    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
 53    ezhs_write = SimpleEncryptedZarrHAMTStore(
 54        hamt_write, False, encryption_key, header
 55    )
 56    print("Writing fully encrypted Zarr...")
 57    ds.to_zarr(store=ezhs_write, mode="w")
 58    await hamt_write.make_read_only()
 59    root_node_id = hamt_write.root_node_id
 60    print(f"Wrote Zarr with root: {root_node_id}")
 61
 62    # --- Read ---
 63    hamt_read = await HAMT.build(
 64            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
 65        )
 66    ezhs_read = SimpleEncryptedZarrHAMTStore(
 67        hamt_read, True, encryption_key, header
 68    )
 69    print("\nReading fully encrypted Zarr...")
 70    ds_read = xarray.open_zarr(store=ezhs_read)
 71    print("Read back dataset:")
 72    print(ds_read)
 73    xarray.testing.assert_identical(ds, ds_read)
 74    print("Read successful and data verified.")
 75
 76    # --- Read with wrong key (demonstrates failure) ---
 77    wrong_key = get_random_bytes(32)
 78    hamt_bad = await HAMT.build(
 79        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 80    )
 81    ezhs_bad = SimpleEncryptedZarrHAMTStore(
 82        hamt_bad, True, wrong_key, header
 83    )
 84    print("\nAttempting to read with wrong key...")
 85    try:
 86        ds_bad = xarray.open_zarr(store=ezhs_bad)
 87        print(ds_bad)
 88    except Exception as e:
 89        print(f"Failed to read as expected: {type(e).__name__} - {e}")
 90    ```
 91    """
 92
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}
113
114    def _encrypt(self, val: bytes) -> bytes:
115        """Encrypts data using ChaCha20-Poly1305."""
116        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
117        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
118        cipher.update(self.header)
119        ciphertext, tag = cipher.encrypt_and_digest(val)
120        return nonce + tag + ciphertext
121
122    def _decrypt(self, val: bytes) -> bytes:
123        """Decrypts data using ChaCha20-Poly1305."""
124        try:
125            # Extract nonce (24), tag (16), and ciphertext
126            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
127            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
128            cipher.update(self.header)
129            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
130            return plaintext
131        except Exception as e:
132            # Catching a broad exception as various issues (key, tag, length) can occur.
133            raise ValueError(
134                "Decryption failed. Check key, header, or data integrity."
135            ) from e
136
137    def __eq__(self, other: object) -> bool:
138        """@private"""
139        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
140            return False
141        return (
142            self.hamt.root_node_id == other.hamt.root_node_id
143            and self.encryption_key == other.encryption_key
144            and self.header == other.header
145        )
146
147    async def get(
148        self,
149        key: str,
150        prototype: zarr.core.buffer.BufferPrototype,
151        byte_range: zarr.abc.store.ByteRequest | None = None,
152    ) -> zarr.core.buffer.Buffer | None:
153        """@private"""
154        try:
155            decrypted_val: bytes
156            is_metadata: bool = (
157                len(key) >= 9 and key[-9:] == "zarr.json"
158            )  # if path ends with zarr.json
159
160            if is_metadata and key in self.metadata_read_cache:
161                decrypted_val = self.metadata_read_cache[key]
162            else:
163                raw_val = cast(
164                    bytes, await self.hamt.get(key)
165                )  # We know values received will always be bytes since we only store bytes in the HAMT
166                decrypted_val = self._decrypt(raw_val)
167                if is_metadata:
168                    self.metadata_read_cache[key] = decrypted_val
169            return prototype.buffer.from_bytes(decrypted_val)
170        except KeyError:
171            return None
172
173    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
174        """@private"""
175        if self.read_only:
176            raise Exception("Cannot write to a read-only store.")
177
178        raw_bytes = value.to_bytes()
179        if key in self.metadata_read_cache:
180            self.metadata_read_cache[key] = raw_bytes
181        # Encrypt it
182        encrypted_bytes = self._encrypt(raw_bytes)
183        await self.hamt.set(key, encrypted_bytes)

Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.

This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.

Note: For standard zarr encryption and decryption, where metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag (see the sketch below).
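To make the stored-value layout concrete, here is a standalone sketch that mirrors what `_encrypt` and `_decrypt` do, using pycryptodome directly:

```python
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Random import get_random_bytes

key = get_random_bytes(32)            # 32-byte encryption key
header = b"fully-encrypted-zarr"      # associated data; bound in but never stored

# Encrypt: the stored value is [24-byte nonce][16-byte tag][ciphertext]
nonce = get_random_bytes(24)          # a 24-byte nonce selects the XChaCha20 variant
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(header)                 # bind the header as associated data
ciphertext, tag = cipher.encrypt_and_digest(b"zarr chunk bytes")
stored = nonce + tag + ciphertext

# Decrypt: split the same fields back out and verify the tag
n, t, c = stored[:24], stored[24:40], stored[40:]
check = ChaCha20_Poly1305.new(key=key, nonce=n)
check.update(header)
assert check.decrypt_and_verify(c, t) == b"zarr chunk bytes"
```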

#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
  dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- Decrypted "zarr.json" metadata is cached in memory after first read, as in ZarrHAMTStore.

#### Sample Code


    import xarray
    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS() # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
        )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("
Reading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("
Attempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
SimpleEncryptedZarrHAMTStore(hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes)
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}

Initializes the SimpleEncryptedZarrHAMTStore.

Args:
    hamt: The HAMT instance for storage. Must have values_are_bytes=True. Its read_only status must match the read_only argument.
    read_only: If True, the store is in read-only mode.
    encryption_key: A 32-byte key for ChaCha20-Poly1305.
    header: A header (bytes) used as associated data in encryption.

encryption_key
header