py_hamt

 1from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
 2from .hamt import HAMT, blake3_hashfn
 3from .hamt_to_sharded_converter import convert_hamt_to_sharded, sharded_converter_cli
 4from .sharded_zarr_store import ShardedZarrStore
 5from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
 6from .zarr_hamt_store import ZarrHAMTStore
 7
 8__all__ = [
 9    "blake3_hashfn",
10    "HAMT",
11    "ContentAddressedStore",
12    "InMemoryCAS",
13    "KuboCAS",
14    "ZarrHAMTStore",
15    "SimpleEncryptedZarrHAMTStore",
16    "ShardedZarrStore",
17    "convert_hamt_to_sharded",
18    "sharded_converter_cli",
19]
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    Default hash function used by the `HAMT`: blake3 with a 32 byte digest.

    The multihash digest produced by `b3.digest` is unwrapped with `b3.unwrap`,
    so the caller receives the raw 32 hash bytes.
    """
    # blake3 already defaults to 32 bytes, but multihash requires the size to be passed explicitly
    return b3.unwrap(b3.digest(input_bytes, size=32))

This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.

class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

    Note that in read+write mode, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read only mode and then read `root_node_id`.

    These optimizations trade extra memory use for better performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each constructor input variable refers to, check the documentation with the matching names below.

        Raises ValueError if `max_bucket_size` is less than 1.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.

        Theoretically your hash size must only be a minimum of 1 byte, and there can be less than or the same number of hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and more time taken to retrieve and decode these nodes from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async __init__, so this method is provided separately.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # Flushing the write buffer can be a large operation; avoid running it concurrently with many other operations.
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            # Flush buffered writes to the CAS before switching to the read cache
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid, in order to read that at the end of your operations you must first use `make_read_only`.

        Calling this while the HAMT is already write enabled is a no-op.
        """
        async with self.lock:
            if not self.read_only:
                # Already write enabled: the current in-memory tree store may hold
                # buffered writes that have not been sent to the CAS yet, and
                # root_node_id is not valid outside that store. Replacing it would
                # silently drop those writes.
                return
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        Re-save every node on the stack, bottom-most first, so each node holds valid new IDs after insertion into the store.

        The stack is a list whose first element is the root and whose last element is the top of the stack (the deepest node).
        Each element is a tuple of (ID from the store, the Node in python).
        A node that ends up empty is unlinked from its parent and dropped, unless it is the root node.
        Modifies `node_stack` in place.
        """
        # Walk from the deepest node (index n-1) back up to the root (index 0)
        for stack_index in reversed(range(len(node_stack))):
            old_id, node = node_stack[stack_index]

            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the parent
                _, prev_node = node_stack[stack_index - 1]
                # Links are unique because empty nodes are removed after every single operation
                for link_index in prev_node.iter_link_indices():
                    if prev_node.get_link(link_index) == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Drop from the stack and keep reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # Non-empty node: save it and record its new ID
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # The parent must now point at the new ID
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # Encoding is skipped when values_are_bytes is set, regardless of the runtime type of val
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping. Raises an Exception if the HAMT is read only."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            # let users get an exception if they pass in a non bytes when they want to skip encoding
            data = cast(bytes, val)
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        """Insert `key -> val_ptr`, splitting full buckets into new subtrees as needed."""
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue of the KVs still to insert; grows when a full bucket has to be reflowed
            kvs_queue: list[tuple[str, IPLDKind]] = [(key, val_ptr)]

            while kvs_queue:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # Key already present, or room left: write it and move to the next queued KV
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # Bucket is full and lacks this key: queue its KVs for reinsertion
                    kvs_queue.extend(bucket.items())

                    # Replace the bucket with a link to a new, empty node to reflow into
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """
        Delete a key-value mapping.

        Raises an Exception if the HAMT is read only, and KeyError if `key` is not present.
        """
        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))
                    continue

                # The key can only live in this bucket; stop walking either way.
                # Anything that is neither a link nor a bucket also ends the walk
                # (instead of looping forever), matching _get_pointer.
                if isinstance(item, dict) and key in item:
                    del item[key]
                    created_change = True
                break

            if not created_change:
                # No change means the key does not exist within the HAMT
                raise KeyError(key)

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """
        Get a value.

        `offset`, `length` and `suffix` are passed through to `cas.load`. The loaded bytes are
        returned as-is when `values_are_bytes` is set, otherwise they are dag-cbor decoded.
        Raises KeyError if `key` is not present.
        """
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key. Raises KeyError if `key` is not present.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        if self.read_only:
            return await self._get_pointer(key)
        async with self.lock:
            return await self._get_pointer(key)

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        with instrumentation.span(
            "py_hamt.hamt.lookup", {"py_hamt.hamt.lookup.key": key}
        ):
            lookup_started_at = time.perf_counter()
            raw_hash: bytes = self.hash_fn(key.encode())

            current_id: IPLDKind = self.root_node_id
            current_depth: int = 0
            node_loads = 0
            node_cache_hits = 0

            # None is a valid IPLDKind value, so "found" is tracked with a separate flag
            result_ptr: IPLDKind = None
            found_a_result: bool = False
            try:
                while True:
                    if (
                        isinstance(self.node_store, ReadCacheStore)
                        and current_id in self.node_store.cache
                    ):
                        node_cache_hits += 1
                    node_loads += 1
                    top_node: Node = await self.node_store.load(current_id)
                    map_key: int = extract_bits(raw_hash, current_depth, 8)

                    item = top_node.data[map_key]
                    if isinstance(item, list):
                        # Follow the link one level deeper
                        current_id = item[0]
                        current_depth += 1
                        continue

                    if isinstance(item, dict) and key in item:
                        result_ptr = item[key]
                        found_a_result = True
                    # Either found in this bucket or nowhere left to go
                    break

                if not found_a_result:
                    raise KeyError(key)

                return result_ptr
            finally:
                instrumentation.record_hamt_lookup(
                    key,
                    depth=current_depth,
                    node_loads=node_loads,
                    node_cache_hits=node_cache_hits,
                    found=found_a_result,
                    seconds=time.perf_counter() - lookup_started_at,
                )

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        """Depth-first walk yielding (id, node) for every node reachable from the root."""
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while node_id_stack:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(node.iter_links())

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count

An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

Use this to store arbitrarily large key-value mappings in your CAS of choice.

For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

When in read-only mode, the HAMT is both async and thread safe.

A note about memory management, read+write and read-only modes

The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

Note that in read+write mode, the real root node id IS NOT VALID. You should call make_read_only() to convert to read only mode and then read root_node_id.

These optimizations trade extra memory use for better performance. Use cache_size to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use cache_vacate if you are over your memory limits.

IPFS HAMT Sample Code

kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id # our root node CID
print(cid)
HAMT( cas: ContentAddressedStore, hash_fn: Callable[[bytes], bytes] = <function blake3_hashfn>, root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]] = None, read_only: bool = False, max_bucket_size: int = 4, values_are_bytes: bool = False)
def __init__(
    self,
    cas: ContentAddressedStore,
    hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
    root_node_id: IPLDKind | None = None,
    read_only: bool = False,
    max_bucket_size: int = 4,
    values_are_bytes: bool = False,
):
    """
    Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each constructor input variable refers to, check the documentation with the matching names below.

    Raises ValueError if `max_bucket_size` is less than 1.
    """

    self.cas: ContentAddressedStore = cas
    """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

    self.hash_fn: Callable[[bytes], bytes] = hash_fn
    """
    This is the hash function used to place a key-value within the HAMT.

    To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

    It's important to note that the resulting hash must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.

    Theoretically your hash size must only be a minimum of 1 byte, and there can be less than or the same number of hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
    """

    self.lock: asyncio.Lock = asyncio.Lock()
    """@private"""

    self.values_are_bytes: bool = values_are_bytes
    """Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.

    This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
    """

    if max_bucket_size < 1:
        raise ValueError("Bucket size maximum must be a positive integer")
    self.max_bucket_size: int = max_bucket_size
    """
    This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

    This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and more time taken to retrieve and decode these nodes from your backing CAS.

    This must be a positive integer with a minimum of 1.
    """

    self.root_node_id: IPLDKind = root_node_id
    """
    This is type IPLDKind but the documentation generator pdoc mangles it a bit.

    Read from this only when in read mode to get something valid!
    """

    self.read_only: bool = read_only
    """Clients should NOT modify this.

    This is here for checking whether the HAMT is in read only or read/write mode.

    The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
    """
    self.node_store: NodeStore
    """@private"""
    if read_only:
        self.node_store = ReadCacheStore(self)
    else:
        self.node_store = InMemoryTreeStore(self)

Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each constructor input variable refers to, check the documentation with the matching names below.

The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.

hash_fn: Callable[[bytes], bytes]

This is the hash function used to place a key-value within the HAMT.

To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

It's important to note that the resulting hash must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.

Theoretically your hash size must only be a minimum of 1 byte, and there can be less than or the same number of hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.

values_are_bytes: bool

Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.

This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.

max_bucket_size: int

This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and more time taken to retrieve and decode these nodes from your backing CAS.

This must be a positive integer with a minimum of 1.

root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]

This is type IPLDKind but the documentation generator pdoc mangles it a bit.

Read from this only when in read mode to get something valid!

read_only: bool

Clients should NOT modify this.

This is here for checking whether the HAMT is in read only or read/write mode.

The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.

@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
    """
    Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.

    This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async __init__, so this method is provided separately.
    """
    hamt = cls(*args, **kwargs)
    if hamt.root_node_id is None:
        # Store an empty root node so the HAMT has a valid root ID to start from
        hamt.root_node_id = await hamt.node_store.save(None, Node())
    return hamt

Use this if you are initializing a completely empty HAMT, i.e. passing in None for the root_node_id. The method arguments are exactly the same as for __init__. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.

This separate async method is required because initializing an empty HAMT means saving some internal objects to the underlying CAS, which requires async operations. Python does not allow an async __init__, so this method is provided separately.

async def make_read_only(self) -> None:
413    async def make_read_only(self) -> None:
414        """
415        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
416
417        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
418        """
419        async with self.lock:
420            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
421            await inmemory_tree.vacate()
422
423            self.read_only = True
424            self.node_store = ReadCacheStore(self)

Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

In read+write mode, the HAMT has to serialize get calls behind a lock to guarantee strong consistency in case a set/delete operation happens in between.

async def enable_write(self) -> None:
426    async def enable_write(self) -> None:
427        """
428        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid, in order to read that at the end of your operations you must first use `make_read_only`.
429        """
430        async with self.lock:
431            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
432            self.read_only = False
433            self.node_store = InMemoryTreeStore(self)

Enable both reads and writes. This creates an internal structure for performance optimizations, which means the root node ID is no longer valid. To read the root node ID at the end of your operations, you must first call make_read_only.

async def cache_size(self) -> int:
435    async def cache_size(self) -> int:
436        """
437        Returns the memory used by some internal performance optimization tools in bytes.
438
439        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
440
441        Be warned that this may take a while to run for large HAMTs.
442
443        For more on memory management, see the `HAMT` class documentation.
444        """
445        if self.read_only:
446            return self.node_store.size()
447        async with self.lock:
448            return self.node_store.size()

Returns the memory used by some internal performance optimization tools in bytes.

This is async concurrency safe, so you can call it at any time. In read/write mode, however, it acquires the lock and so will wait for other writes to finish.

Be warned that this may take a while to run for large HAMTs.

For more on memory management, see the HAMT class documentation.

async def cache_vacate(self) -> None:
450    async def cache_vacate(self) -> None:
451        """
452        Vacate and completely empty out the internal read/write cache.
453
454        Be warned that this may take a while if there have been a lot of write operations.
455
456        For more on memory management, see the `HAMT` class documentation.
457        """
458        if self.read_only:
459            await self.node_store.vacate()
460        else:
461            async with self.lock:
462                await self.node_store.vacate()

Vacate and completely empty out the internal read/write cache.

Be warned that this may take a while if there have been a lot of write operations.

For more on memory management, see the HAMT class documentation.

async def set( self, key: str, val: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> None:
505    async def set(self, key: str, val: IPLDKind) -> None:
506        """Write a key-value mapping."""
507        if self.read_only:
508            raise Exception("Cannot call set on a read only HAMT")
509
510        data: bytes
511        if self.values_are_bytes:
512            data = cast(
513                bytes, val
514            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
515        else:
516            data = dag_cbor.encode(val)
517
518        pointer: IPLDKind = await self.cas.save(data, codec="raw")
519        await self._set_pointer(key, pointer)

Write a key-value mapping.

async def delete(self, key: str) -> None:
568    async def delete(self, key: str) -> None:
569        """Delete a key-value mapping."""
570
571        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
572        if self.read_only:
573            raise Exception("Cannot call delete on a read only HAMT")
574
575        async with self.lock:
576            raw_hash: bytes = self.hash_fn(key.encode())
577
578            node_stack: list[tuple[IPLDKind, Node]] = []
579            root_node: Node = await self.node_store.load(self.root_node_id)
580            node_stack.append((self.root_node_id, root_node))
581
582            created_change: bool = False
583            while True:
584                _, top_node = node_stack[-1]
585                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)
586
587                item = top_node.data[map_key]
588                if isinstance(item, dict):
589                    bucket = item
590                    if key in bucket:
591                        del bucket[key]
592                        created_change = True
593                    # Break out since whether or not the key is in the bucket, it should have been here so either now reserialize or raise a KeyError
594                    break
595                elif isinstance(item, list):
596                    link: IPLDKind = item[0]
597                    next_node: Node = await self.node_store.load(link)
598                    node_stack.append((link, next_node))
599
600            # Finally, reserialize and fix all links, deleting empty nodes as needed
601            if created_change:
602                await self._reserialize_and_link(node_stack)
603                self.root_node_id = node_stack[0][0]
604            else:
605                # If we didn't make a change, then this key must not exist within the HAMT
606                raise KeyError

Delete a key-value mapping.

async def get( self, key: str, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
608    async def get(
609        self,
610        key: str,
611        offset: Optional[int] = None,
612        length: Optional[int] = None,
613        suffix: Optional[int] = None,
614    ) -> IPLDKind:
615        """Get a value."""
616        pointer: IPLDKind = await self.get_pointer(key)
617        data: bytes = await self.cas.load(
618            pointer, offset=offset, length=length, suffix=suffix
619        )
620        if self.values_are_bytes:
621            return data
622        else:
623            return dag_cbor.decode(data)

Get a value.

async def get_pointer( self, key: str) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
625    async def get_pointer(self, key: str) -> IPLDKind:
626        """
627        Get a store ID that points to the value for this key.
628
629        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
630        """
631        # If read only, no need to acquire a lock
632        pointer: IPLDKind
633        if self.read_only:
634            pointer = await self._get_pointer(key)
635        else:
636            async with self.lock:
637                pointer = await self._get_pointer(key)
638
639        return pointer

Get a store ID that points to the value for this key.

This is useful for applications that want to implement a read cache. Because ContentAddressedStore restricts IDs to types that Python regards as immutable, pointers can easily be used as keys for read caches. ZarrHAMTStore uses this, for example.

async def keys(self) -> AsyncIterator[str]:
710    async def keys(self) -> AsyncIterator[str]:
711        """
712        AsyncIterator returning all keys in the HAMT.
713
714        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
715
716        When the HAMT is in read only mode however, this can be run concurrently with get operations.
717        """
718        if self.read_only:
719            async for k in self._keys_no_locking():
720                yield k
721        else:
722            async with self.lock:
723                async for k in self._keys_no_locking():
724                    yield k

AsyncIterator returning all keys in the HAMT.

If the HAMT is write enabled, this acquires an async lock to maintain strong consistency, preventing any other operations from proceeding until the iteration finishes.

When the HAMT is in read only mode however, this can be run concurrently with get operations.

async def len(self) -> int:
732    async def len(self) -> int:
733        """
734        Return the number of key value mappings in this HAMT.
735
736        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
737        """
738        count: int = 0
739        async for _ in self.keys():
740            count += 1
741
742        return count

Return the number of key value mappings in this HAMT.

When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.

class ContentAddressedStore(abc.ABC):
16class ContentAddressedStore(ABC):
17    """
18    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.
19
20    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.
21
22    #### A note on the IPLDKind return types
23    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
24    1. No lists or dicts, since python does not classify these as immutable.
25    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
26    """
27
28    CodecInput = Literal["raw", "dag-cbor"]
29
30    @abstractmethod
31    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
32        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
33
34        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
35        """
36
37    @abstractmethod
38    async def load(
39        self,
40        id: IPLDKind,
41        offset: Optional[int] = None,
42        length: Optional[int] = None,
43        suffix: Optional[int] = None,
44    ) -> bytes:
45        """Retrieve data."""
46
47    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
48        """Pin a CID in the storage."""
49        pass  # pragma: no cover
50
51    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
52        """Unpin a CID in the storage."""
53        pass  # pragma: no cover
54
55    async def pin_update(
56        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
57    ) -> None:
58        """Update the pinned CID in the storage."""
59        pass  # pragma: no cover
60
61    async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:
62        """List all pinned CIDs in the storage."""
63        return []  # pragma: no cover

Abstract class that represents a content addressed storage that the HAMT can use for keeping data.

Note that the return type of save and the id parameter of load are really of type IPLDKind, but the documentation generator pdoc unfortunately mangles it.

A note on the IPLDKind return types

save returns, and load accepts, an ID of type IPLDKind rather than just a CID. Any value that Python regards as immutable can be used, which allows for more flexibility, with two exceptions:

  1. No lists or dicts, since Python does not classify these as immutable.
  2. No None values, since None is used in HAMT's __init__ to indicate that an empty HAMT needs to be initialized.
CodecInput = typing.Literal['raw', 'dag-cbor']
@abstractmethod
async def save( self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
30    @abstractmethod
31    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
32        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
33
34        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
35        """

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec is set to "dag-cbor" when the data should be marked as linked data under the IPLD data model, and to "raw" otherwise.

@abstractmethod
async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:
37    @abstractmethod
38    async def load(
39        self,
40        id: IPLDKind,
41        offset: Optional[int] = None,
42        length: Optional[int] = None,
43        suffix: Optional[int] = None,
44    ) -> bytes:
45        """Retrieve data."""

Retrieve data.

async def pin_cid( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], target_rpc: str) -> None:
47    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
48        """Pin a CID in the storage."""
49        pass  # pragma: no cover

Pin a CID in the storage.

async def unpin_cid( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], target_rpc: str) -> None:
51    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
52        """Unpin a CID in the storage."""
53        pass  # pragma: no cover

Unpin a CID in the storage.

async def pin_update( self, old_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], new_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], target_rpc: str) -> None:
55    async def pin_update(
56        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
57    ) -> None:
58        """Update the pinned CID in the storage."""
59        pass  # pragma: no cover

Update the pinned CID in the storage.

async def pin_ls(self, target_rpc: str) -> list[typing.Dict[str, typing.Any]]:
61    async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:
62        """List all pinned CIDs in the storage."""
63        return []  # pragma: no cover

List all pinned CIDs in the storage.

class InMemoryCAS(py_hamt.ContentAddressedStore):
 66class InMemoryCAS(ContentAddressedStore):
 67    """Used mostly for faster testing, this is why this is not exported. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""
 68
 69    store: dict[bytes, bytes]
 70    hash_alg: Multihash
 71
 72    def __init__(self):
 73        self.store = dict()
 74        self.hash_alg = multihash.get("blake3")
 75
 76    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
 77        hash: bytes = self.hash_alg.digest(data, size=32)
 78        self.store[hash] = data
 79        return hash
 80
 81    async def load(
 82        self,
 83        id: IPLDKind,
 84        offset: Optional[int] = None,
 85        length: Optional[int] = None,
 86        suffix: Optional[int] = None,
 87    ) -> bytes:
 88        """
 89        `ContentAddressedStore` allows any IPLD scalar key.  For the in-memory
 90        backend we *require* a `bytes` hash; anything else is rejected at run
 91        time. In OO type-checking, a subclass may widen (make more general) argument types,
 92        but it must never narrow them; otherwise callers that expect the base-class contract can break.
 93        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
 94        This is why we use `cast` here, to tell mypy that we know what we are doing.
 95        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
 96        """
 97        key = cast(bytes, id)
 98        if not isinstance(key, (bytes, bytearray)):  # defensive guard
 99            raise TypeError(
100                f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
101            )
102        data: bytes
103        try:
104            data = self.store[key]
105        except KeyError as exc:
106            raise KeyError("Object not found in in-memory store") from exc
107
108        if offset is not None:
109            start = offset
110            if length is not None:
111                end = start + length
112                return data[start:end]
113            else:
114                return data[start:]
115        elif suffix is not None:  # If only length is given, assume start from 0
116            return data[-suffix:]
117        else:  # Full load
118            return data

Used mostly for faster testing. It hashes all inputs and uses the hash as the key into an in-memory Python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.

store: dict[bytes, bytes]
hash_alg: multiformats.multihash.Multihash
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> bytes:
76    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
77        hash: bytes = self.hash_alg.digest(data, size=32)
78        self.store[hash] = data
79        return hash

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec is set to "dag-cbor" when the data should be marked as linked data under the IPLD data model, and to "raw" otherwise.

async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:
 81    async def load(
 82        self,
 83        id: IPLDKind,
 84        offset: Optional[int] = None,
 85        length: Optional[int] = None,
 86        suffix: Optional[int] = None,
 87    ) -> bytes:
 88        """
 89        `ContentAddressedStore` allows any IPLD scalar key.  For the in-memory
 90        backend we *require* a `bytes` hash; anything else is rejected at run
 91        time. In OO type-checking, a subclass may widen (make more general) argument types,
 92        but it must never narrow them; otherwise callers that expect the base-class contract can break.
 93        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
 94        This is why we use `cast` here, to tell mypy that we know what we are doing.
 95        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
 96        """
 97        key = cast(bytes, id)
 98        if not isinstance(key, (bytes, bytearray)):  # defensive guard
 99            raise TypeError(
100                f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
101            )
102        data: bytes
103        try:
104            data = self.store[key]
105        except KeyError as exc:
106            raise KeyError("Object not found in in-memory store") from exc
107
108        if offset is not None:
109            start = offset
110            if length is not None:
111                end = start + length
112                return data[start:end]
113            else:
114                return data[start:]
115        elif suffix is not None:  # If only length is given, assume start from 0
116            return data[-suffix:]
117        else:  # Full load
118            return data

ContentAddressedStore allows any IPLD scalar key. For the in-memory backend we require a bytes (or bytearray) hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contravariance rule and emits the "violates Liskov substitution principle" error. This is why we use cast here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

class KuboCAS(py_hamt.ContentAddressedStore):
121class KuboCAS(ContentAddressedStore):
122    """
123    Connects to an **IPFS Kubo** daemon.
124
125    The IDs in save and load are IPLD CIDs.
126
127    * **save()**  → RPC  (`/api/v0/add`)
128    * **load()**  → HTTP gateway  (`/ipfs/{cid}`)
129
130    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
131
132    ### Authentication / custom headers
133    You have two options:
134
135    1. **Bring your own `httpx.AsyncClient`**
136       Pass it via `client=...` — any default headers or auth
137       configured on that client are reused for **every** request.
138    2. **Let `KuboCAS` build the client** but pass
139       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
140       internally–created `AsyncClient`.
141
142    ```python
143    import httpx
144    from py_hamt import KuboCAS
145
146    # Option 1: user-supplied client
147    client = httpx.AsyncClient(
148        headers={"Authorization": "Bearer <token>"},
149        auth=("user", "pass"),
150    )
151    cas = KuboCAS(client=client)
152
153    # Option 2: let KuboCAS create the client
154    cas = KuboCAS(
155        headers={"X-My-Header": "yes"},
156        auth=("user", "pass"),
157    )
158    ```
159
160    ### Parameters
161    - **hasher** (str): multihash name (defaults to *blake3*).
162    - **client** (`httpx.AsyncClient | None`): reuse an existing
163      client; if *None* KuboCAS will create one lazily.
164    - **headers** (dict[str, str] | None): default headers for the
165      internally-created client.
166    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
167      for the internally-created client.
168    - **rpc_base_url / gateway_base_url** (str | None): override daemon
169      endpoints (defaults match the local daemon ports).
170    - **chunker** (str): chunking algorithm specification for Kubo's `add`
171      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
172      `"rabin-<min>-<avg>-<max>"`.
173
174    ...
175    """
176
177    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
178    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"
179
180    DAG_PB_MARKER: int = 0x70
181    """@private"""
182
183    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
184    def __init__(
185        self,
186        hasher: str = "blake3",
187        client: httpx.AsyncClient | None = None,
188        rpc_base_url: str | None = None,
189        gateway_base_url: str | None = None,
190        concurrency: int = 32,
191        *,
192        headers: dict[str, str] | None = None,
193        auth: Tuple[str, str] | None = None,
194        pin_on_add: bool = False,
195        chunker: str = "size-1048576",
196        max_retries: int = 3,
197        initial_delay: float = 1.0,
198        backoff_factor: float = 2.0,
199    ):
200        """
201        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
202
203        ### `httpx.AsyncClient` Management
204        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
205        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
206
207        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
208        ```python
209        async with httpx.AsyncClient() as client, KuboCAS(
210            rpc_base_url=rpc_base_url,
211            gateway_base_url=gateway_base_url,
212            client=client,
213        ) as kubo_cas:
214            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
215            zhs = ZarrHAMTStore(hamt)
216            # Use the KuboCAS instance as needed
217            # ...
218        ```
219        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
220        ``` python
221        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
222        # Use the KuboCAS instance as needed
223        # ...
224        await cas.aclose()  # Ensure resources are cleaned up
225        ```
226
227        ### Authenticated RPC/Gateway Access
228        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
229        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
230        If you do not need authentication, you can leave these parameters as `None`.
231
232        ### RPC and HTTP Gateway Base URLs
233        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
234        """
235
236        self._owns_client: bool = False
237        self._closed: bool = True
238        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
239        self._default_headers = headers
240        self._default_auth = auth
241
242        # Now, perform validation that might raise an exception
243        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
244        if re.fullmatch(chunker_pattern, chunker) is None:
245            raise ValueError("Invalid chunker specification")
246        self.chunker: str = chunker
247
248        self.hasher: str = hasher
249        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
250
251        if rpc_base_url is None:
252            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
253        if gateway_base_url is None:
254            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
255
256        if "/ipfs/" in gateway_base_url:
257            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
258
259        # Standard gateway URL construction with proper path handling
260        if gateway_base_url.endswith("/"):
261            gateway_base_url = f"{gateway_base_url}ipfs/"
262        else:
263            gateway_base_url = f"{gateway_base_url}/ipfs/"
264
265        pin_string: str = "true" if pin_on_add else "false"
266        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
267        """@private"""
268        self.gateway_base_url: str = gateway_base_url
269        """@private"""
270
271        if client is not None:
272            # A client was supplied by the user. We don't own it.
273            self._owns_client = False
274            self._client_per_loop = {asyncio.get_running_loop(): client}
275        else:
276            # No client supplied. We will own any clients we create.
277            self._owns_client = True
278            self._client_per_loop = {}
279
280        # store for later use by _loop_client()
281        self._default_headers = headers
282        self._default_auth = auth
283
284        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
285        self._closed = False
286
287        # Validate retry parameters
288        if max_retries < 0:
289            raise ValueError("max_retries must be non-negative")
290        if initial_delay <= 0:
291            raise ValueError("initial_delay must be positive")
292        if backoff_factor < 1.0:
293            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
294
295        self.max_retries = max_retries
296        self.initial_delay = initial_delay
297        self.backoff_factor = backoff_factor
298
299    # --------------------------------------------------------------------- #
300    # helper: get or create the client bound to the current running loop    #
301    # --------------------------------------------------------------------- #
302    def _loop_client(self) -> httpx.AsyncClient:
303        """Get or create a client for the current event loop.
304
305        If the instance was previously closed but owns its clients, a fresh
306        client mapping is lazily created on demand.  Users that supplied their
307        own ``httpx.AsyncClient`` still receive an error when the instance has
308        been closed, as we cannot safely recreate their client.
309        """
310        if self._closed:
311            if not self._owns_client:
312                raise RuntimeError("KuboCAS is closed; create a new instance")
313            # We previously closed all internally-owned clients. Reset the
314            # state so that new clients can be created lazily.
315            self._closed = False
316            self._client_per_loop = {}
317
318        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
319        try:
320            return self._client_per_loop[loop]
321        except KeyError:
322            # Create a new client
323            client = httpx.AsyncClient(
324                timeout=60.0,
325                headers=self._default_headers,
326                auth=self._default_auth,
327                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
328                # Uncomment when they finally support Robust HTTP/2 GOAWAY responses
329                # http2=True,
330            )
331            self._client_per_loop[loop] = client
332            return client
333
334    # --------------------------------------------------------------------- #
335    # graceful shutdown: close **all** clients we own                       #
336    # --------------------------------------------------------------------- #
337    async def aclose(self) -> None:
338        """
339        Closes all internally-created clients. Must be called from an async context.
340        """
341        if self._owns_client is False:  # external client → caller closes
342            return
343
344        # This method is async, so we can reliably await the async close method.
345        # The complex sync/async logic is handled by __del__.
346        for client in list(self._client_per_loop.values()):
347            if not client.is_closed:
348                try:
349                    await client.aclose()
350                except Exception:
351                    pass  # best-effort cleanup
352
353        self._client_per_loop.clear()
354        self._closed = True
355
356    # At this point, _client_per_loop should be empty or only contain
357    # clients from loops we haven't seen (which shouldn't happen in practice)
358    async def __aenter__(self) -> "KuboCAS":
359        return self
360
361    async def __aexit__(self, *exc: Any) -> None:
362        await self.aclose()
363
364    def __del__(self) -> None:
365        """Best-effort close for internally-created clients."""
366        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
367            return
368
369        if not self._owns_client or self._closed:
370            return
371
372        # Attempt proper cleanup if possible
373        try:
374            loop = asyncio.get_running_loop()
375        except RuntimeError:
376            # No running loop - can't do async cleanup
377            # Just clear the client references synchronously
378            if hasattr(self, "_client_per_loop"):
379                # We can't await client.aclose() without a loop,
380                # so just clear the references
381                self._client_per_loop.clear()
382                self._closed = True
383            return
384
385        # If we get here, we have a running loop
386        try:
387            if loop.is_running():
388                # Schedule cleanup in the existing loop
389                loop.create_task(self.aclose())
390            else:
391                # Loop exists but not running - try asyncio.run
392                coro = self.aclose()  # Create the coroutine
393                try:
394                    asyncio.run(coro)
395                except Exception:
396                    # If asyncio.run fails, we need to close the coroutine properly
397                    coro.close()  # This prevents the RuntimeWarning
398                    raise  # Re-raise to hit the outer except block
399        except Exception:
400            # If all else fails, just clear references
401            if hasattr(self, "_client_per_loop"):
402                self._client_per_loop.clear()
403                self._closed = True
404
405    # --------------------------------------------------------------------- #
406    # save() – now uses the per-loop client                                 #
407    # --------------------------------------------------------------------- #
408    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
409        async with self._sem:
410            files = {"file": data}
411            client = self._loop_client()
412            retry_count = 0
413
414            while retry_count <= self.max_retries:
415                try:
416                    response = await client.post(
417                        self.rpc_url, files=files, timeout=60.0
418                    )
419                    response.raise_for_status()
420                    cid_str: str = response.json()["Hash"]
421                    cid: CID = CID.decode(cid_str)
422                    if cid.codec.code != self.DAG_PB_MARKER:
423                        cid = cid.set(codec=codec)
424                    return cid
425
426                except (httpx.TimeoutException, httpx.RequestError) as e:
427                    retry_count += 1
428                    if retry_count > self.max_retries:
429                        raise httpx.TimeoutException(
430                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
431                            request=e.request
432                            if isinstance(e, httpx.RequestError)
433                            else None,
434                        )
435
436                    # Calculate backoff delay
437                    delay = self.initial_delay * (
438                        self.backoff_factor ** (retry_count - 1)
439                    )
440                    # Add some jitter to prevent thundering herd
441                    jitter = delay * 0.1 * (random.random() - 0.5)
442                    await asyncio.sleep(delay + jitter)
443
444                except httpx.HTTPStatusError:
445                    # Re-raise non-timeout HTTP errors immediately
446                    raise
447        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
448
449    async def load(
450        self,
451        id: IPLDKind,
452        offset: Optional[int] = None,
453        length: Optional[int] = None,
454        suffix: Optional[int] = None,
455    ) -> bytes:
456        """Load data from a CID using the IPFS gateway with optional Range requests."""
457        cid = cast(CID, id)
458        url: str = f"{self.gateway_base_url + str(cid)}"
459        headers: Dict[str, str] = {}
460
461        # Construct the Range header if required
462        if offset is not None:
463            start = offset
464            if length is not None:
465                # Standard HTTP Range: bytes=start-end (inclusive)
466                end = start + length - 1
467                headers["Range"] = f"bytes={start}-{end}"
468            else:
469                # Standard HTTP Range: bytes=start- (from start to end)
470                headers["Range"] = f"bytes={start}-"
471        elif suffix is not None:
472            # Standard HTTP Range: bytes=-N (last N bytes)
473            headers["Range"] = f"bytes=-{suffix}"
474
475        trace_started_at = instrumentation.begin_cas_load(cid, bool(headers))
476        response_bytes = 0
477        final_status = "ok"
478        final_retry_count = 0
479        try:
480            async with self._sem:  # Throttle gateway
481                client = self._loop_client()
482                retry_count = 0
483
484                while retry_count <= self.max_retries:
485                    try:
486                        response = await client.get(
487                            url, headers=headers or None, timeout=60.0
488                        )
489                        response.raise_for_status()
490                        content = response.content
491                        response_bytes = len(content)
492                        final_retry_count = retry_count
493                        return content
494
495                    except (httpx.TimeoutException, httpx.RequestError) as e:
496                        retry_count += 1
497                        if retry_count > self.max_retries:
498                            final_status = "timeout"
499                            final_retry_count = retry_count
500                            raise httpx.TimeoutException(
501                                f"Failed to load data after {self.max_retries} retries: {str(e)}",
502                                request=e.request
503                                if isinstance(e, httpx.RequestError)
504                                else None,
505                            )
506
507                        # Calculate backoff delay with jitter
508                        delay = self.initial_delay * (
509                            self.backoff_factor ** (retry_count - 1)
510                        )
511                        jitter = delay * 0.1 * (random.random() - 0.5)
512                        await asyncio.sleep(delay + jitter)
513
514                    except httpx.HTTPStatusError:
515                        # Re-raise non-timeout HTTP errors immediately
516                        final_status = "http_error"
517                        final_retry_count = retry_count
518                        raise
519        finally:
520            instrumentation.end_cas_load(
521                trace_started_at,
522                byte_count=response_bytes,
523                retries=final_retry_count,
524                status=final_status,
525            )
526        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
527
528    # --------------------------------------------------------------------- #
529    # pin_cid() – method to pin a CID                                       #
530    # --------------------------------------------------------------------- #
531    async def pin_cid(
532        self,
533        cid: CID,
534        target_rpc: str = "http://127.0.0.1:5001",
535    ) -> None:
536        """
537        Pins a CID to the local Kubo node via the RPC API.
538
539        This call is recursive by default, pinning all linked objects.
540
541        Args:
542            cid (CID): The Content ID to pin.
543            target_rpc (str): The RPC URL of the Kubo node.
544        """
545        params = {"arg": str(cid), "recursive": "true"}
546        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"
547
548        async with self._sem:  # throttle RPC
549            client = self._loop_client()
550            response = await client.post(pin_add_url_base, params=params)
551            response.raise_for_status()
552
553    async def unpin_cid(
554        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
555    ) -> None:
556        """
557        Unpins a CID from the local Kubo node via the RPC API.
558
559        Args:
560            cid (CID): The Content ID to unpin.
561        """
562        params = {"arg": str(cid), "recursive": "true"}
563        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
564        async with self._sem:  # throttle RPC
565            client = self._loop_client()
566            response = await client.post(unpin_url_base, params=params)
567            response.raise_for_status()
568
569    async def pin_update(
570        self,
571        old_id: IPLDKind,
572        new_id: IPLDKind,
573        target_rpc: str = "http://127.0.0.1:5001",
574    ) -> None:
575        """
576        Updates the pinned CID in the storage.
577
578        Args:
579            old_id (IPLDKind): The old Content ID to replace.
580            new_id (IPLDKind): The new Content ID to pin.
581        """
582        params = {"arg": [str(old_id), str(new_id)]}
583        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
584        async with self._sem:  # throttle RPC
585            client = self._loop_client()
586            response = await client.post(pin_update_url_base, params=params)
587            response.raise_for_status()
588
589    async def pin_ls(
590        self, target_rpc: str = "http://127.0.0.1:5001"
591    ) -> list[Dict[str, Any]]:
592        """
593        Lists all pinned CIDs on the local Kubo node via the RPC API.
594
595        Args:
596            target_rpc (str): The RPC URL of the Kubo node.
597
598        Returns:
599            List[CID]: A list of pinned CIDs.
600        """
601        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
602        async with self._sem:  # throttle RPC
603            client = self._loop_client()
604            response = await client.post(pin_ls_url_base)
605            response.raise_for_status()
606            pins = response.json().get("Keys", [])
607            return pins

Connects to an IPFS Kubo daemon.

The IDs in save and load are IPLD CIDs.

  • save() → RPC (/api/v0/add)
  • load() → HTTP gateway (/ipfs/{cid})

save uses the RPC API and load uses the HTTP gateway, so read-only HAMTs only ever contact the gateway and need no RPC endpoint.

Authentication / custom headers

You have two options:

  1. Bring your own httpx.AsyncClient: pass it via client=... — any default headers or auth configured on that client are reused for every request.
  2. Let KuboCAS build the client, passing headers= and/or auth= kwargs; they are forwarded to the internally-created AsyncClient.
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)

Parameters

  • hasher (str): multihash name (defaults to blake3).
  • client (httpx.AsyncClient | None): reuse an existing client; if None KuboCAS will create one lazily.
  • headers (dict[str, str] | None): default headers for the internally-created client.
  • auth (tuple[str, str] | None): authentication tuple (username, password) for the internally-created client.
  • rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
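  • chunker (str): chunking algorithm specification for Kubo's add RPC. Accepted formats are "size-<positive int>", "rabin", or "rabin-<min>-<avg>-<max>".
  • concurrency (int): maximum number of requests (save, load, and pin operations) in flight at once, enforced with an asyncio.Semaphore. Defaults to 32.
  • pin_on_add (bool): whether Kubo should pin content as it is added (sent as the pin query parameter of the add RPC). Defaults to False.
  • max_retries (int): how many times save and load retry after a timeout or transport error before giving up; must be non-negative. Defaults to 3.
  • initial_delay (float): delay in seconds before the first retry; must be positive. Defaults to 1.0.
  • backoff_factor (float): multiplier applied to the delay after each retry (a ±5% random jitter is added); must be >= 1.0. Defaults to 2.0.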

KuboCAS( hasher: str = 'blake3', client: httpx.AsyncClient | None = None, rpc_base_url: str | None = None, gateway_base_url: str | None = None, concurrency: int = 32, *, headers: dict[str, str] | None = None, auth: Optional[Tuple[str, str]] = None, pin_on_add: bool = False, chunker: str = 'size-1048576', max_retries: int = 3, initial_delay: float = 1.0, backoff_factor: float = 2.0)
184    def __init__(
185        self,
186        hasher: str = "blake3",
187        client: httpx.AsyncClient | None = None,
188        rpc_base_url: str | None = None,
189        gateway_base_url: str | None = None,
190        concurrency: int = 32,
191        *,
192        headers: dict[str, str] | None = None,
193        auth: Tuple[str, str] | None = None,
194        pin_on_add: bool = False,
195        chunker: str = "size-1048576",
196        max_retries: int = 3,
197        initial_delay: float = 1.0,
198        backoff_factor: float = 2.0,
199    ):
200        """
201        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
202
203        ### `httpx.AsyncClient` Management
204        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
205        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
206
207        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
208        ```python
209        async with httpx.AsyncClient() as client, KuboCAS(
210            rpc_base_url=rpc_base_url,
211            gateway_base_url=gateway_base_url,
212            client=client,
213        ) as kubo_cas:
214            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
215            zhs = ZarrHAMTStore(hamt)
216            # Use the KuboCAS instance as needed
217            # ...
218        ```
219        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
220        ``` python
221        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
222        # Use the KuboCAS instance as needed
223        # ...
224        await cas.aclose()  # Ensure resources are cleaned up
225        ```
226
227        ### Authenticated RPC/Gateway Access
228        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
229        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
230        If you do not need authentication, you can leave these parameters as `None`.
231
232        ### RPC and HTTP Gateway Base URLs
233        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
234        """
235
236        self._owns_client: bool = False
237        self._closed: bool = True
238        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
239        self._default_headers = headers
240        self._default_auth = auth
241
242        # Now, perform validation that might raise an exception
243        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
244        if re.fullmatch(chunker_pattern, chunker) is None:
245            raise ValueError("Invalid chunker specification")
246        self.chunker: str = chunker
247
248        self.hasher: str = hasher
249        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
250
251        if rpc_base_url is None:
252            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
253        if gateway_base_url is None:
254            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
255
256        if "/ipfs/" in gateway_base_url:
257            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
258
259        # Standard gateway URL construction with proper path handling
260        if gateway_base_url.endswith("/"):
261            gateway_base_url = f"{gateway_base_url}ipfs/"
262        else:
263            gateway_base_url = f"{gateway_base_url}/ipfs/"
264
265        pin_string: str = "true" if pin_on_add else "false"
266        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
267        """@private"""
268        self.gateway_base_url: str = gateway_base_url
269        """@private"""
270
271        if client is not None:
272            # A client was supplied by the user. We don't own it.
273            self._owns_client = False
274            self._client_per_loop = {asyncio.get_running_loop(): client}
275        else:
276            # No client supplied. We will own any clients we create.
277            self._owns_client = True
278            self._client_per_loop = {}
279
280        # store for later use by _loop_client()
281        self._default_headers = headers
282        self._default_auth = auth
283
284        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
285        self._closed = False
286
287        # Validate retry parameters
288        if max_retries < 0:
289            raise ValueError("max_retries must be non-negative")
290        if initial_delay <= 0:
291            raise ValueError("initial_delay must be positive")
292        if backoff_factor < 1.0:
293            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
294
295        self.max_retries = max_retries
296        self.initial_delay = initial_delay
297        self.backoff_factor = backoff_factor

If rpc_base_url or gateway_base_url is None (the default), the corresponding default for a local Kubo daemon is used.

httpx.AsyncClient Management

If client is not provided, KuboCAS creates its own client lazily on first use, one per event loop. The user is responsible for closing these clients with await cas.aclose(), because the instance cannot know when it is no longer in use unless explicitly told.

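If you use the KuboCAS instance in an async with block (the recommended pattern, shown below), aclose() is called automatically when the block exits. Note that aclose() only closes clients KuboCAS created itself. A client you pass in, as in this example, is closed by its own async with.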

async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...

If you do not use async with, call await cas.aclose() when you are done with the instance so that all resources are cleaned up:

cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up

Authenticated RPC/Gateway Access

If you are connecting to an authenticated Kubo instance, you can configure whatever headers and auth credentials you need on your own httpx.AsyncClient and pass it in via client. A supplied client is bound to the event loop that is running when KuboCAS is constructed, so call the constructor from within that loop. Alternatively, pass headers and auth to the constructor; they are used for the clients KuboCAS creates when no client is provided. If you do not need authentication, leave both as None.

RPC and HTTP Gateway Base URLs

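These are the leading part of each URL, before /api/v0/... (RPC) or /ipfs/ (gateway) is appended. The defaults, http://127.0.0.1:5001 for RPC and http://127.0.0.1:8080 for the gateway, match a local Kubo daemon's default ports. The gateway URL is normalized: any path from /ipfs/ onward is dropped and /ipfs/ is appended. As a result, both http://host:8080 and http://host:8080/ipfs/<cid> are accepted.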

KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = 'http://127.0.0.1:8080'
KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = 'http://127.0.0.1:5001'
chunker: str
hasher: str

The multihash function name sent to Kubo when storing bytes. It is baked into the add RPC URL at construction time, so changing this attribute afterwards has no effect. The default, blake3, matches the HAMT's default hash function.

max_retries
initial_delay
backoff_factor
async def aclose(self) -> None:
337    async def aclose(self) -> None:
338        """
339        Closes all internally-created clients. Must be called from an async context.
340        """
341        if self._owns_client is False:  # external client → caller closes
342            return
343
344        # This method is async, so we can reliably await the async close method.
345        # The complex sync/async logic is handled by __del__.
346        for client in list(self._client_per_loop.values()):
347            if not client.is_closed:
348                try:
349                    await client.aclose()
350                except Exception:
351                    pass  # best-effort cleanup
352
353        self._client_per_loop.clear()
354        self._closed = True

Closes all internally-created clients. Must be called from an async context.

async def save( self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> multiformats.cid.CID:
408    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
409        async with self._sem:
410            files = {"file": data}
411            client = self._loop_client()
412            retry_count = 0
413
414            while retry_count <= self.max_retries:
415                try:
416                    response = await client.post(
417                        self.rpc_url, files=files, timeout=60.0
418                    )
419                    response.raise_for_status()
420                    cid_str: str = response.json()["Hash"]
421                    cid: CID = CID.decode(cid_str)
422                    if cid.codec.code != self.DAG_PB_MARKER:
423                        cid = cid.set(codec=codec)
424                    return cid
425
426                except (httpx.TimeoutException, httpx.RequestError) as e:
427                    retry_count += 1
428                    if retry_count > self.max_retries:
429                        raise httpx.TimeoutException(
430                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
431                            request=e.request
432                            if isinstance(e, httpx.RequestError)
433                            else None,
434                        )
435
436                    # Calculate backoff delay
437                    delay = self.initial_delay * (
438                        self.backoff_factor ** (retry_count - 1)
439                    )
440                    # Add some jitter to prevent thundering herd
441                    jitter = delay * 0.1 * (random.random() - 0.5)
442                    await asyncio.sleep(delay + jitter)
443
444                except httpx.HTTPStatusError:
445                    # Re-raise non-timeout HTTP errors immediately
446                    raise
447        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Save data to the storage mechanism and return an ID for the data, typed as IPLDKind.

Set codec to "dag-cbor" if this data should be marked as special linked data, à la the IPLD data model.

async def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:
449    async def load(
450        self,
451        id: IPLDKind,
452        offset: Optional[int] = None,
453        length: Optional[int] = None,
454        suffix: Optional[int] = None,
455    ) -> bytes:
456        """Load data from a CID using the IPFS gateway with optional Range requests."""
457        cid = cast(CID, id)
458        url: str = f"{self.gateway_base_url + str(cid)}"
459        headers: Dict[str, str] = {}
460
461        # Construct the Range header if required
462        if offset is not None:
463            start = offset
464            if length is not None:
465                # Standard HTTP Range: bytes=start-end (inclusive)
466                end = start + length - 1
467                headers["Range"] = f"bytes={start}-{end}"
468            else:
469                # Standard HTTP Range: bytes=start- (from start to end)
470                headers["Range"] = f"bytes={start}-"
471        elif suffix is not None:
472            # Standard HTTP Range: bytes=-N (last N bytes)
473            headers["Range"] = f"bytes=-{suffix}"
474
475        trace_started_at = instrumentation.begin_cas_load(cid, bool(headers))
476        response_bytes = 0
477        final_status = "ok"
478        final_retry_count = 0
479        try:
480            async with self._sem:  # Throttle gateway
481                client = self._loop_client()
482                retry_count = 0
483
484                while retry_count <= self.max_retries:
485                    try:
486                        response = await client.get(
487                            url, headers=headers or None, timeout=60.0
488                        )
489                        response.raise_for_status()
490                        content = response.content
491                        response_bytes = len(content)
492                        final_retry_count = retry_count
493                        return content
494
495                    except (httpx.TimeoutException, httpx.RequestError) as e:
496                        retry_count += 1
497                        if retry_count > self.max_retries:
498                            final_status = "timeout"
499                            final_retry_count = retry_count
500                            raise httpx.TimeoutException(
501                                f"Failed to load data after {self.max_retries} retries: {str(e)}",
502                                request=e.request
503                                if isinstance(e, httpx.RequestError)
504                                else None,
505                            )
506
507                        # Calculate backoff delay with jitter
508                        delay = self.initial_delay * (
509                            self.backoff_factor ** (retry_count - 1)
510                        )
511                        jitter = delay * 0.1 * (random.random() - 0.5)
512                        await asyncio.sleep(delay + jitter)
513
514                    except httpx.HTTPStatusError:
515                        # Re-raise non-timeout HTTP errors immediately
516                        final_status = "http_error"
517                        final_retry_count = retry_count
518                        raise
519        finally:
520            instrumentation.end_cas_load(
521                trace_started_at,
522                byte_count=response_bytes,
523                retries=final_retry_count,
524                status=final_status,
525            )
526        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Load data from a CID using the IPFS gateway with optional Range requests.

async def pin_cid( self, cid: multiformats.cid.CID, target_rpc: str = 'http://127.0.0.1:5001') -> None:
531    async def pin_cid(
532        self,
533        cid: CID,
534        target_rpc: str = "http://127.0.0.1:5001",
535    ) -> None:
536        """
537        Pins a CID to the local Kubo node via the RPC API.
538
539        This call is recursive by default, pinning all linked objects.
540
541        Args:
542            cid (CID): The Content ID to pin.
543            target_rpc (str): The RPC URL of the Kubo node.
544        """
545        params = {"arg": str(cid), "recursive": "true"}
546        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"
547
548        async with self._sem:  # throttle RPC
549            client = self._loop_client()
550            response = await client.post(pin_add_url_base, params=params)
551            response.raise_for_status()

Pins a CID to the local Kubo node via the RPC API.

This call is recursive by default, pinning all linked objects.

Args: cid (CID): The Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.

async def unpin_cid( self, cid: multiformats.cid.CID, target_rpc: str = 'http://127.0.0.1:5001') -> None:
553    async def unpin_cid(
554        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
555    ) -> None:
556        """
557        Unpins a CID from the local Kubo node via the RPC API.
558
559        Args:
560            cid (CID): The Content ID to unpin.
561        """
562        params = {"arg": str(cid), "recursive": "true"}
563        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
564        async with self._sem:  # throttle RPC
565            client = self._loop_client()
566            response = await client.post(unpin_url_base, params=params)
567            response.raise_for_status()

Unpins a CID from the local Kubo node via the RPC API.

Args: cid (CID): The Content ID to unpin. target_rpc (str): The RPC URL of the Kubo node.

async def pin_update( self, old_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], new_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]], target_rpc: str = 'http://127.0.0.1:5001') -> None:
569    async def pin_update(
570        self,
571        old_id: IPLDKind,
572        new_id: IPLDKind,
573        target_rpc: str = "http://127.0.0.1:5001",
574    ) -> None:
575        """
576        Updates the pinned CID in the storage.
577
578        Args:
579            old_id (IPLDKind): The old Content ID to replace.
580            new_id (IPLDKind): The new Content ID to pin.
581        """
582        params = {"arg": [str(old_id), str(new_id)]}
583        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
584        async with self._sem:  # throttle RPC
585            client = self._loop_client()
586            response = await client.post(pin_update_url_base, params=params)
587            response.raise_for_status()

Updates the pinned CID in the storage.

Args: old_id (IPLDKind): The old Content ID to replace. new_id (IPLDKind): The new Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.

async def pin_ls( self, target_rpc: str = 'http://127.0.0.1:5001') -> list[typing.Dict[str, typing.Any]]:
589    async def pin_ls(
590        self, target_rpc: str = "http://127.0.0.1:5001"
591    ) -> list[Dict[str, Any]]:
592        """
593        Lists all pinned CIDs on the local Kubo node via the RPC API.
594
595        Args:
596            target_rpc (str): The RPC URL of the Kubo node.
597
598        Returns:
599            List[CID]: A list of pinned CIDs.
600        """
601        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
602        async with self._sem:  # throttle RPC
603            client = self._loop_client()
604            response = await client.post(pin_ls_url_base)
605            response.raise_for_status()
606            pins = response.json().get("Keys", [])
607            return pins

Lists all pinned CIDs on the local Kubo node via the RPC API.

Args: target_rpc (str): The RPC URL of the Kubo node.

Returns: The "Keys" field of the Kubo pin/ls JSON response, or an empty list if that field is absent.

class ZarrHAMTStore(zarr.abc.store.Store):
 16class ZarrHAMTStore(zarr.abc.store.Store):
 17    """
 18    Write and read Zarr v3s with a HAMT.
 19
 20    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
 21    py-hamt mapping.
 22
 23    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
 24    the value is the raw byte payload produced by Zarr.  No additional
 25    framing, compression, or encryption is applied by this class. For a zarr encryption example
 26    see where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 27    For a fully encrypted zarr store, where metadata is not available, please see
 28    :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it.
 29
 30    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
 31    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
 32
 33    #### Sample Code
 34    ```python
 35    # --- Write ---
 36    ds: xarray.Dataset = # ...
 37    cas: ContentAddressedStore = # ...
 38    hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
 39    hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
 40    zhs  = ZarrHAMTStore(hamt, read_only=False)
 41    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
 42    await hamt.make_read_only() # flush + freeze
 43    root_node_id = hamt.root_node_id
 44    print(root_node_id)
 45
 46     # --- read ---
 47    hamt_ro = await HAMT.build(
 48        cas, root_node_id=root_cid, read_only=True, values_are_bytes=True
 49    )
 50    zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
 51    ds_ro = xarray.open_zarr(store=zhs_ro)
 52
 53
 54    print(ds_ro)
 55    xarray.testing.assert_identical(ds, ds_ro)
 56    ```
 57    """
 58
 59    _forced_read_only: bool | None = None  # sentinel for wrapper clones
 60
 61    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
 62        """
 63        ### `hamt` and `read_only`
 64        You need to make sure the following two things are true:
 65
 66        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
 67        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
 68
 69        ##### A note about the zarr chunk separator, "/" vs "."
 70        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
 71
 72        #### Metadata Read Cache
 73        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
 74        """
 75        super().__init__(read_only=read_only)
 76
 77        assert hamt.read_only == read_only
 78        assert hamt.values_are_bytes
 79        self.hamt: HAMT = hamt
 80        """
 81        The internal HAMT.
 82        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
 83        """
 84
 85        self.metadata_read_cache: dict[str, bytes] = {}
 86        """@private"""
 87
 88    def _map_byte_request(
 89        self, byte_range: Optional[zarr.abc.store.ByteRequest]
 90    ) -> tuple[Optional[int], Optional[int], Optional[int]]:
 91        """Helper to map Zarr ByteRequest to offset, length, suffix."""
 92        offset: Optional[int] = None
 93        length: Optional[int] = None
 94        suffix: Optional[int] = None
 95
 96        if byte_range:
 97            if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
 98                offset = byte_range.start
 99                length = byte_range.end - byte_range.start
100                if length is not None and length < 0:
101                    raise ValueError("End must be >= start for RangeByteRequest")
102            elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
103                offset = byte_range.offset
104            elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
105                suffix = byte_range.suffix
106            else:
107                raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}")
108
109        return offset, length, suffix
110
111    @property
112    def read_only(self) -> bool:  # type: ignore[override]
113        if self._forced_read_only is not None:  # instance attr overrides
114            return self._forced_read_only
115        return self.hamt.read_only
116
117    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
118        """
119        Return this store (if the flag already matches) or a *shallow*
120        clone that presents the requested read‑only status.
121
122        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
123        instance; no flushing, network traffic or async work is done.
124        """
125        # Fast path
126        if read_only == self.read_only:
127            return self  # Same mode, return same instance
128
129        # Create new instance with different read_only flag
130        # Creates a *bare* instance without running its __init__
131        clone = type(self).__new__(type(self))
132
133        # Copy attributes that matter
134        clone.hamt = self.hamt  # Share the HAMT
135        clone._forced_read_only = read_only
136        clone.metadata_read_cache = self.metadata_read_cache.copy()
137
138        # Re‑initialise the zarr base class so that Zarr sees the flag
139        zarr.abc.store.Store.__init__(clone, read_only=read_only)
140        return clone
141
142    def __eq__(self, other: object) -> bool:
143        """@private"""
144        if not isinstance(other, ZarrHAMTStore):
145            return False
146        return self.hamt.root_node_id == other.hamt.root_node_id
147
148    async def get(
149        self,
150        key: str,
151        prototype: zarr.core.buffer.BufferPrototype,
152        byte_range: zarr.abc.store.ByteRequest | None = None,
153    ) -> zarr.core.buffer.Buffer | None:
154        """@private"""
155        with instrumentation.span(
156            "py_hamt.hamt_store.get",
157            {
158                "py_hamt.zarr.key": key,
159                "py_hamt.zarr.byte_range": byte_range is not None,
160            },
161        ):
162            started_at = time.perf_counter()
163            hit = False
164            is_metadata = len(key) >= 9 and key[-9:] == "zarr.json"
165            try:
166                val: bytes
167                # do len check to avoid indexing into overly short strings, 3.12 does not throw errors but we dont know if other versions will
168                # if path ends with zarr.json
169
170                if (
171                    is_metadata
172                    and byte_range is None
173                    and key in self.metadata_read_cache
174                ):
175                    val = self.metadata_read_cache[key]
176                else:
177                    offset, length, suffix = self._map_byte_request(byte_range)
178                    val = cast(
179                        bytes,
180                        await self.hamt.get(
181                            key, offset=offset, length=length, suffix=suffix
182                        ),
183                    )  # We know values received will always be bytes since we only store bytes in the HAMT
184                    if is_metadata and byte_range is None:
185                        self.metadata_read_cache[key] = val
186
187                hit = True
188                return prototype.buffer.from_bytes(val)
189            except KeyError:
190                # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases
191                return None
192            except Exception as e:
193                print(f"Error getting key '{key}' with range {byte_range}: {e}")
194                raise
195            finally:
196                instrumentation.record_zarr_get(
197                    store="hamt_store",
198                    key=key,
199                    kind="metadata" if is_metadata else "chunk",
200                    hit=hit,
201                    seconds=time.perf_counter() - started_at,
202                    byte_range=byte_range is not None,
203                )
204
205    async def get_partial_values(
206        self,
207        prototype: zarr.core.buffer.BufferPrototype,
208        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
209    ) -> list[zarr.core.buffer.Buffer | None]:
210        """
211        Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
212        """
213        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
214        results = await asyncio.gather(
215            *tasks, return_exceptions=False
216        )  # Set return_exceptions=True for debugging
217        return results
218
219    async def exists(self, key: str) -> bool:
220        """@private"""
221        try:
222            await self.hamt.get(key)
223            return True
224        except KeyError:
225            return False
226
227    @property
228    def supports_writes(self) -> bool:
229        """@private"""
230        return not self.hamt.read_only
231
232    @property
233    def supports_partial_writes(self) -> bool:
234        """@private"""
235        return False
236
237    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
238        """@private"""
239        if self.read_only:
240            raise Exception("Cannot write to a read-only store.")
241
242        if key in self.metadata_read_cache:
243            self.metadata_read_cache[key] = value.to_bytes()
244        await self.hamt.set(key, value.to_bytes())
245
246    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
247        """@private"""
248        if not (await self.exists(key)):
249            await self.set(key, value)
250
251    async def set_partial_values(
252        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
253    ) -> None:
254        """@private"""
255        raise NotImplementedError
256
257    @property
258    def supports_deletes(self) -> bool:
259        """@private"""
260        return not self.hamt.read_only
261
262    async def delete(self, key: str) -> None:
263        """@private"""
264        if self.read_only:
265            raise Exception("Cannot write to a read-only store.")
266        try:
267            await self.hamt.delete(key)
268            # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo
269            # if key in self.metadata_read_cache:
270            #     del self.metadata_read_cache[key]
271        # It's fine if the key was not in the HAMT
272        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues
273        except KeyError:
274            return
275
276    @property
277    def supports_listing(self) -> bool:
278        """@private"""
279        return True
280
281    async def list(self) -> AsyncIterator[str]:
282        """@private"""
283        async for key in self.hamt.keys():
284            yield key
285
286    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
287        """@private"""
288        async for key in self.hamt.keys():
289            if key.startswith(prefix):
290                yield key
291
292    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
293        """
294        @private
295        List *immediate* children that live directly under **prefix**.
296
297        This is similar to :py:meth:`list_prefix` but collapses everything
298        below the first ``"/"`` after *prefix*.  Each child name is yielded
299        **exactly once** in the order of first appearance while scanning the
300        HAMT keys.
301
302        Parameters
303        ----------
304        prefix : str
305            Logical directory path.  *Must* end with ``"/"`` for the result to
306            make sense (e.g. ``"a/b/"``).
307
308        Yields
309        ------
310        str
311            The name of each direct child (file or sub-directory) of *prefix*.
312
313        Examples
314        --------
315        With keys ::
316
317            a/b/c/d
318            a/b/c/e
319            a/b/f
320            a/b/g/h/i
321
322        ``await list_dir("a/b/")`` produces ::
323
324            c
325            f
326            g
327
328        Notes
329        -----
330        • Internally uses a :class:`set` to deduplicate names; memory grows
331            with the number of *unique* children, not the total number of keys.
332        • Order is **not** sorted; it reflects the first encounter while
333            iterating over :py:meth:`HAMT.keys`.
334        """
335        seen_names: set[str] = set()
336        async for key in self.hamt.keys():
337            if key.startswith(prefix):
338                suffix: str = key[len(prefix) :]
339                first_slash: int = suffix.find("/")
340                if first_slash == -1:
341                    if suffix not in seen_names:
342                        seen_names.add(suffix)
343                        yield suffix
344                else:
345                    name: str = suffix[0:first_slash]
346                    if name not in seen_names:
347                        seen_names.add(name)
348                        yield name

Write and read Zarr v3s with a HAMT.

Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.

Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT), and the value is the raw byte payload produced by Zarr. This class applies no additional framing, compression, or encryption. For a Zarr encryption example in which metadata remains available, see https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb. For a fully encrypted Zarr store, in which metadata is not available, see SimpleEncryptedZarrHAMTStore, although we do not recommend using it.

A note about using the same ZarrHAMTStore for writing and then reading again

If you write a Zarr to a HAMT and then switch the HAMT to read-only mode, it is best to create a new ZarrHAMTStore with the matching read_only setting. To err on the safe side, this class does not modify its superclass's settings.

Sample Code

# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
zhs  = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

# --- read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)


print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
ZarrHAMTStore(hamt: HAMT, read_only: bool = False)
61    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
62        """
63        ### `hamt` and `read_only`
64        You need to make sure the following two things are true:
65
66        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
67        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
68
69        ##### A note about the zarr chunk separator, "/" vs "."
70        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
71
72        #### Metadata Read Cache
73        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
74        """
75        super().__init__(read_only=read_only)
76
77        assert hamt.read_only == read_only
78        assert hamt.values_are_bytes
79        self.hamt: HAMT = hamt
80        """
81        The internal HAMT.
82        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
83        """
84
85        self.metadata_read_cache: dict[str, bytes] = {}
86        """@private"""

hamt and read_only

You need to make sure the following two things are true:

  1. The HAMT is in the same read-only mode that you pass to the Zarr store, i.e. hamt.read_only == read_only. This is required because converting a HAMT to read-only mode needs async operations, and __init__ cannot be async.
  2. The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."

While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

Metadata Read Cache

ZarrHAMTStore has an internal read cache for metadata. In practice, metadata "zarr.json" files are requested far more often, and far more repeatedly, than any other keys, so caching them yields significant speed improvements. As for memory management, the cache does not need an eviction step in practice, because "zarr.json" files are much smaller than the Zarr data itself.

hamt: HAMT

The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.

read_only: bool
111    @property
112    def read_only(self) -> bool:  # type: ignore[override]
113        if self._forced_read_only is not None:  # instance attr overrides
114            return self._forced_read_only
115        return self.hamt.read_only

Is the store read-only?

def with_read_only(self, read_only: bool = False) -> ZarrHAMTStore:
117    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
118        """
119        Return this store (if the flag already matches) or a *shallow*
120        clone that presents the requested read‑only status.
121
122        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
123        instance; no flushing, network traffic or async work is done.
124        """
125        # Fast path
126        if read_only == self.read_only:
127            return self  # Same mode, return same instance
128
129        # Create new instance with different read_only flag
130        # Creates a *bare* instance without running its __init__
131        clone = type(self).__new__(type(self))
132
133        # Copy attributes that matter
134        clone.hamt = self.hamt  # Share the HAMT
135        clone._forced_read_only = read_only
136        clone.metadata_read_cache = self.metadata_read_cache.copy()
137
138        # Re‑initialise the zarr base class so that Zarr sees the flag
139        zarr.abc.store.Store.__init__(clone, read_only=read_only)
140        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same HAMT instance (py_hamt.hamt.HAMT); no flushing, network traffic, or async work is done.

async def get_partial_values( self, prototype: zarr.core.buffer.core.BufferPrototype, key_ranges: Iterable[tuple[str, zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None]]) -> list[zarr.core.buffer.core.Buffer | None]:
async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
) -> list[zarr.core.buffer.Buffer | None]:
    """
    Retrieves multiple keys or byte ranges concurrently using asyncio.gather.

    One ``self.get(key, prototype, byte_range)`` coroutine is created per
    ``(key, byte_range)`` pair. All of them are awaited together, so the
    result list lines up one-to-one with ``key_ranges``. The first exception
    raised by any lookup propagates to the caller; switch
    ``return_exceptions`` to ``True`` when debugging.
    """
    pending = (
        self.get(key, prototype, byte_range) for key, byte_range in key_ranges
    )
    return await asyncio.gather(*pending, return_exceptions=False)

Retrieves multiple keys or byte ranges concurrently using asyncio.gather.

class SimpleEncryptedZarrHAMTStore(py_hamt.ZarrHAMTStore):
 13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
 14    """
 15    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.
 16
 17    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
 18    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
 19    and data chunks. This provides strong privacy but means the Zarr store is
 20    completely opaque and unusable without the correct encryption key and header.
 21
 22    Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23
 24    #### Encryption Details
 25    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
 26    - Requires a 32-byte encryption key and a header.
 27    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.
 28
 29    #### Important Considerations
 30    - Since metadata is encrypted, standard Zarr tools cannot inspect the
 31      dataset without prior decryption using this store class.
 32    - There is no support for partial encryption or excluding variables.
 33    - There is no metadata caching.
 34
 35    #### Sample Code
 36    ```python
 37    import xarray
 38    from py_hamt import HAMT, KuboCAS # Assuming an KuboCAS or similar
 39    from Crypto.Random import get_random_bytes
 40    import numpy as np
 41
 42    # Setup
 43    ds = xarray.Dataset(
 44        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
 45        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
 46    )
 47    cas = KuboCAS() # Example ContentAddressedStore
 48    encryption_key = get_random_bytes(32)
 49    header = b"fully-encrypted-zarr"
 50
 51    # --- Write ---
 52    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
 53    ezhs_write = SimpleEncryptedZarrHAMTStore(
 54        hamt_write, False, encryption_key, header
 55    )
 56    print("Writing fully encrypted Zarr...")
 57    ds.to_zarr(store=ezhs_write, mode="w")
 58    await hamt_write.make_read_only()
 59    root_node_id = hamt_write.root_node_id
 60    print(f"Wrote Zarr with root: {root_node_id}")
 61
 62    # --- Read ---
 63    hamt_read = await HAMT.build(
 64            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
 65        )
 66    ezhs_read = SimpleEncryptedZarrHAMTStore(
 67        hamt_read, True, encryption_key, header
 68    )
 69    print("\nReading fully encrypted Zarr...")
 70    ds_read = xarray.open_zarr(store=ezhs_read)
 71    print("Read back dataset:")
 72    print(ds_read)
 73    xarray.testing.assert_identical(ds, ds_read)
 74    print("Read successful and data verified.")
 75
 76    # --- Read with wrong key (demonstrates failure) ---
 77    wrong_key = get_random_bytes(32)
 78    hamt_bad = await HAMT.build(
 79        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 80    )
 81    ezhs_bad = SimpleEncryptedZarrHAMTStore(
 82        hamt_bad, True, wrong_key, header
 83    )
 84    print("\nAttempting to read with wrong key...")
 85    try:
 86        ds_bad = xarray.open_zarr(store=ezhs_bad)
 87        print(ds_bad)
 88    except Exception as e:
 89        print(f"Failed to read as expected: {type(e).__name__} - {e}")
 90    ```
 91    """
 92
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}
113
114    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
115        if read_only == self.read_only:
116            return self
117
118        clone = type(self).__new__(type(self))
119        clone.hamt = self.hamt
120        clone.encryption_key = self.encryption_key
121        clone.header = self.header
122        clone.metadata_read_cache = self.metadata_read_cache
123        clone._forced_read_only = read_only  # safe; attribute is declared
124        zarr.abc.store.Store.__init__(clone, read_only=read_only)
125        return clone
126
127    def _encrypt(self, val: bytes) -> bytes:
128        """Encrypts data using ChaCha20-Poly1305."""
129        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
130        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
131        cipher.update(self.header)
132        ciphertext, tag = cipher.encrypt_and_digest(val)
133        return nonce + tag + ciphertext
134
135    def _decrypt(self, val: bytes) -> bytes:
136        """Decrypts data using ChaCha20-Poly1305."""
137        try:
138            # Extract nonce (24), tag (16), and ciphertext
139            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
140            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
141            cipher.update(self.header)
142            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
143            return plaintext
144        except Exception as e:
145            # Catching a broad exception as various issues (key, tag, length) can occur.
146            raise ValueError(
147                "Decryption failed. Check key, header, or data integrity."
148            ) from e
149
150    def __eq__(self, other: object) -> bool:
151        """@private"""
152        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
153            return False
154        return (
155            self.hamt.root_node_id == other.hamt.root_node_id
156            and self.encryption_key == other.encryption_key
157            and self.header == other.header
158        )
159
160    async def get(
161        self,
162        key: str,
163        prototype: zarr.core.buffer.BufferPrototype,
164        byte_range: zarr.abc.store.ByteRequest | None = None,
165    ) -> zarr.core.buffer.Buffer | None:
166        """@private"""
167        try:
168            decrypted_val: bytes
169            is_metadata: bool = (
170                len(key) >= 9 and key[-9:] == "zarr.json"
171            )  # if path ends with zarr.json
172
173            if is_metadata and key in self.metadata_read_cache:
174                decrypted_val = self.metadata_read_cache[key]
175            else:
176                raw_val = cast(
177                    bytes, await self.hamt.get(key)
178                )  # We know values received will always be bytes since we only store bytes in the HAMT
179                decrypted_val = self._decrypt(raw_val)
180                if is_metadata:
181                    self.metadata_read_cache[key] = decrypted_val
182            return prototype.buffer.from_bytes(decrypted_val)
183        except KeyError:
184            return None
185
186    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
187        """@private"""
188        if self.read_only:
189            raise Exception("Cannot write to a read-only store.")
190
191        raw_bytes = value.to_bytes()
192        if key in self.metadata_read_cache:
193            self.metadata_read_cache[key] = raw_bytes
194        # Encrypt it
195        encrypted_bytes = self._encrypt(raw_bytes)
196        await self.hamt.set(key, encrypted_bytes)

Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.

This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.

Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.

#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
  dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- Decrypted metadata (keys ending in `zarr.json`) is cached in memory on read; data chunks are not cached.

#### Sample Code


    import xarray
    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS() # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
        )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("\nReading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("\nAttempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
SimpleEncryptedZarrHAMTStore( hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes)
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}

Initializes the SimpleEncryptedZarrHAMTStore.

Args: hamt: The HAMT instance for storage. Must have values_are_bytes=True. Its read_only status must match the read_only argument. read_only: If True, the store is in read-only mode. encryption_key: A 32-byte key for ChaCha20-Poly1305. header: A header (bytes) used as associated data in encryption.

encryption_key
header
def with_read_only( self, read_only: bool = False) -> SimpleEncryptedZarrHAMTStore:
def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
    """
    Return ``self`` when it already reports ``read_only``, otherwise a
    shallow clone that does.

    The clone shares this store's HAMT, encryption key, header and metadata
    read cache (the very same dict object, not a copy).
    """
    if read_only != self.read_only:
        twin = type(self).__new__(type(self))  # bypass __init__
        twin.hamt = self.hamt
        twin.encryption_key = self.encryption_key
        twin.header = self.header
        twin.metadata_read_cache = self.metadata_read_cache
        twin._forced_read_only = read_only  # safe; attribute is declared
        zarr.abc.store.Store.__init__(twin, read_only=read_only)
        return twin
    return self

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same ~py_hamt.hamt.HAMT instance; no flushing, network traffic or async work is done.

class ShardedZarrStore(zarr.abc.store.Store):
145class ShardedZarrStore(zarr.abc.store.Store):
146    """
147    Implements the Zarr Store API using a sharded layout for chunk CIDs.
148
149    This store divides the flat index of chunk CIDs into multiple "shards".
150    Each shard is a DAG-CBOR array where each element is either a CID link
151    to a chunk or a null value if the chunk is empty. This structure allows
152    for efficient traversal by IPLD-aware systems.
153
154    The store's root object contains:
155    1.  A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
156    2.  A list of CIDs, where each CID points to a shard object.
157    3.  Sharding configuration details (e.g., chunks_per_shard).
158    """
159
160    def __init__(
161        self,
162        cas: ContentAddressedStore,
163        read_only: bool,
164        root_cid: Optional[str] = None,
165        *,
166        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
167    ):
168        """Use the async `open()` classmethod to instantiate this class."""
169        super().__init__(read_only=read_only)
170        self.cas = cas
171        self._root_cid = root_cid
172        self._root_obj: dict
173
174        self._resize_lock = asyncio.Lock()
175        # An event to signal when a resize is in-progress.
176        # It starts in the "set" state, allowing all operations to proceed.
177        self._resize_complete = asyncio.Event()
178        self._resize_complete.set()
179        self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
180
181        self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
182        self._pending_shard_loads: Dict[int, asyncio.Event] = {}
183        self._metadata_read_cache: Dict[str, bytes] = {}
184
185        self._array_shape: Tuple[int, ...]
186        self._chunk_shape: Tuple[int, ...]
187        self._chunks_per_dim: Tuple[int, ...]
188        self._chunks_per_shard: int
189        self._num_shards: int = 0
190        self._total_chunks: int = 0
191
192        self._dirty_root = False
193
194    def __update_geometry(self):
195        """Calculates derived geometric properties from the base shapes."""
196
197        if not all(cs > 0 for cs in self._chunk_shape):
198            raise ValueError("All chunk_shape dimensions must be positive.")
199        if not all(s >= 0 for s in self._array_shape):
200            raise ValueError("All array_shape dimensions must be non-negative.")
201
202        self._chunks_per_dim = tuple(
203            math.ceil(a / c) if c > 0 else 0
204            for a, c in zip(self._array_shape, self._chunk_shape)
205        )
206        self._total_chunks = math.prod(self._chunks_per_dim)
207
208        if not self._total_chunks == 0:
209            self._num_shards = (
210                self._total_chunks + self._chunks_per_shard - 1
211            ) // self._chunks_per_shard
212
213    @classmethod
214    async def open(
215        cls,
216        cas: ContentAddressedStore,
217        read_only: bool,
218        root_cid: Optional[str] = None,
219        *,
220        array_shape: Optional[Tuple[int, ...]] = None,
221        chunk_shape: Optional[Tuple[int, ...]] = None,
222        chunks_per_shard: Optional[int] = None,
223        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
224    ) -> "ShardedZarrStore":
225        """
226        Asynchronously opens an existing ShardedZarrStore or initializes a new one.
227        """
228        store = cls(
229            cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
230        )
231        if root_cid:
232            await store._load_root_from_cid()
233        elif not read_only:
234            if array_shape is None or chunk_shape is None:
235                raise ValueError(
236                    "array_shape and chunk_shape must be provided for a new store."
237                )
238
239            if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
240                raise ValueError("chunks_per_shard must be a positive integer.")
241
242            store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
243        else:
244            raise ValueError("root_cid must be provided for a read-only store.")
245        return store
246
247    def _initialize_new_root(
248        self,
249        array_shape: Tuple[int, ...],
250        chunk_shape: Tuple[int, ...],
251        chunks_per_shard: int,
252    ):
253        self._array_shape = array_shape
254        self._chunk_shape = chunk_shape
255        self._chunks_per_shard = chunks_per_shard
256
257        self.__update_geometry()
258
259        self._root_obj = {
260            "manifest_version": "sharded_zarr_v1",
261            "metadata": {},
262            "chunks": {
263                "array_shape": list(self._array_shape),
264                "chunk_shape": list(self._chunk_shape),
265                "sharding_config": {
266                    "chunks_per_shard": self._chunks_per_shard,
267                },
268                "shard_cids": [None] * self._num_shards,
269            },
270        }
271        self._dirty_root = True
272
273    async def _load_root_from_cid(self):
274        root_bytes = await self.cas.load(self._root_cid)
275        try:
276            self._root_obj = dag_cbor.decode(root_bytes)
277            if not isinstance(self._root_obj, dict) or "chunks" not in self._root_obj:
278                raise ValueError(
279                    "Root object is not a valid dictionary with 'chunks' key."
280                )
281            if not isinstance(self._root_obj["chunks"]["shard_cids"], list):
282                raise ValueError("shard_cids is not a list.")
283        except Exception as e:
284            raise ValueError(f"Failed to decode root object: {e}")
285
286        if self._root_obj.get("manifest_version") != "sharded_zarr_v1":
287            raise ValueError(
288                f"Incompatible manifest version: {self._root_obj.get('manifest_version')}. Expected 'sharded_zarr_v1'."
289            )
290
291        chunk_info = self._root_obj["chunks"]
292        self._array_shape = tuple(chunk_info["array_shape"])
293        self._chunk_shape = tuple(chunk_info["chunk_shape"])
294        self._chunks_per_shard = chunk_info["sharding_config"]["chunks_per_shard"]
295
296        self.__update_geometry()
297
298        if len(chunk_info["shard_cids"]) != self._num_shards:
299            raise ValueError(
300                f"Inconsistent number of shards. Expected {self._num_shards}, found {len(chunk_info['shard_cids'])}."
301            )
302
303    async def _fetch_and_cache_full_shard(
304        self,
305        shard_idx: int,
306        shard_cid: str,
307        max_retries: int = 3,
308        retry_delay: float = 1.0,
309    ) -> None:
310        """
311        Fetch a shard from CAS and cache it, with retry logic for transient errors.
312
313        Args:
314            shard_idx: The index of the shard to fetch.
315            shard_cid: The CID of the shard.
316            max_retries: Maximum number of retry attempts for transient errors.
317            retry_delay: Delay between retry attempts in seconds.
318        """
319        for attempt in range(max_retries):
320            try:
321                shard_data_bytes = await self.cas.load(shard_cid)
322                decoded_shard = dag_cbor.decode(shard_data_bytes)
323                if not isinstance(decoded_shard, list):
324                    raise TypeError(f"Shard {shard_idx} did not decode to a list.")
325                await self._shard_data_cache.put(shard_idx, decoded_shard)
326                # Always set the Event to unblock waiting coroutines
327                if shard_idx in self._pending_shard_loads:
328                    self._pending_shard_loads[shard_idx].set()
329                    del self._pending_shard_loads[shard_idx]
330                return  # Success
331            except (ConnectionError, TimeoutError) as e:
332                # Handle transient errors (e.g., network issues)
333                if attempt < max_retries - 1:
334                    await asyncio.sleep(
335                        retry_delay * (2**attempt)
336                    )  # Exponential backoff
337                    continue
338                else:
339                    raise RuntimeError(
340                        f"Failed to fetch shard {shard_idx} after {max_retries} attempts: {e}"
341                    )
342
343    def _parse_chunk_key(self, key: str) -> Optional[Tuple[int, ...]]:
344        # 1. Exclude .json files immediately (metadata)
345        if key.endswith(".json"):
346            return None
347        excluded_array_prefixes = {
348            "time",
349            "lat",
350            "lon",
351            "latitude",
352            "longitude",
353            "forecast_reference_time",
354            "step",
355        }
356
357        chunk_marker = "/c/"
358        marker_idx = key.rfind(chunk_marker)  # Use rfind for robustness
359        if marker_idx == -1:
360            # Key does not contain "/c/", so it's not a chunk data key
361            # in the expected format (e.g., could be .zattrs, .zgroup at various levels).
362            return None
363
364        # Extract the part of the key before "/c/", which might represent the array/group path
365        # e.g., "temp" from "temp/c/0/0/0"
366        # e.g., "group1/lat" from "group1/lat/c/0"
367        # e.g., "" if key is "c/0/0/0" (root array)
368        path_before_c = key[:marker_idx]
369
370        # Determine the actual array name (the last component of the path before "/c/")
371        actual_array_name = ""
372        if path_before_c:
373            actual_array_name = path_before_c.split("/")[-1]
374
375        # If the determined array name is in our exclusion list, return None.
376        if actual_array_name in excluded_array_prefixes:
377            return None
378
379        # The part after "/c/" contains the chunk coordinates
380        coord_part = key[marker_idx + len(chunk_marker) :]
381        parts = coord_part.split("/")
382
383        coords = tuple(map(int, parts))
384        # Validate coordinates against the chunk grid of the store's configured array
385        for i, c_coord in enumerate(coords):
386            if not (0 <= c_coord < self._chunks_per_dim[i]):
387                raise IndexError(
388                    f"Chunk coordinate {c_coord} at dimension {i} is out of bounds for dimension size {self._chunks_per_dim[i]}."
389                )
390        return coords
391
392    def _get_linear_chunk_index(self, chunk_coords: Tuple[int, ...]) -> int:
393        linear_index = 0
394        multiplier = 1
395        # Convert N-D chunk coordinates to a flat 1-D index (row-major order)
396        for i in reversed(range(len(self._chunks_per_dim))):
397            linear_index += chunk_coords[i] * multiplier
398            multiplier *= self._chunks_per_dim[i]
399        return linear_index
400
401    def _get_shard_info(self, linear_chunk_index: int) -> Tuple[int, int]:
402        shard_idx = linear_chunk_index // self._chunks_per_shard
403        index_in_shard = linear_chunk_index % self._chunks_per_shard
404        return shard_idx, index_in_shard
405
406    async def _load_or_initialize_shard_cache(
407        self, shard_idx: int
408    ) -> List[Optional[CID]]:
409        """
410        Load a shard into the cache or initialize an empty shard if it doesn't exist.
411
412        Args:
413            shard_idx: The index of the shard to load or initialize.
414
415        Returns:
416            List[Optional[CID]]: The shard data (list of CIDs or None).
417
418        Raises:
419            ValueError: If the shard index is out of bounds.
420            RuntimeError: If the shard cannot be loaded or initialized.
421        """
422        started_at = time.perf_counter()
423        cached_shard = await self._shard_data_cache.get(shard_idx)
424        if cached_shard is not None:
425            instrumentation.record_shard_load(
426                shard_idx=shard_idx,
427                cache_hit=True,
428                seconds=time.perf_counter() - started_at,
429                entries=len(cached_shard),
430            )
431            return cached_shard
432
433        if shard_idx in self._pending_shard_loads:
434            try:
435                # Wait for the pending load with a timeout (e.g., 60 seconds)
436                await asyncio.wait_for(
437                    self._pending_shard_loads[shard_idx].wait(), timeout=60.0
438                )
439                cached_shard = await self._shard_data_cache.get(shard_idx)
440                if cached_shard is not None:
441                    return cached_shard
442                else:
443                    raise RuntimeError(
444                        f"Shard {shard_idx} not found in cache after pending load completed."
445                    )
446            except asyncio.TimeoutError:
447                # Clean up the pending load to allow retry
448                if shard_idx in self._pending_shard_loads:
449                    self._pending_shard_loads[shard_idx].set()
450                    del self._pending_shard_loads[shard_idx]
451                raise RuntimeError(f"Timeout waiting for shard {shard_idx} to load.")
452
453        if not (0 <= shard_idx < self._num_shards):
454            raise ValueError(f"Shard index {shard_idx} out of bounds.")
455
456        shard_cid_obj = self._root_obj["chunks"]["shard_cids"][shard_idx]
457        if shard_cid_obj:
458            self._pending_shard_loads[shard_idx] = asyncio.Event()
459            shard_cid_str = str(shard_cid_obj)
460            await self._fetch_and_cache_full_shard(shard_idx, shard_cid_str)
461        else:
462            empty_shard = [None] * self._chunks_per_shard
463            await self._shard_data_cache.put(shard_idx, empty_shard)
464
465        result = await self._shard_data_cache.get(shard_idx)
466        if result is None:
467            raise RuntimeError(f"Failed to load or initialize shard {shard_idx}")
468        instrumentation.record_shard_load(
469            shard_idx=shard_idx,
470            cache_hit=False,
471            seconds=time.perf_counter() - started_at,
472            entries=len(result),
473        )
474        return result  # type: ignore[return-value]
475
476    async def set_partial_values(
477        self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
478    ) -> None:
479        raise NotImplementedError(
480            "Partial writes are not supported by ShardedZarrStore."
481        )
482
483    async def get_partial_values(
484        self,
485        prototype: zarr.core.buffer.BufferPrototype,
486        key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
487    ) -> List[Optional[zarr.core.buffer.Buffer]]:
488        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
489        results = await asyncio.gather(*tasks)
490        return results
491
492    def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
493        """
494        Return this store (if the flag already matches) or a *shallow*
495        clone that presents the requested read‑only status.
496
497        The clone **shares** the same CAS instance and internal state;
498        no flushing, network traffic or async work is done.
499        """
500        # Fast path
501        if read_only == self.read_only:
502            return self  # Same mode, return same instance
503
504        # Create new instance with different read_only flag
505        # Creates a *bare* instance without running its __init__
506        clone = type(self).__new__(type(self))
507
508        # Copy all attributes from the current instance
509        clone.cas = self.cas
510        clone._root_cid = self._root_cid
511        clone._root_obj = self._root_obj
512
513        clone._resize_lock = self._resize_lock
514        clone._resize_complete = self._resize_complete
515        clone._shard_locks = self._shard_locks
516
517        clone._shard_data_cache = self._shard_data_cache
518        clone._pending_shard_loads = self._pending_shard_loads
519        clone._metadata_read_cache = self._metadata_read_cache
520
521        clone._array_shape = self._array_shape
522        clone._chunk_shape = self._chunk_shape
523        clone._chunks_per_dim = self._chunks_per_dim
524        clone._chunks_per_shard = self._chunks_per_shard
525        clone._num_shards = self._num_shards
526        clone._total_chunks = self._total_chunks
527
528        clone._dirty_root = self._dirty_root
529
530        # Re‑initialise the zarr base class so that Zarr sees the flag
531        zarr.abc.store.Store.__init__(clone, read_only=read_only)
532        return clone
533
534    def __eq__(self, other: object) -> bool:
535        if not isinstance(other, ShardedZarrStore):
536            return False
537        # For equality, root CID is primary. Config like chunks_per_shard is part of that root's identity.
538        return self._root_cid == other._root_cid
539
540    # If nothing to flush, return the root CID.
541    async def flush(self) -> str:
542        async with self._shard_data_cache._cache_lock:
543            dirty_shards = list(self._shard_data_cache._dirty_shards)
544        if dirty_shards:
545            for shard_idx in sorted(dirty_shards):
546                # Get the list of CIDs/Nones from the cache
547                shard_data_list = await self._shard_data_cache.get(shard_idx)
548                if shard_data_list is None:
549                    raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")
550
551                # Encode this list into a DAG-CBOR byte representation
552                shard_data_bytes = dag_cbor.encode(shard_data_list)
553
554                # Save the DAG-CBOR block and get its CID
555                new_shard_cid_obj = await self.cas.save(
556                    shard_data_bytes,
557                    codec="dag-cbor",  # Use 'dag-cbor' codec
558                )
559
560                if (
561                    self._root_obj["chunks"]["shard_cids"][shard_idx]
562                    != new_shard_cid_obj
563                ):
564                    # Store the CID object directly
565                    self._root_obj["chunks"]["shard_cids"][shard_idx] = (
566                        new_shard_cid_obj
567                    )
568                    self._dirty_root = True
569                    # Mark shard as clean after flushing
570                    await self._shard_data_cache.mark_clean(shard_idx)
571
572        if self._dirty_root:
573            # Ensure all metadata CIDs are CID objects for correct encoding
574            self._root_obj["metadata"] = {
575                k: (CID.decode(v) if isinstance(v, str) else v)
576                for k, v in self._root_obj["metadata"].items()
577            }
578            root_obj_bytes = dag_cbor.encode(self._root_obj)
579            new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
580            self._root_cid = str(new_root_cid)
581            self._dirty_root = False
582
583        # Ignore because root_cid will always exist after initialization or flush.
584        return self._root_cid  # type: ignore[return-value]
585
586    async def get(
587        self,
588        key: str,
589        prototype: zarr.core.buffer.BufferPrototype,
590        byte_range: Optional[zarr.abc.store.ByteRequest] = None,
591    ) -> Optional[zarr.core.buffer.Buffer]:
592        with instrumentation.span(
593            "py_hamt.sharded_store.get",
594            {
595                "py_hamt.zarr.key": key,
596                "py_hamt.zarr.byte_range": byte_range is not None,
597            },
598        ):
599            started_at = time.perf_counter()
600            hit = False
601            kind = "metadata"
602            shard_idx_for_trace: int | None = None
603            chunk_coords = self._parse_chunk_key(key)
604            try:
605                # Metadata request
606                if chunk_coords is None:
607                    metadata_cid_obj = self._root_obj["metadata"].get(key)
608                    if metadata_cid_obj is None:
609                        return None
610                    if byte_range is not None:
611                        raise ValueError(
612                            "Byte range requests are not supported for metadata keys."
613                        )
614                    data = self._metadata_read_cache.get(key)
615                    if data is None:
616                        data = await self.cas.load(str(metadata_cid_obj))
617                        self._metadata_read_cache[key] = data
618                    hit = True
619                    return prototype.buffer.from_bytes(data)
620                # Chunk data request
621                kind = "chunk"
622                linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
623                shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
624                shard_idx_for_trace = shard_idx
625
626                # This will load the full shard into cache if it's not already there.
627                shard_lock = self._shard_locks[shard_idx]
628                async with shard_lock:
629                    target_shard_list = await self._load_or_initialize_shard_cache(
630                        shard_idx
631                    )
632
633                # Get the CID object (or None) from the cached list.
634                chunk_cid_obj = target_shard_list[index_in_shard]
635
636                if chunk_cid_obj is None:
637                    return None  # Chunk is empty/doesn't exist.
638
639                chunk_cid_str = str(chunk_cid_obj)
640
641                req_offset = None
642                req_length = None
643                req_suffix = None
644
645                if byte_range:
646                    if isinstance(byte_range, RangeByteRequest):
647                        req_offset = byte_range.start
648                        if byte_range.end is not None:
649                            if byte_range.start > byte_range.end:
650                                raise ValueError(
651                                    f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
652                                )
653                            req_length = byte_range.end - byte_range.start
654                    elif isinstance(byte_range, OffsetByteRequest):
655                        req_offset = byte_range.offset
656                    elif isinstance(byte_range, SuffixByteRequest):
657                        req_suffix = byte_range.suffix
658                data = await self.cas.load(
659                    chunk_cid_str,
660                    offset=req_offset,
661                    length=req_length,
662                    suffix=req_suffix,
663                )
664                hit = True
665                return prototype.buffer.from_bytes(data)
666            finally:
667                instrumentation.record_zarr_get(
668                    store="sharded_store",
669                    key=key,
670                    kind=kind,
671                    hit=hit,
672                    seconds=time.perf_counter() - started_at,
673                    byte_range=byte_range is not None,
674                    shard_idx=shard_idx_for_trace,
675                )
676
677    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
678        if self.read_only:
679            raise PermissionError("Cannot write to a read-only store.")
680        await self._resize_complete.wait()
681
682        if key.endswith("zarr.json") and not key == "zarr.json":
683            metadata_json = json.loads(value.to_bytes().decode("utf-8"))
684            new_array_shape = metadata_json.get("shape")
685            # Some metadata entries (e.g., group metadata) do not have a shape field.
686            if new_array_shape:
687                # Only resize when the metadata shape represents the primary array.
688                if (
689                    len(new_array_shape) == len(self._array_shape)
690                    and tuple(new_array_shape) != self._array_shape
691                ):
692                    async with self._resize_lock:
693                        # Double-check after acquiring the lock, in case another task
694                        # just finished this exact resize while we were waiting.
695                        if (
696                            len(new_array_shape) == len(self._array_shape)
697                            and tuple(new_array_shape) != self._array_shape
698                        ):
699                            # Block all other tasks until resize is complete.
700                            self._resize_complete.clear()
701                            try:
702                                await self.resize_store(
703                                    new_shape=tuple(new_array_shape)
704                                )
705                            finally:
706                                # All waiting tasks will now un-pause and proceed safely.
707                                self._resize_complete.set()
708
709        raw_data_bytes = value.to_bytes()
710        # Save the data to CAS first to get its CID.
711        # Metadata is often saved as 'raw', chunks as well unless compressed.
712        try:
713            data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
714            await self.set_pointer(key, str(data_cid_obj))
715            if self._parse_chunk_key(key) is None:
716                self._metadata_read_cache[key] = raw_data_bytes
717        except Exception as e:
718            raise RuntimeError(f"Failed to save data for key {key}: {e}")
719        return None  # type: ignore[return-value]
720
721    async def set_pointer(self, key: str, pointer: str) -> None:
722        chunk_coords = self._parse_chunk_key(key)
723
724        pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object
725
726        if chunk_coords is None:  # Metadata key
727            self._root_obj["metadata"][key] = pointer_cid_obj
728            self._dirty_root = True
729            return None
730
731        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
732        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
733
734        shard_lock = self._shard_locks[shard_idx]
735        async with shard_lock:
736            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
737
738            if target_shard_list[index_in_shard] != pointer_cid_obj:
739                target_shard_list[index_in_shard] = pointer_cid_obj
740                await self._shard_data_cache.mark_dirty(shard_idx)
741        return None
742
743    async def exists(self, key: str) -> bool:
744        try:
745            chunk_coords = self._parse_chunk_key(key)
746            if chunk_coords is None:  # Metadata
747                return key in self._root_obj.get("metadata", {})
748            linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
749            shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
750            # Load shard if not cached and check the index
751            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
752            return target_shard_list[index_in_shard] is not None
753        except (ValueError, IndexError, KeyError):
754            return False
755
756    @property
757    def supports_writes(self) -> bool:
758        return not self.read_only
759
760    @property
761    def supports_partial_writes(self) -> bool:
762        return False  # Each chunk CID is written atomically into a shard slot
763
764    @property
765    def supports_deletes(self) -> bool:
766        return not self.read_only
767
768    async def delete(self, key: str) -> None:
769        if self.read_only:
770            raise PermissionError("Cannot delete from a read-only store.")
771
772        chunk_coords = self._parse_chunk_key(key)
773        if chunk_coords is None:  # Metadata
774            # Coordinate/metadata deletions should be idempotent for caller convenience.
775            if self._root_obj["metadata"].pop(key, None) is not None:
776                self._metadata_read_cache.pop(key, None)
777                self._dirty_root = True
778            return None
779
780        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
781        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
782
783        shard_lock = self._shard_locks[shard_idx]
784        async with shard_lock:
785            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
786            if target_shard_list[index_in_shard] is not None:
787                target_shard_list[index_in_shard] = None
788                await self._shard_data_cache.mark_dirty(shard_idx)
789
790    @property
791    def supports_listing(self) -> bool:
792        return True
793
794    async def list(self) -> AsyncIterator[str]:
795        for key in list(self._root_obj.get("metadata", {})):
796            yield key
797
798    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
799        async for key in self.list():
800            if key.startswith(prefix):
801                yield key
802
803    async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
804        if self.read_only:
805            raise PermissionError("Cannot graft onto a read-only store.")
806
807        store_to_graft = await ShardedZarrStore.open(
808            cas=self.cas, read_only=True, root_cid=store_to_graft_cid
809        )
810        source_chunk_grid = store_to_graft._chunks_per_dim
811        for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
812            linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
813            local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
814                linear_local_index
815            )
816            # Load the source shard into its cache
817            source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
818                local_shard_idx
819            )
820
821            pointer_cid_obj = source_shard_list[index_in_local_shard]
822            if pointer_cid_obj is None:
823                continue
824
825            # Calculate global coordinates and write to the main store's index
826            global_coords = tuple(
827                c_local + c_offset
828                for c_local, c_offset in zip(local_coords, chunk_offset)
829            )
830            linear_global_index = self._get_linear_chunk_index(global_coords)
831            global_shard_idx, index_in_global_shard = self._get_shard_info(
832                linear_global_index
833            )
834
835            shard_lock = self._shard_locks[global_shard_idx]
836            async with shard_lock:
837                target_shard_list = await self._load_or_initialize_shard_cache(
838                    global_shard_idx
839                )
840                if target_shard_list[index_in_global_shard] != pointer_cid_obj:
841                    target_shard_list[index_in_global_shard] = pointer_cid_obj
842                    await self._shard_data_cache.mark_dirty(global_shard_idx)
843
844    async def resize_store(self, new_shape: Tuple[int, ...]):
845        """
846        Resizes the store's main shard index to accommodate a new overall array shape.
847        This is a metadata-only operation on the store's root object.
848        Used when doing skeleton writes or appends via xarray where the array shape changes.
849        """
850        if self.read_only:
851            raise PermissionError("Cannot resize a read-only store.")
852        if (
853            # self._root_obj is None
854            self._chunk_shape is None
855            or self._chunks_per_shard is None
856            or self._array_shape is None
857        ):
858            raise RuntimeError("Store is not properly initialized for resizing.")
859        if len(new_shape) != len(self._array_shape):
860            raise ValueError(
861                "New shape must have the same number of dimensions as the old shape."
862            )
863
864        self._array_shape = tuple(new_shape)
865        self._chunks_per_dim = tuple(
866            math.ceil(a / c) if c > 0 else 0
867            for a, c in zip(self._array_shape, self._chunk_shape)
868        )
869        self._total_chunks = math.prod(self._chunks_per_dim)
870        old_num_shards = self._num_shards if self._num_shards is not None else 0
871        self._num_shards = (
872            (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard
873            if self._total_chunks > 0
874            else 0
875        )
876        self._root_obj["chunks"]["array_shape"] = list(self._array_shape)
877        if self._num_shards > old_num_shards:
878            self._root_obj["chunks"]["shard_cids"].extend(
879                [None] * (self._num_shards - old_num_shards)
880            )
881        elif self._num_shards < old_num_shards:
882            self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][
883                "shard_cids"
884            ][: self._num_shards]
885
886        self._dirty_root = True
887
888    async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
889        """
890        Resizes the Zarr metadata for a specific variable (e.g., '.json' file).
891        This does NOT change the store's main shard index.
892        """
893        if self.read_only:
894            raise PermissionError("Cannot resize a read-only store.")
895
896        zarr_metadata_key = f"{variable_name}/zarr.json"
897
898        old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
899        if not old_zarr_metadata_cid:
900            raise KeyError(
901                f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
902            )
903
904        old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid)
905        zarr_metadata_json = json.loads(old_zarr_metadata_bytes)
906
907        zarr_metadata_json["shape"] = list(new_shape)
908
909        new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
910            "utf-8"
911        )
912        # Metadata is a raw blob of bytes
913        new_zarr_metadata_cid = await self.cas.save(
914            new_zarr_metadata_bytes, codec="raw"
915        )
916
917        self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
918        self._metadata_read_cache[zarr_metadata_key] = new_zarr_metadata_bytes
919        self._dirty_root = True
920
921    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
922        seen: Set[str] = set()
923        if prefix == "":
924            async for key in self.list():  # Iterates metadata keys
925                # e.g., if key is "group1/.zgroup" or "array1/.json", first_component is "group1" or "array1"
926                # if key is ".zgroup", first_component is ".zgroup"
927                first_component = key.split("/", 1)[0]
928                if first_component not in seen:
929                    seen.add(first_component)
930                    yield first_component
931        else:
932            raise NotImplementedError("Listing with a prefix is not implemented yet.")

Implements the Zarr Store API using a sharded layout for chunk CIDs.

This store divides the flat index of chunk CIDs into multiple "shards". Each shard is a DAG-CBOR array where each element is either a CID link to a chunk or a null value if the chunk is empty. This structure allows for efficient traversal by IPLD-aware systems.

The store's root object contains:

  1. A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
  2. A list with one entry per shard: the CID of that shard's DAG-CBOR array, or null for a shard that has not been written yet.
  3. Sharding configuration details (e.g., chunks_per_shard).
ShardedZarrStore( cas: ContentAddressedStore, read_only: bool, root_cid: Optional[str] = None, *, max_cache_memory_bytes: int = 104857600)
160    def __init__(
161        self,
162        cas: ContentAddressedStore,
163        read_only: bool,
164        root_cid: Optional[str] = None,
165        *,
166        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
167    ):
168        """Use the async `open()` classmethod to instantiate this class."""
169        super().__init__(read_only=read_only)
170        self.cas = cas
171        self._root_cid = root_cid
172        self._root_obj: dict
173
174        self._resize_lock = asyncio.Lock()
175        # An event to signal when a resize is in-progress.
176        # It starts in the "set" state, allowing all operations to proceed.
177        self._resize_complete = asyncio.Event()
178        self._resize_complete.set()
179        self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
180
181        self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
182        self._pending_shard_loads: Dict[int, asyncio.Event] = {}
183        self._metadata_read_cache: Dict[str, bytes] = {}
184
185        self._array_shape: Tuple[int, ...]
186        self._chunk_shape: Tuple[int, ...]
187        self._chunks_per_dim: Tuple[int, ...]
188        self._chunks_per_shard: int
189        self._num_shards: int = 0
190        self._total_chunks: int = 0
191
192        self._dirty_root = False

Use the async open() classmethod to instantiate this class.

cas
@classmethod
async def open( cls, cas: ContentAddressedStore, read_only: bool, root_cid: Optional[str] = None, *, array_shape: Optional[Tuple[int, ...]] = None, chunk_shape: Optional[Tuple[int, ...]] = None, chunks_per_shard: Optional[int] = None, max_cache_memory_bytes: int = 104857600) -> ShardedZarrStore:
213    @classmethod
214    async def open(
215        cls,
216        cas: ContentAddressedStore,
217        read_only: bool,
218        root_cid: Optional[str] = None,
219        *,
220        array_shape: Optional[Tuple[int, ...]] = None,
221        chunk_shape: Optional[Tuple[int, ...]] = None,
222        chunks_per_shard: Optional[int] = None,
223        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
224    ) -> "ShardedZarrStore":
225        """
226        Asynchronously opens an existing ShardedZarrStore or initializes a new one.
227        """
228        store = cls(
229            cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
230        )
231        if root_cid:
232            await store._load_root_from_cid()
233        elif not read_only:
234            if array_shape is None or chunk_shape is None:
235                raise ValueError(
236                    "array_shape and chunk_shape must be provided for a new store."
237                )
238
239            if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
240                raise ValueError("chunks_per_shard must be a positive integer.")
241
242            store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
243        else:
244            raise ValueError("root_cid must be provided for a read-only store.")
245        return store

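Asynchronously opens an existing ShardedZarrStore (when root_cid is given) or initializes a new one. Creating a new store is only allowed in writable mode. It requires array_shape and chunk_shape, and chunks_per_shard must be a positive integer; otherwise ValueError is raised. Opening in read-only mode without a root_cid also raises ValueError.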

async def set_partial_values( self, key_start_values: Iterable[typing.Tuple[str, int, bytes | bytearray | memoryview]]) -> None:
476    async def set_partial_values(
477        self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
478    ) -> None:
479        raise NotImplementedError(
480            "Partial writes are not supported by ShardedZarrStore."
481        )

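Store values at a given key, starting at byte range_start. Note: ShardedZarrStore does not support partial writes; calling this method always raises NotImplementedError.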

Parameters

key_start_values : list[tuple[str, int, BytesLike]] A set of (key, range_start, value) triples. A key may occur multiple times with different range_starts. For the same key, the ranges (each range_start plus the length of its value) must not overlap.

async def get_partial_values( self, prototype: zarr.core.buffer.core.BufferPrototype, key_ranges: Iterable[typing.Tuple[str, zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None]]) -> List[Optional[zarr.core.buffer.core.Buffer]]:
483    async def get_partial_values(
484        self,
485        prototype: zarr.core.buffer.BufferPrototype,
486        key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
487    ) -> List[Optional[zarr.core.buffer.Buffer]]:
488        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
489        results = await asyncio.gather(*tasks)
490        return results

Retrieve possibly partial values from given key_ranges.

Parameters

prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. key_ranges : Iterable[tuple[str, ByteRequest | None]] Ordered set of (key, byte range) pairs; a key may occur multiple times with different ranges. A byte range of None requests the whole value.

Returns

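A list of values in the same order as key_ranges; entries are None for missing keys. All requests are issued concurrently with asyncio.gather, so an exception raised while fetching any key propagates to the caller.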

def with_read_only( self, read_only: bool = False) -> ShardedZarrStore:
492    def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
493        """
494        Return this store (if the flag already matches) or a *shallow*
495        clone that presents the requested read‑only status.
496
497        The clone **shares** the same CAS instance and internal state;
498        no flushing, network traffic or async work is done.
499        """
500        # Fast path
501        if read_only == self.read_only:
502            return self  # Same mode, return same instance
503
504        # Create new instance with different read_only flag
505        # Creates a *bare* instance without running its __init__
506        clone = type(self).__new__(type(self))
507
508        # Copy all attributes from the current instance
509        clone.cas = self.cas
510        clone._root_cid = self._root_cid
511        clone._root_obj = self._root_obj
512
513        clone._resize_lock = self._resize_lock
514        clone._resize_complete = self._resize_complete
515        clone._shard_locks = self._shard_locks
516
517        clone._shard_data_cache = self._shard_data_cache
518        clone._pending_shard_loads = self._pending_shard_loads
519        clone._metadata_read_cache = self._metadata_read_cache
520
521        clone._array_shape = self._array_shape
522        clone._chunk_shape = self._chunk_shape
523        clone._chunks_per_dim = self._chunks_per_dim
524        clone._chunks_per_shard = self._chunks_per_shard
525        clone._num_shards = self._num_shards
526        clone._total_chunks = self._total_chunks
527
528        clone._dirty_root = self._dirty_root
529
530        # Re‑initialise the zarr base class so that Zarr sees the flag
531        zarr.abc.store.Store.__init__(clone, read_only=read_only)
532        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read-only status.

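The clone shares the same CAS instance and internal state (root object, locks, shard and metadata caches); no flushing, network traffic or async work is done. Scalar fields such as the root CID, the dirty-root flag and the shape tuples are copied rather than shared. As a result, a flush() or resize on one instance does not update those fields on the other.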

async def flush(self) -> str:
541    async def flush(self) -> str:
542        async with self._shard_data_cache._cache_lock:
543            dirty_shards = list(self._shard_data_cache._dirty_shards)
544        if dirty_shards:
545            for shard_idx in sorted(dirty_shards):
546                # Get the list of CIDs/Nones from the cache
547                shard_data_list = await self._shard_data_cache.get(shard_idx)
548                if shard_data_list is None:
549                    raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")
550
551                # Encode this list into a DAG-CBOR byte representation
552                shard_data_bytes = dag_cbor.encode(shard_data_list)
553
554                # Save the DAG-CBOR block and get its CID
555                new_shard_cid_obj = await self.cas.save(
556                    shard_data_bytes,
557                    codec="dag-cbor",  # Use 'dag-cbor' codec
558                )
559
560                if (
561                    self._root_obj["chunks"]["shard_cids"][shard_idx]
562                    != new_shard_cid_obj
563                ):
564                    # Store the CID object directly
565                    self._root_obj["chunks"]["shard_cids"][shard_idx] = (
566                        new_shard_cid_obj
567                    )
568                    self._dirty_root = True
569                    # Mark shard as clean after flushing
570                    await self._shard_data_cache.mark_clean(shard_idx)
571
572        if self._dirty_root:
573            # Ensure all metadata CIDs are CID objects for correct encoding
574            self._root_obj["metadata"] = {
575                k: (CID.decode(v) if isinstance(v, str) else v)
576                for k, v in self._root_obj["metadata"].items()
577            }
578            root_obj_bytes = dag_cbor.encode(self._root_obj)
579            new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
580            self._root_cid = str(new_root_cid)
581            self._dirty_root = False
582
583        # Ignore because root_cid will always exist after initialization or flush.
584        return self._root_cid  # type: ignore[return-value]
async def get( self, key: str, prototype: zarr.core.buffer.core.BufferPrototype, byte_range: Union[zarr.abc.store.RangeByteRequest, zarr.abc.store.OffsetByteRequest, zarr.abc.store.SuffixByteRequest, NoneType] = None) -> Optional[zarr.core.buffer.core.Buffer]:
586    async def get(
587        self,
588        key: str,
589        prototype: zarr.core.buffer.BufferPrototype,
590        byte_range: Optional[zarr.abc.store.ByteRequest] = None,
591    ) -> Optional[zarr.core.buffer.Buffer]:
592        with instrumentation.span(
593            "py_hamt.sharded_store.get",
594            {
595                "py_hamt.zarr.key": key,
596                "py_hamt.zarr.byte_range": byte_range is not None,
597            },
598        ):
599            started_at = time.perf_counter()
600            hit = False
601            kind = "metadata"
602            shard_idx_for_trace: int | None = None
603            chunk_coords = self._parse_chunk_key(key)
604            try:
605                # Metadata request
606                if chunk_coords is None:
607                    metadata_cid_obj = self._root_obj["metadata"].get(key)
608                    if metadata_cid_obj is None:
609                        return None
610                    if byte_range is not None:
611                        raise ValueError(
612                            "Byte range requests are not supported for metadata keys."
613                        )
614                    data = self._metadata_read_cache.get(key)
615                    if data is None:
616                        data = await self.cas.load(str(metadata_cid_obj))
617                        self._metadata_read_cache[key] = data
618                    hit = True
619                    return prototype.buffer.from_bytes(data)
620                # Chunk data request
621                kind = "chunk"
622                linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
623                shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
624                shard_idx_for_trace = shard_idx
625
626                # This will load the full shard into cache if it's not already there.
627                shard_lock = self._shard_locks[shard_idx]
628                async with shard_lock:
629                    target_shard_list = await self._load_or_initialize_shard_cache(
630                        shard_idx
631                    )
632
633                # Get the CID object (or None) from the cached list.
634                chunk_cid_obj = target_shard_list[index_in_shard]
635
636                if chunk_cid_obj is None:
637                    return None  # Chunk is empty/doesn't exist.
638
639                chunk_cid_str = str(chunk_cid_obj)
640
641                req_offset = None
642                req_length = None
643                req_suffix = None
644
645                if byte_range:
646                    if isinstance(byte_range, RangeByteRequest):
647                        req_offset = byte_range.start
648                        if byte_range.end is not None:
649                            if byte_range.start > byte_range.end:
650                                raise ValueError(
651                                    f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
652                                )
653                            req_length = byte_range.end - byte_range.start
654                    elif isinstance(byte_range, OffsetByteRequest):
655                        req_offset = byte_range.offset
656                    elif isinstance(byte_range, SuffixByteRequest):
657                        req_suffix = byte_range.suffix
658                data = await self.cas.load(
659                    chunk_cid_str,
660                    offset=req_offset,
661                    length=req_length,
662                    suffix=req_suffix,
663                )
664                hit = True
665                return prototype.buffer.from_bytes(data)
666            finally:
667                instrumentation.record_zarr_get(
668                    store="sharded_store",
669                    key=key,
670                    kind=kind,
671                    hit=hit,
672                    seconds=time.perf_counter() - started_at,
673                    byte_range=byte_range is not None,
674                    shard_idx=shard_idx_for_trace,
675                )

Retrieve the value associated with a given key.

Parameters

key : str prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. byte_range : ByteRequest, optional ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved. - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned. - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header. - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header. In ShardedZarrStore, byte ranges are only supported for chunk keys. Passing one for an existing metadata key raises ValueError, as does a RangeByteRequest whose start is greater than its end.

Returns

Buffer, or None if the key does not exist (no such metadata entry, or the chunk is empty/unwritten).

async def set(self, key: str, value: zarr.core.buffer.core.Buffer) -> None:
async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
    """
    Store ``value`` under ``key``.

    The value's bytes are saved to the CAS with the ``raw`` codec and the
    resulting CID is recorded with ``set_pointer``. For non-chunk (metadata)
    keys the bytes are also placed in the metadata read cache.

    For any key ending in ``zarr.json`` other than the root ``zarr.json``, a
    ``shape`` with the same number of dimensions as the store's current array
    shape but different extents first triggers ``resize_store``. While that
    resize runs, ``_resize_complete`` is cleared, so other ``set`` calls wait
    at the top of this method.

    Args:
        key: Store key (metadata document or chunk key).
        value: Zarr buffer holding the bytes to store.

    Raises:
        PermissionError: If the store is read-only.
        RuntimeError: If saving the bytes or recording the pointer fails. The
            underlying exception is chained as ``__cause__``.
    """
    if self.read_only:
        raise PermissionError("Cannot write to a read-only store.")
    # Do not write while another task is resizing the shard index.
    await self._resize_complete.wait()

    # Materialise the payload once; it is used both for the metadata
    # inspection below and for the CAS save.
    raw_data_bytes = value.to_bytes()

    if key.endswith("zarr.json") and key != "zarr.json":
        metadata_json = json.loads(raw_data_bytes.decode("utf-8"))
        new_array_shape = metadata_json.get("shape")
        # Some metadata entries (e.g., group metadata) do not have a shape field.
        if new_array_shape:
            new_shape = tuple(new_array_shape)
            # Only resize when the metadata shape represents the primary array
            # (same rank) and actually differs from the current shape.
            if (
                len(new_shape) == len(self._array_shape)
                and new_shape != self._array_shape
            ):
                async with self._resize_lock:
                    # Re-check after acquiring the lock, in case another task
                    # finished this exact resize while we were waiting.
                    if (
                        len(new_shape) == len(self._array_shape)
                        and new_shape != self._array_shape
                    ):
                        # Block other writers until the resize is complete.
                        self._resize_complete.clear()
                        try:
                            await self.resize_store(new_shape=new_shape)
                        finally:
                            # Waiting tasks resume whether or not the resize
                            # succeeded.
                            self._resize_complete.set()

    # Save the data to CAS first to get its CID, then record the pointer.
    try:
        data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
        await self.set_pointer(key, str(data_cid_obj))
        if self._parse_chunk_key(key) is None:
            self._metadata_read_cache[key] = raw_data_bytes
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"Failed to save data for key {key}: {e}") from e

Store a (key, value) pair.

Parameters

key : str
    The key to store.
value : Buffer
    The value to store.

async def set_pointer(self, key: str, pointer: str) -> None:
async def set_pointer(self, key: str, pointer: str) -> None:
    """
    Point ``key`` at an existing CAS object given by the CID string ``pointer``.

    No data is uploaded. Metadata keys (``_parse_chunk_key`` returns ``None``)
    are stored in the root object's ``metadata`` map, and any bytes cached for
    the key in ``_metadata_read_cache`` are discarded because they belong to
    the previous CID. Chunk keys are written into their slot of the owning
    shard while holding that shard's lock. The shard is marked dirty only when
    the slot's CID actually changes.

    Args:
        key: Metadata or chunk key.
        pointer: CID string; decoded with ``CID.decode``.

    Raises:
        PermissionError: If the store is read-only.
    """
    # Same guard as every other mutating method of this store.
    if self.read_only:
        raise PermissionError("Cannot write to a read-only store.")

    chunk_coords = self._parse_chunk_key(key)

    pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object

    if chunk_coords is None:  # Metadata key
        self._root_obj["metadata"][key] = pointer_cid_obj
        # A cached copy would still hold the bytes of the old CID.
        self._metadata_read_cache.pop(key, None)
        self._dirty_root = True
        return None

    linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
    shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

    async with self._shard_locks[shard_idx]:
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)

        if target_shard_list[index_in_shard] != pointer_cid_obj:
            target_shard_list[index_in_shard] = pointer_cid_obj
            await self._shard_data_cache.mark_dirty(shard_idx)
    return None
async def exists(self, key: str) -> bool:
async def exists(self, key: str) -> bool:
    """
    Report whether ``key`` currently has data in this store.

    Chunk keys are resolved to their shard slot (the shard is loaded if it is
    not cached) and count as present when the slot holds a CID. All other keys
    are looked up in the root object's ``metadata`` map. A key that cannot be
    resolved (``ValueError``, ``IndexError`` or ``KeyError``) reports ``False``.
    """
    try:
        coords = self._parse_chunk_key(key)
        if coords is not None:
            shard_idx, slot = self._get_shard_info(
                self._get_linear_chunk_index(coords)
            )
            shard = await self._load_or_initialize_shard_cache(shard_idx)
            return shard[slot] is not None
        return key in self._root_obj.get("metadata", {})
    except (ValueError, IndexError, KeyError):
        return False

Check if a key exists in the store.

Parameters

key : str

Returns

bool

supports_writes: bool
@property
def supports_writes(self) -> bool:
    """``True`` unless the store was opened read-only."""
    return False if self.read_only else True

Does the store support writes?

supports_partial_writes: bool
@property
def supports_partial_writes(self) -> bool:
    """
    Always ``False``.

    Each chunk's CID is written atomically into its shard slot, so a write
    replaces a whole value rather than a byte range of it.
    """
    partial_writes_possible = False
    return partial_writes_possible

Does the store support partial writes?

supports_deletes: bool
@property
def supports_deletes(self) -> bool:
    """``True`` unless the store was opened read-only."""
    return False if self.read_only else True

Does the store support deletes?

async def delete(self, key: str) -> None:
async def delete(self, key: str) -> None:
    """
    Remove ``key`` from the store; deleting an absent key is a no-op.

    Metadata keys are dropped from the root object's ``metadata`` map and from
    the metadata read cache. Chunk keys have their shard slot cleared while
    holding that shard's lock, and the shard is marked dirty only if the slot
    was previously filled.

    Raises:
        PermissionError: If the store is read-only.
    """
    if self.read_only:
        raise PermissionError("Cannot delete from a read-only store.")

    coords = self._parse_chunk_key(key)
    if coords is None:  # Metadata
        # Coordinate/metadata deletions are idempotent for caller convenience.
        removed = self._root_obj["metadata"].pop(key, None)
        if removed is None:
            return None
        self._metadata_read_cache.pop(key, None)
        self._dirty_root = True
        return None

    shard_idx, slot = self._get_shard_info(self._get_linear_chunk_index(coords))
    async with self._shard_locks[shard_idx]:
        shard = await self._load_or_initialize_shard_cache(shard_idx)
        if shard[slot] is None:
            return None
        shard[slot] = None
        await self._shard_data_cache.mark_dirty(shard_idx)

Remove a key from the store.

Parameters

key : str

supports_listing: bool
@property
def supports_listing(self) -> bool:
    """Always ``True``; listing is served from the root object's metadata keys."""
    listing_available = True
    return listing_available

Does the store support listing?

async def list(self) -> AsyncIterator[str]:
async def list(self) -> AsyncIterator[str]:
    """
    Yield every key recorded in the root object's ``metadata`` map.

    Chunk keys are not enumerated. The key names are copied before the first
    yield, so metadata added or removed during iteration does not affect this
    listing.
    """
    # Unpack into a fresh list (avoiding the builtin name, which this method
    # shadows at module level) to snapshot the keys before yielding.
    snapshot = [*self._root_obj.get("metadata", {})]
    for metadata_key in snapshot:
        yield metadata_key

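Retrieve all keys in the store. Note: this store yields only the metadata keys recorded in its root object; chunk keys are not enumerated.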

Returns

AsyncIterator[str]

async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """Yield, in ``list()`` order, each key that starts with ``prefix``."""
    async for candidate in self.list():
        if not candidate.startswith(prefix):
            continue
        yield candidate

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters

prefix : str

Returns

AsyncIterator[str]

async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
    """
    Copy the chunk pointers of another ShardedZarrStore into this store.

    The store at ``store_to_graft_cid`` is opened read-only on the same CAS.
    Each populated slot at local chunk coordinates ``c`` is written into this
    store's slot at ``c + chunk_offset``; empty source slots are skipped. Only
    CIDs are copied; no chunk bytes are loaded or re-uploaded, and the source
    store's metadata is not copied.

    Args:
        store_to_graft_cid: Root CID of the store to graft in.
        chunk_offset: Per-dimension offset in chunk units (not array elements).

    Raises:
        PermissionError: If this store is read-only.
        ValueError: If ``chunk_offset`` does not have exactly one entry per
            dimension of the grafted store's chunk grid.
    """
    if self.read_only:
        raise PermissionError("Cannot graft onto a read-only store.")

    store_to_graft = await ShardedZarrStore.open(
        cas=self.cas, read_only=True, root_cid=store_to_graft_cid
    )
    source_chunk_grid = store_to_graft._chunks_per_dim
    # zip() below would silently drop surplus dimensions and yield wrong global
    # coordinates, so the ranks must match exactly.
    if len(chunk_offset) != len(source_chunk_grid):
        raise ValueError(
            f"chunk_offset has {len(chunk_offset)} dimension(s) but the grafted "
            f"store's chunk grid has {len(source_chunk_grid)}."
        )

    for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
        linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
        local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
            linear_local_index
        )
        # Load the source shard into its cache
        source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
            local_shard_idx
        )

        pointer_cid_obj = source_shard_list[index_in_local_shard]
        if pointer_cid_obj is None:
            continue

        # Calculate global coordinates and write to the main store's index
        global_coords = tuple(
            c_local + c_offset
            for c_local, c_offset in zip(local_coords, chunk_offset)
        )
        linear_global_index = self._get_linear_chunk_index(global_coords)
        global_shard_idx, index_in_global_shard = self._get_shard_info(
            linear_global_index
        )

        async with self._shard_locks[global_shard_idx]:
            target_shard_list = await self._load_or_initialize_shard_cache(
                global_shard_idx
            )
            # Only dirty the shard when the slot actually changes.
            if target_shard_list[index_in_global_shard] != pointer_cid_obj:
                target_shard_list[index_in_global_shard] = pointer_cid_obj
                await self._shard_data_cache.mark_dirty(global_shard_idx)
async def resize_store(self, new_shape: Tuple[int, ...]):
async def resize_store(self, new_shape: Tuple[int, ...]):
    """
    Resize the store's shard index to fit a new overall array shape.

    Recomputes the chunk grid, total chunk count and shard count from
    ``new_shape`` together with the existing chunk shape and chunks-per-shard.
    Records the new shape in the root object, then pads ``shard_cids`` with
    ``None`` or truncates it to the new shard count. This is a metadata-only
    operation on the root object; no chunk data is read or written. It is
    used for skeleton writes and xarray appends that change the array shape.

    NOTE(review): chunk slots are addressed through ``_get_linear_chunk_index``
    over ``_chunks_per_dim``. If that index is row-major, growing any dimension
    other than the first re-maps already-written chunks to different
    coordinates; confirm callers only extend the leading dimension. Cached
    shards beyond a reduced shard count are not evicted here; verify that is
    handled elsewhere.

    Raises:
        PermissionError: If the store is read-only.
        RuntimeError: If the chunk shape, chunks-per-shard or array shape is unset.
        ValueError: If ``new_shape`` has a different number of dimensions.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")
    if self._chunk_shape is None or self._chunks_per_shard is None or self._array_shape is None:
        raise RuntimeError("Store is not properly initialized for resizing.")
    if len(new_shape) != len(self._array_shape):
        raise ValueError(
            "New shape must have the same number of dimensions as the old shape."
        )

    self._array_shape = tuple(new_shape)

    grid = []
    for extent, chunk_len in zip(self._array_shape, self._chunk_shape):
        # A non-positive chunk length contributes an empty grid dimension.
        grid.append(math.ceil(extent / chunk_len) if chunk_len > 0 else 0)
    self._chunks_per_dim = tuple(grid)
    self._total_chunks = math.prod(self._chunks_per_dim)

    previous_shard_count = 0 if self._num_shards is None else self._num_shards
    if self._total_chunks > 0:
        per_shard = self._chunks_per_shard
        self._num_shards = (self._total_chunks + per_shard - 1) // per_shard
    else:
        self._num_shards = 0

    chunks_meta = self._root_obj["chunks"]
    chunks_meta["array_shape"] = list(self._array_shape)
    shard_delta = self._num_shards - previous_shard_count
    if shard_delta > 0:
        chunks_meta["shard_cids"].extend([None] * shard_delta)
    elif shard_delta < 0:
        chunks_meta["shard_cids"] = chunks_meta["shard_cids"][: self._num_shards]

    self._dirty_root = True

Resizes the store's main shard index to accommodate a new overall array shape. This is a metadata-only operation on the store's root object. Used when doing skeleton writes or appends via xarray where the array shape changes.

async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
    """
    Rewrite the ``shape`` recorded in ``<variable_name>/zarr.json``.

    The current metadata document is loaded from the CAS and its ``shape`` is
    replaced with ``new_shape``. The JSON is re-serialised with ``indent=2``
    and saved as a new ``raw`` block. Its CID replaces the old entry in the
    root object's ``metadata`` map, and the new bytes are placed in the
    metadata read cache. The store's shard index is NOT changed.

    Raises:
        PermissionError: If the store is read-only.
        KeyError: If no ``zarr.json`` entry exists for the variable.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")

    metadata_key = f"{variable_name}/zarr.json"
    current_cid = self._root_obj["metadata"].get(metadata_key)
    if not current_cid:
        raise KeyError(
            f"Cannot find metadata for key '{metadata_key}' to resize."
        )

    document = json.loads(await self.cas.load(current_cid))
    document["shape"] = list(new_shape)
    encoded = json.dumps(document, indent=2).encode("utf-8")
    # Metadata is a raw blob of bytes
    replacement_cid = await self.cas.save(encoded, codec="raw")

    self._root_obj["metadata"][metadata_key] = replacement_cid
    self._metadata_read_cache[metadata_key] = encoded
    self._dirty_root = True

Resizes the Zarr metadata for a specific variable (its `<variable>/zarr.json` document). This does NOT change the store's main shard index.

async def list_dir(self, prefix: str) -> AsyncIterator[str]:
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """
    Yield the distinct top-level names among the keys from ``list()``.

    Only the root listing (``prefix == ""``) is supported. Each key contributes
    its first ``/``-separated component (``"temp/zarr.json"`` -> ``"temp"``,
    ``"zarr.json"`` -> ``"zarr.json"``), and each name is yielded once, in
    first-seen order.

    Raises:
        NotImplementedError: For any non-empty ``prefix`` (raised when iteration
            starts).
    """
    if prefix != "":
        raise NotImplementedError("Listing with a prefix is not implemented yet.")
    emitted: Set[str] = set()
    async for key in self.list():  # Iterates metadata keys
        head = key.partition("/")[0]
        if head in emitted:
            continue
        emitted.add(head)
        yield head

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix. Currently only the empty prefix is supported; any other prefix raises NotImplementedError.

Parameters

prefix : str

Returns

AsyncIterator[str]

async def convert_hamt_to_sharded( cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int) -> str:
async def convert_hamt_to_sharded(
    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
) -> str:
    """
    Convert a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

    Only pointers are copied: each key of the source HAMT is recorded in the
    new store via ``set_pointer`` with the same CID, so no bytes are read or
    re-uploaded here. The new store's array shape and chunk shape are taken
    from the dataset's first data variable.

    Args:
        cas: An initialized ContentAddressedStore instance (KuboCAS), used for
            both the source and the destination store.
        hamt_root_cid: The root CID of the source ZarrHAMTStore.
        chunks_per_shard: The number of chunks to group into a single shard in
            the new store.

    Returns:
        The root CID of the newly created ShardedZarrStore.

    Raises:
        ValueError: If the source dataset has no data variables, or its first
            data variable is not chunked.
    """
    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
    start_time = time.perf_counter()
    # 1. Open the source HAMT store for reading
    print("Opening source HAMT store...")
    hamt_ro = await HAMT.build(
        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
    )
    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
    source_dataset = xr.open_zarr(store=source_store, consolidated=True)

    # 2. Introspect the source array to get its configuration
    print("Reading metadata from source store...")
    # Use next() with a default: a bare StopIteration escaping a coroutine is
    # turned into an opaque "coroutine raised StopIteration" RuntimeError.
    data_var_name = next(iter(source_dataset.data_vars), None)
    if data_var_name is None:
        raise ValueError(
            f"Source dataset at {hamt_root_cid} has no data variables to convert."
        )
    data_array = source_dataset[data_var_name]
    # Use the variable's own chunking: Dataset.chunks raises when any other
    # variable (e.g. a coordinate) is chunked differently along a shared dim.
    var_chunks = data_array.chunks
    if var_chunks is None:
        raise ValueError(f"Data variable '{data_var_name}' is not chunked.")
    array_shape = tuple(data_array.shape)
    chunk_shape = tuple(dim_chunks[0] for dim_chunks in var_chunks)

    # 3. Create the destination ShardedZarrStore for writing
    print(
        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
    )
    dest_store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=array_shape,
        chunk_shape=chunk_shape,
        chunks_per_shard=chunks_per_shard,
    )

    print("Destination store initialized.")

    # 4. Iterate and copy all pointers from source to destination
    print("Starting data migration...")
    count = 0
    async for key in hamt_ro.keys():
        count += 1
        # Copy the pointer only (metadata or chunk); the bytes are not touched.
        cid: CID = await hamt_ro.get_pointer(key)
        await dest_store.set_pointer(key, str(cid.encode("base32")))
        if count % 200 == 0:  # pragma: no cover
            print(f"Migrated {count} keys...")  # pragma: no cover

    print(f"Migration of {count} total keys complete.")

    # 5. Finalize the new store by flushing it to the CAS
    print("Flushing new store to get final root CID...")
    new_root_cid = await dest_store.flush()
    end_time = time.perf_counter()

    print("\n--- Conversion Complete! ---")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
    return new_root_cid

Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

Args:
    cas: An initialized ContentAddressedStore instance (KuboCAS).
    hamt_root_cid: The root CID of the source ZarrHAMTStore.
    chunks_per_shard: The number of chunks to group into a single shard in the new store.

Returns: The root CID of the newly created ShardedZarrStore.

async def sharded_converter_cli():
 90async def sharded_converter_cli():
 91    parser = argparse.ArgumentParser(
 92        description="Convert a Zarr HAMT store to a Sharded Zarr store."
 93    )
 94    parser.add_argument(
 95        "hamt_cid", type=str, help="The root CID of the source Zarr HAMT store."
 96    )
 97    parser.add_argument(
 98        "--chunks-per-shard",
 99        type=int,
100        default=6250,
101        help="Number of chunk CIDs to store per shard in the new store.",
102    )
103    parser.add_argument(
104        "--rpc-url",
105        type=str,
106        default="http://127.0.0.1:5001",
107        help="The URL of the IPFS Kubo RPC API.",
108    )
109    parser.add_argument(
110        "--gateway-url",
111        type=str,
112        default="http://127.0.0.1:8080",
113        help="The URL of the IPFS Gateway.",
114    )
115    args = parser.parse_args()
116    # Initialize the KuboCAS client with the provided RPC and Gateway URLs
117    async with KuboCAS(
118        rpc_base_url=args.rpc_url, gateway_base_url=args.gateway_url
119    ) as cas_client:
120        try:
121            await convert_hamt_to_sharded(
122                cas=cas_client,
123                hamt_root_cid=args.hamt_cid,
124                chunks_per_shard=args.chunks_per_shard,
125            )
126        except Exception as e:
127            print(f"\nAn error occurred: {e}")